Python开发-035_多线程开发

并发编程其目的是为了提高程序的执行效率,而实现方式主要有三种进程,线程,协程

1 进程与线程

  • 在工厂中,一个厂房至少有一台流水线,生成物品是在流水线上进行,厂房可以为流水线提供资源,要提高效率,可以修建多个厂房,也可以一个厂房中修建多条流水线
  • 在Python程序中,一个进程最少有一条线程,运行程序是线程在运行,进程为线程提供资源,要提高效率,可以创建多个进程,也可以在一个进程中创建多个线程

即:

线程,是计算机中可以被cpu调度的最小单元(真正在工作)
进程,是计算机资源分配的最小单元(进程为线程提供资源)

一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源,多进程比多线程资源消耗多

以前我们开发的程序中所有的行为都只能通过串行的形式运行,排队逐一执行,前面未完成,后面也无法继续

import time
import requests

url_list = [
    ("无名之辈.mp4", "https://v95-hb.douyinvod.com/bd17d2f845b961e476e9807bf4fdc648/62dd091c/video/tos/cn/tos-cn-ve-15c001-alinc2/a4bd7e7c4d574b9ab5d212bba3a8d98d/?a=1128&ch=0&cr=0&dr=0&lr=xigua_aweme_play_suffix&cd=0%7C0%7C0%7C0&cv=1&br=1430&bt=1430&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGm4uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Njg2NWdlNGVlPDk3ODs1O0BpanRwZmU6ZjRuZTMzNGkzM0AtM2EuLmAtXzMxNC4tMC0zYSNjcS0vcjRnZDNgLS1kLTBzcw%3D%3D&l=2022072415532301020801622832BC4FA9"),
    ("铁板肥肠鱼头.mp4", "https://v11.douyinvod.com/d9b20a9a9c9983e9f4d92081edddab82/62dd0789/video/tos/cn/tos-cn-ve-15c001-alinc2/01ed7dea18654933820162f3bae4de5a/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2544&bt=2544&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGd~uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Z2czZWlmNWc1NTZnaTw2NkBpMzM1N2c6ZnVkZTMzNGkzM0AzYi5gXzItNl8xMTIuNl9fYSM1ZmQucjRnYWFgLS1kLS9zcw%3D%3D&l=2022072415462101020817416917B80729"),
    ("孝警阿特.mp4", "https://v95.douyinvod.com/5c79c95579fd0135a461fe138c3b2133/62dd0822/video/tos/cn/tos-cn-ve-15c001-alinc2/d182233e39c44ffcb5179076bbbc99ec/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2295&bt=2295&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGiBuZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=NGg1OWU1M2g7ZDQ1PDczNkBpamhwNDo6ZmY0ZTMzNGkzM0AxLmJfNTExNTMxM2FhMF42YSNrNWNfcjRvM2FgLS1kLS9zcw%3D%3D&l=2022072415475601020915816833A8E812")
]

print(time.time())
for file_name, url in url_list:
    res = requests.get(url)
    with open(file_name, mode='wb') as f:
        f.write(res.content)
    print(file_name, time.time())
    
    
'''46-49总共使用三秒左右
1658651146.3676732
无名之辈.mp4 1658651146.899162
铁板肥肠鱼头.mp4 1658651148.0246933
孝警阿特.mp4 1658651149.3370574
'''

通过 进程线程 都可以将 串行 的程序变为并发,对于上述示例来说就是同时下载三个视频,这样很短的时间内就可以下载完成

1.1 多线程

多线程的基础语法

import threading
def func(a1,a2,a3):
    pass
				# target = 被执行的函数名 , args=(函数的参数)
t = threading.Thread(target=func,args=(11,22,33))
t.start() # 启动多线程

基于多线程对上述下载的案例进行一下优化

import time
import requests
import threading # 多线程库


url_list = [
    ("无名之辈.mp4", "https://v95-hb.douyinvod.com/bd17d2f845b961e476e9807bf4fdc648/62dd091c/video/tos/cn/tos-cn-ve-15c001-alinc2/a4bd7e7c4d574b9ab5d212bba3a8d98d/?a=1128&ch=0&cr=0&dr=0&lr=xigua_aweme_play_suffix&cd=0%7C0%7C0%7C0&cv=1&br=1430&bt=1430&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGm4uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Njg2NWdlNGVlPDk3ODs1O0BpanRwZmU6ZjRuZTMzNGkzM0AtM2EuLmAtXzMxNC4tMC0zYSNjcS0vcjRnZDNgLS1kLTBzcw%3D%3D&l=2022072415532301020801622832BC4FA9"),
    ("铁板肥肠鱼头.mp4", "https://v11.douyinvod.com/d9b20a9a9c9983e9f4d92081edddab82/62dd0789/video/tos/cn/tos-cn-ve-15c001-alinc2/01ed7dea18654933820162f3bae4de5a/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2544&bt=2544&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGd~uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Z2czZWlmNWc1NTZnaTw2NkBpMzM1N2c6ZnVkZTMzNGkzM0AzYi5gXzItNl8xMTIuNl9fYSM1ZmQucjRnYWFgLS1kLS9zcw%3D%3D&l=2022072415462101020817416917B80729"),
    ("孝警阿特.mp4", "https://v95.douyinvod.com/5c79c95579fd0135a461fe138c3b2133/62dd0822/video/tos/cn/tos-cn-ve-15c001-alinc2/d182233e39c44ffcb5179076bbbc99ec/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2295&bt=2295&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGiBuZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=NGg1OWU1M2g7ZDQ1PDczNkBpamhwNDo6ZmY0ZTMzNGkzM0AxLmJfNTExNTMxM2FhMF42YSNrNWNfcjRvM2FgLS1kLS9zcw%3D%3D&l=2022072415475601020915816833A8E812")
]

print(time.time())

def task(file_name,url):
    # 将下载进行封装成函数
    res = requests.get(url)
    with open(file_name, mode='wb') as f:
        f.write(res.content)
    print(file_name, time.time())

for file_name, url in url_list:
    # 创建线程 让每个线程都去执行task函数,参数使用file_name 和 url
    t = threading.Thread(target=task,args=(file_name,url))
    t.start()
    
'''05-07 使用2秒,不过第一个和第二个视频是同一秒,第三个视频是因为太大了
1658651205.1446807
孝警阿特.mp4 1658651206.6937478
无名之辈.mp4 1658651206.7566335
铁板肥肠鱼头.mp4 1658651207.961741
'''

1.2 多进程

多进程的基础语法,语法结构来看,多线程和多进程类似

import multiprocessing
def func(a1,a2,a3):
    pass
if __name__ == '__main__':
    # target = 被执行的函数名 , args=(函数的参数)
    t = multiprocessing.Process(target=func,args=(11,22,33))
    # 在进程创建后,会自动在进程中创建一个线程
    t.start() # 启动多线程

注意:

对于内部在创建进程的时候,Linux的内部机制是基于fork,windows是基于spawn,mac系统是两个都支持,但python3.8默认设置spawn

# mac系统中设置机制为fork
multiprocessing.set_start_method('fork') # 放于文件头部

spawn模式创建进程要求代码放入if __name__ == '__main__'

多进程优化下载案例

import time
import requests
import multiprocessing

url_list = [
    ("无名之辈.mp4", "https://v95-hb.douyinvod.com/bd17d2f845b961e476e9807bf4fdc648/62dd091c/video/tos/cn/tos-cn-ve-15c001-alinc2/a4bd7e7c4d574b9ab5d212bba3a8d98d/?a=1128&ch=0&cr=0&dr=0&lr=xigua_aweme_play_suffix&cd=0%7C0%7C0%7C0&cv=1&br=1430&bt=1430&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGm4uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Njg2NWdlNGVlPDk3ODs1O0BpanRwZmU6ZjRuZTMzNGkzM0AtM2EuLmAtXzMxNC4tMC0zYSNjcS0vcjRnZDNgLS1kLTBzcw%3D%3D&l=2022072415532301020801622832BC4FA9"),
    ("铁板肥肠鱼头.mp4", "https://v11.douyinvod.com/d9b20a9a9c9983e9f4d92081edddab82/62dd0789/video/tos/cn/tos-cn-ve-15c001-alinc2/01ed7dea18654933820162f3bae4de5a/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2544&bt=2544&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGd~uZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=Z2czZWlmNWc1NTZnaTw2NkBpMzM1N2c6ZnVkZTMzNGkzM0AzYi5gXzItNl8xMTIuNl9fYSM1ZmQucjRnYWFgLS1kLS9zcw%3D%3D&l=2022072415462101020817416917B80729"),
    ("孝警阿特.mp4", "https://v95.douyinvod.com/5c79c95579fd0135a461fe138c3b2133/62dd0822/video/tos/cn/tos-cn-ve-15c001-alinc2/d182233e39c44ffcb5179076bbbc99ec/?a=1128&ch=0&cr=0&dr=0&cd=0%7C0%7C0%7C0&cv=1&br=2295&bt=2295&btag=80000&cs=0&ds=3&ft=blh3-IQQqUuIf_oZmo0OW_EklpPixBGiBuZ39eFZ0X3Xr12&mime_type=video_mp4&qs=0&rc=NGg1OWU1M2g7ZDQ1PDczNkBpamhwNDo6ZmY0ZTMzNGkzM0AxLmJfNTExNTMxM2FhMF42YSNrNWNfcjRvM2FgLS1kLS9zcw%3D%3D&l=2022072415475601020915816833A8E812")
]

def task(file_name, video_url):
    res = requests.get(video_url)
    with open(file_name, mode='wb') as f:
        f.write(res.content)
    print(time.time())

if __name__ == '__main__':
    print(time.time())
    for name, url in url_list:
        t = multiprocessing.Process(target=task, args=(name, url))
        t.start()

1.3 GIL锁

GIL, 全局解释器锁(Global Interpreter Lock),是CPython解释器特有一个玩意,让同一个时刻一个进程中只能有一个线程被CPU调用,其他CPU无法再调用该进程中的其他线程

image-20210218184651385

但是电脑有多核CPU的情况,想利用多核优势运行程序,就必须使用多进程开发,即使资源开销大

常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势,所以,就有这一句话:

  • 计算密集型,需要CPU进行计算数据,用多进程,例如:大量的数据计算【累加计算示例】
  • IO密集型,需要CPU等待外部交互数据,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】

当然,在程序开发中 多线程 和 多进程 是可以结合使用,例如:创建2个进程(建议与CPU个数相同),每个进程中创建3个线程

import multiprocessing
import threading

def thread_task():
    pass

def task(start,end):
    # 进程中创建多线程
    t1 = threading.Thread(target=thread_task) # 没有参数
    t1.start()
    t2 = threading.Thread(target=thread_task)
    t2.start()
    t3 = threading.Thread(target=thread_task)
    t3.start()

if __name__ == '__main__':
    # 创建多进程
    p1 = multiprocessing.Process(target=task,args=(0,500000))
    p1.start()
    p2 = multiprocessing.Process(target=task, args=(500000,1000000))
    p2.start()

2 多线程开发

import threading

def task(arg):
	pass

# 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
t = threading.Thread(target=task,args=('xxx',))
# 线程准备就绪(等待CPU调度),代码继续向下执行。
t.start()

print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)

程序执行的时候,会默认创建一个进程,进程中在创建一个线程来运行程序,那个线程被称为主线程,当主线程运行到 threading.Thread的时候,知道我们需要创建子线程,他创建完成后就会继续向下运行,运行完成之后,会等待子线程的运行,当所有子线程都运行结束,再由主线程关闭整个程序

2.1 线程对象查常见的方法

2.1.1 t.start()

当前线程准备就绪(等待CPU调度,具体运行时间由CPU决定)

import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number+=1
    print('子线程运行完成')

t = threading.Thread(target=_add,args=(loop,))
t.start() # 启动子线程 主线程继续向下执行
print('主线程运行完成,等待子线程完毕后关闭程序')

主线程优先运行到输出,子线程还在运行

主线程运行完成,等待子线程完毕后关闭程序
子线程运行完成

2.1.2 t.join()

等待子线程t的任务执行完毕后再继续向下执行主线程

import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number+=1
    print('子线程运行完成')

t = threading.Thread(target=_add,args=(loop,))
t.start() # 启动子线程 主线程继续向下执行
print('主进程继续运行')
t.join() # 等待子线程完成后再继续运行主线程
print('主线程运行完成,等待子线程完毕后关闭程序')

主线程遇到t.join(),则等待子线程t进程运行完成后,再继续运行主进程

主线程继续运行
子线程运行完成
主线程运行完成,等待子线程完毕后关闭程序

并且,t1.join()只会限制主线程,不会限制子线程,所以t1t2都会向下运行,但由于GIL锁的存在,t1t2不能同时运行,而是分片交替运行

import threading

'''
为了体现效果,需要将range数值设置到一定数值以上才能看到交替运行的情况发送
'''

number = 0
def _add():
    global number
    for i in range(50):
        number += 1
        print('add={}'.format(number))

def _sub():
    global number
    for i in range(50):
        number -= 1
        print('sub={}'.format(number))

t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t2.start()
# 虽然cpython解释器有GIL锁,但并不会运行完成t1再运行t2,而是分片交替运行
t1.join() # t1.join()暂停的是主进程,并不会暂停
t2.join()
print('主线程执行完成')

2.1.3 t.setDaemon(布尔值)

守护线程(必须放在start之前)

import threading

number = 0

def _add():
    global number
    for i in range(100000000000000000):
        number += 1

t = threading.Thread(target=_add)
t.setDaemon(True) 
'''
如果:
t.setDaemon(True) 则主线程结束,不等待t子线程完成,立即结束整个程序运行
t.setDaemon(False) 默认情况 主线程结束,等待t子线程完成,再结束整个程序运行
'''

2.1.4 线程名字的设置和获取

import threading

def task(arg):
    name = threading.current_thread().getName() # 获取当前线程名字
    print(name)

for i in range(10):
    t = threading.Thread(target=task,args=(11,))
    t.setName('线程序号-{}'.format(i)) # 给线程设置名字
    t.start() # 给线程设置名字一定要在start之前

2.1.5 自定义线程类

import threading

class MyThread(threading.Thread):
    def run(self):
        print('执行此线程',self._args)

t = MyThread(args=(100,))
t.start()

原本调用多进程采用t = threading.Thread(target=task,args=(100,))的方式进行,我们可以将其转化为类来简化操作,如上文所示,自定义一个类 MyThread,继承threading.Thread的方法,def run()就是就是执行t.start()后运行的代码,self._args存放的是args=(100,)

也就是说,实际上

t = MyThread(args=(100,))
t = threading.Thread(target=run,args=(100,))

3 线程安全

在演示案例中,出现了两个同级线程交替调用同一个全局变量的情况,如果恰好是同一时间申请资源,就可能存在多线程抢占资源导致结果混乱的问题

import threading

number = 0
def _add():
    global number
    for i in range(50):
        number += 1
        print('add={}'.format(number))

def _sub():
    global number
    for i in range(50):
        number -= 1
        print('sub={}'.format(number))

t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t2.start()
print('主线程执行完成')

3.1 线程锁基础

针对自变量可能出现的分片运行情况,python推出了线程锁的概念

lock_object = threading.RLook() # 定义锁
lock_object.acquire() # 申请锁
lock_object.release() # 释放锁 

在申请了同一个锁名的线程中,不会出现分片运行的情况,而是优先申请锁的优先运行,直到运行到释放锁的代码,才会运行另一个申请了该锁名的线程运行

import threading

lock_object = threading.RLock() # 申请锁
number = 0
def _add():
    lock_object.acquire()
    global number
    for i in range(50):
        number += 1
        print('add={}'.format(number))
    lock_object.release()
def _sub():
    lock_object.acquire()
    global number
    for i in range(50):
        number -= 1
        print('sub={}'.format(number))
    lock_object.release()

t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t2.start()
print('主线程执行完成')

在该案例中,_add函数和_sub函数中都申请了lock_object锁,所以在t1.start()启动时,会优先申请到lock_object锁,运行到t2.start()时,发现它也申请了lock_object锁,所以就会暂停该线程等待,t1运行到lock_object.release(),启动t2.lock_object.acquire()下方的代码线程运行

3.2 通过锁解决数据混乱的问题

import threading

num = 0

def _add():
    print('开始')
    global num
    for i in range(10000000):
        num += 1
    print(num)

for i in range(2):
    t = threading.Thread(target=_add)
    t.start()

当数字过大的时候,很可能存在多线程分片调用同一个变量导致数据混乱的问题,例如上述案例,实际输出为

开始
开始
12061314
12346801 # 最后的结果也混乱了

这时可以通过加锁解决数据混乱的问题

import threading

num = 0
lock_object = threading.RLock()

def _add():
    lock_object.acquire()
    print('开始')
    global num
    for i in range(10000000):
        num += 1
    print(num)
    lock_object.release()

for i in range(2):
    t = threading.Thread(target=_add)
    t.start()

加锁之后,数据的混乱就会因为同锁名依次执行的梳理,导致数据恢复正常

开始
10000000
开始
20000000

3.3 上下文管理线程锁

锁可以手动开关,也可以通过with 锁名:进行上下文管理

import threading

num = 0
lock_object = threading.RLock()

def task():
    print("开始")
    with lock_object: # 使用with对锁进行上下文管理,解决忘记解锁的问题
        global num
        for i in range(1000):
            num += 1
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()

3.4 线程安全

在开发的过程中要注意有些操作默认都是带有线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,例如:

import threading

num_lsit = []

def _add():
    for i in range(10000000):
        num_lsit.append(i) # .append操作就是线程安全的
    print(len(num_lsit))

for i in range(2):
    t = threading.Thread(target=_add)
    t.start()

由于_add函数中没有线程锁,所以导致运行也会交替,但由于.append有线程安全机制,所以导致最后的输出结果并不会混乱

19698209
20000000

至于哪些函数是线程安全的,python的开发文档中有详细的解释,不过平时自己写代码,加上锁也不会有错

image-20210225102151570

4 线程锁

在上文3.1,我们介绍了基本的线程锁,但线程锁一般有两种LockRLock,它们的区别在于能否进行嵌套

4.1 Lock 同步锁

同步锁是一种效率高,但是不支持嵌套的锁

import threading

num = 0
lock_object = threading.Lock()

def task():
    print('开始')
    lock_object.acquire() # 第一个抵达的线程加锁
    global num
    for i in range(100000):
        num += 1
    lock_object.release() # 线程解锁
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()

如果使用同步锁进行申请锁嵌套,就可能导致线程申请遇到了第一个申请了锁,遇到了第二个申请,不会得到执行,但是释放第一个锁资源又需要运行第二个申请后代码的情况,导致程序卡死在中无法运行的情况,被称为死锁

# 死锁演示
lock_object = threading.Lock() # 定义锁

def run():
    lock_object.acquire() # 第一个申请了锁资源
    # 程序卡死在了中间
    lock_object.acquire() # 遇到了第二个申请,不会得到执行
    lock_object.release() # 释放资源的代码行 不会得到执行
    lock_object.release()

4.2 RLock 递归锁

RLock内部维护者一个Lockcounter变量,counter变量记录着acquire的次数,从而使得资源可以被多次acquire,直到一个线程所有的acquire都被release,其他的线程才能获得资源

import threading

num = 0
lock_object = threading.RLock()

def task():
    print('开始')
    lock_object.acquire() # 第一个抵达的线程加锁
    global num
    for i in range(100000):
        num += 1
    lock_object.release() # 线程解锁
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()

如果递归锁进行申请所资源,第一个申请了锁资源(counter -> acquire=1),在遇到第二个的时候,发现与第一个属于同一个子线程,就会将(counter -> acquire=1+1),并且添加锁,然后继续向下运行,在遇到lock_object.release()时,发现 acquire=2,则关闭最近的一个锁资源,并且acquire=2-1直到一个线程所有的acquire都被release,其他的线程才能获得资源

# 递归锁演示
lock_object = threading.RLock() # 定义锁

def run():
    lock_object.acquire() # 第一个申请了锁资源 counter -> acquire=1
    lock_object.acquire() # 第二个申请锁资源,发现与第一个锁是同一子线程 counter -> acquire=2
    lock_object.release() # 释放第二个锁资源
    lock_object.release() # 释放第一个锁资源

同时也是因为counter变量的原因,导致RLock的执行效率并没有Lock

4.3 递归锁的实际环境演示

在企业开发环境中,通常一个程序由多个程序有进行分功能开发,每个程序员都希望自己的功能代码能够保证数据安全,所以会自己加锁,如果我们在调用他们的代码时,也想自己加锁保证数据安全,就有可能出现锁嵌套的情况

import threading
lock = threading.RLock()

# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
	with lock:
		pass
        
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
    print("其他功能")
    func() # 调用程序员A写的func函数,内部用到了锁。
    print("其他功能")
    
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
    with lock:
		print("其他功能")
        func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
		print("其他功能")

所以推荐使用RLock递归锁

4.4 死锁常出现的情况

死锁是因为重复申请锁导致的,除去案例中的同一线程重复申请同一个锁的情况

# 死锁演示
lock_object = threading.Lock() # 定义锁

def run():
    lock_object.acquire() # 第一个申请了锁资源
    # 程序卡死在了中间
    lock_object.acquire() # 遇到了第二个申请,不会得到执行
    lock_object.release() # 释放资源的代码行 不会得到执行
    lock_object.release()

还有就是多线程,交替申请多把锁导致的混乱,这种情况由于存在于两个不同的线程中,所以递归锁都不能解决,需要自行捋清楚代码逻辑

import threading

lock1_object = threading.Lock()
lock2_object = threading.Lock()

def _add():
    lock1_object.acquire() # 1.申请了第一把lock1_object锁
    print('add申请lock1锁') # 2.执行了代码
    lock2_object.acquire() # 5.申请第二把lock2_object锁 第一把锁在t2进程中未解锁 等待解锁后执行
    # 7.t1线程后续无法运行 导致死锁
    print('add申请lock2锁')
    lock1_object.release() # 无法被执行导致无法解锁
    lock2_object.release()
def _sub():
    lock2_object.acquire() # 3.申请了第一把lock2_object锁
    print('sub申请lock2锁') # 4.执行代码
    lock1_object.acquire() # 6.申请第二把lock1_object锁 第一把锁在t1进程中未解锁 等待解锁后执行
    # 8.t1线程后续无法运行 导致死锁
    print('sub申请lock1锁')
    lock1_object.release() # 无法被执行导致无法解锁
    lock2_object.release()
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)

5 线程池

在程序运行过程中,线程是由CPU进行调动,而我们知道由于GIL锁的存在,多个线程并不会同一时间运行,而是分片进行,在线程交替运行过程中会有一个上下文切换的时间,导致整个项目效率降低,所以线程不是开的越多越好,开多了反而可能导致系统性能降低

不建议:无限制的创建线程,这种每次都创建一个线程去操作,创建任务的太多,线程就会特别多,可能效率反倒降低了

import threading

def task():
    pass

url_list = ['www.xxx-{}.com'.format(i) for i in range(20000)] # 列表生成式会创建20000个元素

for url in url_list:
    t = threading.Thread(target=task)
    t.start()

建议:使用线程池

5.1 定义线程池并使用

from concurrent.futures import ThreadPoolExecutor # 1.导入线程池库方法

pool = ThreadPoolExecutor(10) # 2.线程池维护10个线程
pool.submit(函数名,参数1,参数2,参数3,....) # 3.在线程池中提交一个任务,线程池如果有空闲线程,则分配一个线程去执行,如果没有空闲线程,则等待

案例

import time
from concurrent.futures import ThreadPoolExecutor

def task(video_url,num):
    print("开始执行任务",video_url)
    time.sleep(1)

pool = ThreadPoolExecutor(10) # 线程池中创建10个进程

url_list = ['www.xxx-{}.com'.format(i) for i in range(300)]

for url in url_list:
    # 注意,这个循环并不会等待线程运行,而是一股脑的提交任务,提交后由线程池排队进行分配线程运行
    pool.submit(task,url,2) # 向线程池提交任务
print('end')

5.2 pool.shutdown()

主线程等待线程池pool的任务执行完毕后,再往下运行

import time
from concurrent.futures import ThreadPoolExecutor

def task(video_url,num):
    print("开始执行任务",video_url)
    time.sleep(1)

pool = ThreadPoolExecutor(10)

url_list = ['www.xxx-{}.com'.format(i) for i in range(300)]
for url in url_list:
    pool.submit(task,url,2)

print('执行中')
pool.shutdown(True) # 主线程运行到此行会等待线程池执行完毕后继续执行
print('执行完成')

5.3 .add_done_callback(done)

import time,random
from concurrent.futures import ThreadPoolExecutor,Future

def task(video_url):
    print('开始执行任务',video_url)
    time.sleep(2)
    return random.randint(0,10)

def done(response):
    print('任务执行后返回值',response.result()) # 3.线程对象.result() 可以获取 线程执行函数的返回值(这里就是random.randint(0,10)的结果)

# 创建线程池
pool = ThreadPoolExecutor(10)

url_list = ['www.kinght-{}.com'.format(i) for i in range(300)]

for url in url_list:
    future = pool.submit(task,url) # 1.将任务提交到线程池,线程执行的同时,线程池会将其封装成一个对象
    future.add_done_callback(done) # 2.使用对象名.add_done_callback可以继续执行done函数,并且线程对象将会作为函数的参数

此功能可以用于分工,例如:task专门下载,done专门将下载的数据写入本地文件

5.4 综合代码实例

5.4.1 统一处理数据

线程池处理列表数据后将结果存放于另一个列表

from concurrent.futures import ThreadPoolExecutor

def task(video_url):
    return video_url+'.com'

# 创建线程池
pool = ThreadPoolExecutor(10)
# 处理完成后结果存放列表
future_list = []
# 处理前列表生成
url_list = ['www.kinght-{}'.format(i) for i in range(300)]
# for循环提交线程池处理
for url in url_list:
    future = pool.submit(task,url)
    future_list.append(future.result()) # 将结果存储于列表

pool.shutdown() # 主线程等待线程池运行完成

for url in future_list:
    print(url) # 输出结果

5.4.2 基于线程池下载豆瓣图片

26044585,Hush,https://hbimg.huabanimg.com/51d46dc32abe7ac7f83b94c67bb88cacc46869954f478-aP4Q3V
19318369,柒十一,https://hbimg.huabanimg.com/703fdb063bdc37b11033ef794f9b3a7adfa01fd21a6d1-wTFbnO
15529690,Law344,https://hbimg.huabanimg.com/b438d8c61ed2abf50ca94e00f257ca7a223e3b364b471-xrzoQd
18311394,Jennah·,https://hbimg.huabanimg.com/4edba1ed6a71797f52355aa1de5af961b85bf824cb71-px1nZz
18009711,可洛爱画画,https://hbimg.huabanimg.com/03331ef39b5c7687f5cc47dbcbafd974403c962ae88ce-Co8AUI
30574436,花姑凉~,https://hbimg.huabanimg.com/2f5b657edb9497ff8c41132e18000edb082d158c2404-8rYHbw
17740339,小巫師,https://hbimg.huabanimg.com/dbc6fd49f1915545cc42c1a1492a418dbaebd2c21bb9-9aDqgl
18741964,桐末tonmo,https://hbimg.huabanimg.com/b60cee303f62aaa592292f45a1ed8d5be9873b2ed5c-gAJehO
30535005,TANGZHIQI,https://hbimg.huabanimg.com/bbd08ee168d54665bf9b07899a5c4a4d6bc1eb8af77a4-8Gz3K1
31078743,你的老杨,https://hbimg.huabanimg.com/c46fbc3c9a01db37b8e786cbd7174bbd475e4cda220f4-F1u7MX
25519376,尺尺寸,https://hbimg.huabanimg.com/ee29ee198efb98f970e3dc2b24c40d89bfb6f911126b6-KGvKes
21113978,C-CLong,https://hbimg.huabanimg.com/7fa6b2a0d570e67246b34840a87d57c16a875dba9100-SXsSeY
24674102,szaa,https://hbimg.huabanimg.com/0716687b0df93e8c3a8e0925b6d2e4135449cd27597c4-gWdv24
30508507,爱起床的小灰灰,https://hbimg.huabanimg.com/4eafdbfa21b2f300a7becd8863f948e5e92ef789b5a5-1ozTKq
12593664,yokozen,https://hbimg.huabanimg.com/cd07bbaf052b752ed5c287602404ea719d7dd8161321b-cJtHss
16899164,一阵疯,https://hbimg.huabanimg.com/0940b557b28892658c3bcaf52f5ba8dc8402100e130b2-G966Uz
847937,卩丬My月伴er彎,https://hbimg.huabanimg.com/e2d6bb5bc8498c6f607492a8f96164aa2366b104e7a-kWaH68
31010628,慢慢即漫漫,https://hbimg.huabanimg.com/c4fb6718907a22f202e8dd14d52f0c369685e59cfea7-82FdsK
13438168,海贼玩跑跑,https://hbimg.huabanimg.com/1edae3ce6fe0f6e95b67b4f8b57c4cebf19c501b397e-BXwiW6
28593155,源稚生,https://hbimg.huabanimg.com/626cfd89ca4c10e6f875f3dfe1005331e4c0fd7fd429-9SeJeQ
28201821,合伙哼哼,https://hbimg.huabanimg.com/f59d4780531aa1892b80e0ec94d4ec78dcba08ff18c416-769X6a
28255146,漫步AAA,https://hbimg.huabanimg.com/3c034c520594e38353a039d7e7a5fd5e74fb53eb1086-KnpLaL
30537613,配?,https://hbimg.huabanimg.com/efd81d22c1b1a2de77a0e0d8e853282b83b6bbc590fd-y3d4GJ
22665880,日后必火,https://hbimg.huabanimg.com/69f0f959979a4fada9e9e55f565989544be88164d2b-INWbaF
16748980,keer521521,https://hbimg.huabanimg.com/654953460733026a7ef6e101404055627ad51784a95c-B6OFs4
30536510,“西辞”,https://hbimg.huabanimg.com/61cfffca6b2507bf51a507e8319d68a8b8c3a96968f-6IvMSk
30986577,艺成背锅王,https://hbimg.huabanimg.com/c381ecc43d6c69758a86a30ebf72976906ae6c53291f9-9zroHF
26409800,CsysADk7,https://hbimg.huabanimg.com/bf1d22092c2070d68ade012c588f2e410caaab1f58051-ahlgLm
30469116,18啊全阿,https://hbimg.huabanimg.com/654953460733026a7ef6e101404055627ad51784a95c-B6OFs4
17473505,椿の花,https://hbimg.huabanimg.com/0e38d810e5a24f91ebb251fd3aaaed8bb37655b14844c-pgNJBP
19165177,っ思忆゜?,https://hbimg.huabanimg.com/4815ea0e4905d0f3bb82a654b481811dadbfe5ce2673-vMVr0B
16059616,格林熊丶,https://hbimg.huabanimg.com/8760a2b08d87e6ed4b7a9715b1a668176dbf84fec5b-jx14tZ
30734152,sCWVkJDG,https://hbimg.huabanimg.com/f31a5305d1b8717bbfb897723f267d316e58e7b7dc40-GD3e22
24019677,虚无本心,https://hbimg.huabanimg.com/6fdfa9834abe362e978b517275b06e7f0d5926aa650-N1xCXE
16670283,Y-雨后天空,https://hbimg.huabanimg.com/a3bbb0045b536fc27a6d2effa64a0d43f9f5193c177f-I2vHaI
21512483,汤姆2,https://hbimg.huabanimg.com/98cc50a61a7cc9b49a8af754ffb26bd15764a82f1133-AkiU7D
16441049,笑潇啸逍小鱼,https://hbimg.huabanimg.com/ae8a70cd85aff3a8587ff6578d5cf7620f3691df13e46-lmrIi9
24795603,?????v,https://hbimg.huabanimg.com/a7183cc3a933aa129d7b3230bf1378fd8f5857846cc5-3tDtx3
29819152,妮玛士珍多,https://hbimg.huabanimg.com/ca4ecb573bf1ff0415c7a873d64470dedc465ea1213c6-RAkArS
19101282,陈勇敢?,https://hbimg.huabanimg.com/ab6d04ebaff3176e3570139a65155856871241b58bc6-Qklj2E
28337572,爱意随风散,https://hbimg.huabanimg.com/117ad8b6eeda57a562ac6ab2861111a793ca3d1d5543-SjWlk2
17342758,幸运instant,https://hbimg.huabanimg.com/72b5f9042ec297ae57b83431123bc1c066cca90fa23-3MoJNj
18483372,Beau染,https://hbimg.huabanimg.com/077115cb622b1ff3907ec6932e1b575393d5aae720487-d1cdT9
22127102,栽花的小蜻蜓,https://hbimg.huabanimg.com/6c3cbf9f27e17898083186fc51985e43269018cc1e1df-QfOIBG
13802024,LoveHsu,https://hbimg.huabanimg.com/f720a15f8b49b86a7c1ee4951263a8dbecfe3e43d2d-GPEauV
22558931,白驹过隙丶梨花泪う,https://hbimg.huabanimg.com/e49e1341dfe5144da5c71bd15f1052ef07ba7a0e1296b-jfyfDJ
11762339,cojoy,https://hbimg.huabanimg.com/5b27f876d5d391e7c4889bc5e8ba214419eb72b56822-83gYmB
30711623,雪碧学长呀,https://hbimg.huabanimg.com/2c288a1535048b05537ba523b3fc9eacc1e81273212d1-nr8M4t
18906718,西霸王,https://hbimg.huabanimg.com/7b02ad5e01bd8c0a29817e362814666a7800831c154a6-AvBDaG
31037856,邵阳的小哥哥,https://hbimg.huabanimg.com/654953460733026a7ef6e101404055627ad51784a95c-B6OFs4
26830711,稳健谭,https://hbimg.huabanimg.com/51547ade3f0aef134e8d268cfd4ad61110925aefec8a-NKPEYX

代码:

import requests,os
from concurrent.futures import ThreadPoolExecutor

def download(url):
    res = requests.get(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36"
        }
    )
    return res

def outer(filename):
    def save(response):
        res = response.result()
        file_path = os.path.join('images',"{}.png".format(filename))
        with open(file_path,mode='wb') as file_object:
            file_object.write(res.content)
    return save

pool = ThreadPoolExecutor(10)

# 经过多次调试,发现__file__和requests函数库存在冲突,故不建议同时使用,所以将其放于线程池之外
abs_path = os.path.abspath(__file__)
dir_path = os.path.dirname(abs_path)
down_path = os.path.join(dir_path, 'images')
if not os.path.exists(down_path):
    # 创建images目录
    os.makedirs(down_path)

with open('url.txt',mode='rt',encoding='utf-8') as file_object:
    for line in file_object:
        file_id,file_name,file_url = line.strip().split(",")
        future = pool.submit(download,file_url)
        future.add_done_callback(outer(file_name))
        # future.add_done_callback(outer(file_name)) 会首先运行依次outer(file_name)
        # 内存空间中创建了 filename 和 save 的空间,然后save被返回了出来等待运行
        # 所以这里实际运行是future.add_done_callback(save) 将 download返回的res作为save的参数

6 单例模式

在程序开发过程中,尤其是多线程开发过程中,可能程序会创建大量的实例,如果继续使用传统实例化的方式进行,每个实例都会占用内存地址,这样会导致内存开销非常大

class Foo:
    pass
obj1 = Foo()
obj2 = Foo()
print(id(obj1)) # 1919265430256
print(id(obj2)) # 1919265430208

如果我们将实例只用创建一次,而后续创建的实例则是进行原实例的修改,就可以解决该问题

class Foo:
    instance = None # 1.创建一个变量instance赋值None
    def __init__(self,name,age):
        self.name = name # 3.无论是否是空实例,这里都可以重新赋值
        self.age = age
    def __new__(cls, *args, **kwargs):
        if cls.instance: # 2.判断该变量是否为空,
            return cls.instance # 不为空说明曾被创建过直接返回实例空间
        cls.instance = object.__new__(cls) # 为空则创建空实例,然后返回
        return cls.instance # 返回

obj1 = Foo('kinght',24)
obj2 = Foo('Aym',24)
print(id(obj1)) # 2293565161376
print(id(obj2)) # 2293565161376

其实单例模式是__new__创建空间,__init__初始化空间的妙用,instance是类变量,如果他被修改,在程序运行过程中会一直改变,后续只需要在__new__加判断instance是否为None,为None则创建,然后将对象存放于instance中,如果不为None,则将instance中存放的对象直接返回给__init__,进行初始化

如此,就达到了一个实例,重复使用的目的

6.1 单例模式可能申请锁冲突

import time,threading

class Foo:
    instance = None 
    def __init__(self,name,age):
        self.name = name
        self.age = age
    def __new__(cls, *args, **kwargs):
        if cls.instance: 
            return cls.instance 
        time.sleep(0.1) # 故障测试添加
        cls.instance = object.__new__(cls)
        return cls.instance

def task():
    obj = Foo('kinght',22)
    print(id(obj))

for i in range(10):
    t = threading.Thread(target=task)
    t.start()
'''输出结果
278707349051227870732582562787074727216
27870749404482787074939872
2787073489408
27870734905122787073304656
2787074940256
'''

由于多线程的关系,短时间内会发起多个线程,假如有一个线程创建了实例空间,却因为某种原因卡住,导致cls.instance无法第一时间被修改值,则会导致后面的线程获取到instance=None而创建多个实例

该情况的解决方式就是加锁

import time,threading

class Foo:
    instance = None
    # 因为锁是为类服务,所以推荐放在类变量中
    lock = threading.Lock()
    def __init__(self,name,age):
        self.name = name
        self.age = age
    def __new__(cls, *args, **kwargs):
        with cls.lock: # 加锁解决问题
            if cls.instance:
                return cls.instance
            time.sleep(0.1)
            cls.instance = object.__new__(cls)
            return cls.instance # 返回

def task():
    obj = Foo('kinght',22)
    print(id(obj))

for i in range(10):
    t = threading.Thread(target=task)
    t.start()

优化代码:由于锁是非常消耗时间的,所以在锁的上方优先判断一次cls.instance是否为None,如果直接为否,则可以避免加锁重复


Python开发-035_多线程开发
http://localhost:8080/archives/3AWqg6hR
作者
kinght
发布于
2024年11月11日
更新于
2024年11月11日
许可协议