Python开发-036_多进程与协程开发
本章节主要讲诉线程开发,协程将会在后续WEB框架章节进行详细介绍
1 多进程开发
进程是计算机中分配资源的最小单位,一个进程中可以有多个线程,并且,同一个进程中的线程共享资源,而进程与进程之间相互隔离,Python中可以通过多进程利用CPU的多核优势,进行计算密集型操作
注意:由于python创建进程需要基于fork、spawn、forkserver等模式,所以将其放于if __name__ == '__main__'
中,才能让代码在跨平台的时候不出问题
import multiprocessing
def task():
pass
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task)
p1.start()
只要多进程的启动代码是从__main__
启动就可以
from multiprocessing import Process
# 这里选择导入multiprocessing模块的process类
def task(arg):
pass
def run():
p = Process(target=task)
p.start()
if __name__ == '__main__':
run()
1.1 进程模式简介
在python的官方文档提到,根据平台不同,想要通过multiprocessing
模块操作进程,可使用三种模式
-
fork,【“拷贝”几乎所有资源】【unix】【任意位置开始】【快】
-
在创建子进程的时候,会将父进程的所有资源全部拷贝一份,子进程在开始的时候于父进程相同,但开始运行后,资源相互隔离
import multiprocessing,time def task(): print(name) # 2.子进程能获得主进程定义的name列表并输出 name.append(123) # 3.子进程对列表添加123 print(name) # 4.子进程列表输出[123] if __name__ == '__main__': multiprocessing.set_start_method("fork") name = [] # 1.主进程定义了name列表 # p1定义进程,拷贝主进程上方所有的资源 p1 = multiprocessing.Process(target=task) p1.start() time.sleep(2) # 5.避免影响实验结果 print(name) # [] 6.但是子进程对name列表的操作,无法影响主进程
-
支持文件对象/线程锁等传参
import multiprocessing,threading def task(lk): print(file_object) print(lk) if __name__ == '__main__': multiprocessing.set_start_method("fork") file_object = open("demo.txt",mode='rt',encoding='utf-8') # 支持自动拷贝 lock = threading.RLock() p1 = multiprocessing.Process(target=task,args=lock) # 也支持手动传递 p1.start()
-
支持linux和mac系统,不支持windows系统
-
-
spawn,【run参数传必备资源】【unix、win】【main代码块开始】【慢】
-
在创建子进程的时候,会创建一个新的python解释器去运行子进程的代码
-
不支持文件对象/线程锁等传参
-
支持Linux、Mac系统和windows系统
import multiprocessing,time def task(): print(name) # 报错:找不到name name.append(123) print(name) if __name__ == '__main__': multiprocessing.set_start_method("spawn") name = [] p1 = multiprocessing.Process(target=task) p1.start() time.sleep(2) print(name)
输出结果直接报错:NameError: name 'name' is not defined
原因:拷贝的时候拷贝了python解释器,并没有拷贝程序定义的name列表,可以通过参数的形式进行传递name值,但是传递完成之后依然时分别独立的两份数据
import multiprocessing,time def task(name): print(name) name.append(123) print(name) # name = [123] if __name__ == '__main__': multiprocessing.set_start_method("spawn") name = [] p1 = multiprocessing.Process(target=task,args=name) # 使用args将列表传入task p1.start() time.sleep(2) print(name) # name = [] 传递完成之后依然时分别独立的两份数据
-
-
forkserver,【run参数传必备资源】【不支持文件对象/线程锁等传参】【部分unix】【main代码块开始】
-
在开始运行主进程的主线程时,会创建一个模板(类似虚拟机的快照),需要创建子进程时,使用模板进行创建
-
不支持文件对象/线程锁等传参
-
由于模板是在程序运行之初完成的创建,其本质也就是一个python解释器,所以和spawn相同的处理机制
import multiprocessing,time def task(name): print(name) name.append(123) print(name) # name = [123] if __name__ == '__main__': multiprocessing.set_start_method("forkserver") name = [] p1 = multiprocessing.Process(target=task,args=name) # 使用args将列表传入task p1.start() time.sleep(2) print(name) # name = [] 传递完成之后依然时分别独立的两份数据
-
1.2 进程模式易出问题
1.2.1 子进程和主进程的数据不互通
import multiprocessing
def task():
file_object.write('kinght\n') # 6.在内存内部写入kinght\n
file_object.flush() # 7.将内存中的数据 aym\n kinght\n 写入硬盘
if __name__=="__main__":
multiprocessing.set_start_method("fork") # 1.多线程定义为fork模式
file_object = open("demo.txt",mode='a+',encoding='utf-8') # 2.获取到文件句柄
file_object.write("aym\n") # 3.文件句柄在内存中写入aym
p1 = multiprocessing.Process(target=task) # 4.创建多进程p1,使用函数task
p1.start() # 5.启动p1 将file_object以及其内部的aym传递给task
# 8.子进程完成,主进程结束时,发现主进程中还有一个aym\n
'''最后输出结果
aym
kinght
aym
'''
由于主进程和子进程内存空间相互独立,而子进程又拷贝了内存空间中的aym\n
,所以子进程写入硬盘的时候,写入的是aym\n kinght\n
,但是子进程完成后,主进程中还有一个aym\n
没有写入硬盘,系统会再次将主进程的数据写入硬盘,达到现在的结果
修改代码,提前刷入硬盘
import multiprocessing
def task():
file_object.write('kinght\n') # 2.这里不会再继承主线程的aym
file_object.flush() # 3.只刷入子进程自己的kinght
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
file_object = open('demo.txt',mode='wt',encoding='utf-8')
file_object.write("aym\n")
file_object.flush() # 1.由于已经提前刷入硬盘,故内存中没有了aym
p1 = multiprocessing.Process(target=task)
p1.start()
'''最后输出结果
aym
kinght
'''
1.2.2 主进程锁会被子进程继承
import threading,multiprocessing
def func():
lock.acquire() # 4.申请lock锁,但是锁被p1子进程的主线程申请走了,此线程暂停
print('func')
lock.release()
def task():
print(lock)
# <locked _thread.RLock object owner=8663369216 count=1 at 0x7fc830183fc0> 锁已被申请
# 2.子进程中,锁同样存在,这里是被转化为了子进程的主线程申请
lock.acquire() # 3.这里锁是被子进程的主进程再次申请,由于是Rlock,所以还能够申请第二次
print(666) # 666能够被输出
for i in range(10):
t = threading.Thread(target=func)
t.start()
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
name = []
lock = threading.RLock()
lock.acquire() # 1.锁被主进程的主线程申请走了
p1 = multiprocessing.Process(target=task) # fork创建子线程时,会将锁同样拷贝一份给子进程
p1.start()
子进程继承锁之后,锁的申请会自动转换到,子进程的主线程,不影响主进程的主线程
1.3 进程常见方法
进程的常见方法:
1.3.1 p.start()
当前进程准备就绪,等待被CPU调度(工作单元为进程中的线程)
import multiprocessing
def task():
pass
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task)
p1.start()
1.3.2 p.join()
等待当前进程的任务执行完成后在往下执行
import multiprocessing
import time
from multiprocessing import Process
def task(arg):
print('执行中')
time.sleep(2)
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
p = Process(target=task,args=('demo',))
p.start()
p.join() # 等待p子进程所有代码执行完成后继续执行主进程
print('主进程继续执行')
1.3.3 p.daemon=布尔值
守护进程,必须放在start之前
p.daemon=True
,设置为守护进程,主进程执行完毕,不等待子进程,直接结束p.daemon=False
,默认状态,设置为非守护进程,主进程执行完成后,等待子进程执行完成再结束
import multiprocessing
import time
def task(arg):
print('子进程开始执行...')
time.sleep(2)
print('子进程执行完毕')
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
p = multiprocessing.Process(target=task,args=(112,))
p.daemon = True # 子进程的第二条输出语句由于主进程关闭而无法执行
# p.daemon = False # 子进程的第二条输出语句由于主进程等待而获得执行
p.start()
time.sleep(1)
print('主进程即将结束')
1.3.4 进程名称的设置和获取
import multiprocessing
def task(arg):
print('当前进程名称为',multiprocessing.current_process().name) # 获取当前进程名称
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
p = multiprocessing.Process(target=task,args=(11,))
p.name = '哈哈哈哈哈' # 设置进程名称
p.start()
1.3.5 进程的PID获取
相信熟悉Linux的大家对于PID这个名称都不陌生,每个在系统中运行的进程都有一个PID号
import os
os.getpid() # 获取当前进程的PID
os.getppid() # 获取当前进程的父进程PID
代码中演示:
import multiprocessing,os
def task(arg):
print('子进程PID为',os.getpid()) # 获取子进程pid
print('父进程PID为',os.getppid()) # 获取父进程pid
if __name__ == '__main__':
print('主进程PID为', os.getpid()) # 获取主进程pid
multiprocessing.set_start_method('spawn')
p = multiprocessing.Process(target=task,args=(11,))
p.start()
'''输出结果
主进程PID为 16280
子进程PID为 2088
父进程PID为 16280
'''
1.3.6 获取进程中的线程
import threading
import multiprocessing
import time
def func():
time.sleep(2)
def task(args):
for i in range(3):
t = threading.Thread(target=func)
t.start()
thread_list = threading.enumerate() # 获取线程列表
thread_len = len(thread_list) # 获取有多少个线程
print('线程:',thread_list)
print('数量:',thread_len)
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
p = multiprocessing.Process(target=task,args=('xxx',))
p.start()
'''输出结果 主线程+3个子线程
线程: [<_MainThread(MainThread, started 5332)>, <Thread(Thread-1, started 2692)>, <Thread(Thread-2, started 8072)>, <Thread(Thread-3, started 24736)>]
数量: 4
'''
1.3.7 自定义进程类
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print('执行此进程',self._args)
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
p = MyProcess(args=('xxx',))
p.start()
1.3.8 CPU核数
进程不是越多越好,并不推荐进程数大于CPU的逻辑处理器数量
import multiprocessing
count = multiprocessing.cpu_count() # cpu逻辑处理器
print(count) # 12核 实际电脑是 6核CPU虚拟出12个逻辑处理器
通过CPU逻辑处理器数来创建多进程
import multiprocessing
def func():
pass
if __name__ == '__main__':
count = multiprocessing.cpu_count()
for i in range(count-1): # 还有主进程需要占用1个处理器
p = multiprocessing.Process(target=func)
p.start()
2 进程之间数据共享
在前文,我们通过大量篇幅证实了进程和进程之间互不通信这件事情,无数次的声明,进程是资源分配的最小单位,每个进程中都维护自己独立的数据
import multiprocessing
def task(data):
data.append(666)
print('task_data = ',data)
if __name__ == '__main__':
data = []
p = multiprocessing.Process(target=task,args=(data,))
p.start()
print('主进程data = ',data)
'''运行结果
主进程data = []
task_data = [666]
'''
2.1 共享
2.1.1 Shared memory
关于进程间的数据通信,python基于C语言的底层逻辑,实现了一个value的方法
from multiprocessing import Process,Value
def func(n,m1,m2):
n.value = 888
m1.value = 'a'.encode('utf-8') # char必须encode
m2.value = '超'
if __name__ == '__main__':
num = Value('i',666) # 类似C语言定义 int num = 666
v1 = Value('c') # 类似C语言定义 char v1 = None
v2 = Value('u') # 类似C语言定义 char v2 = None # 只不过可以是中文字符
p = Process(target=func,args=(num,v1,v2))
p.start()
p.join() # 一定要等待子进程运行完成,再进行接下来的调用
print(num.value) # 888
print(v1.value) # b'a'
print(v2.value) # 超
类似于强类型语言的做法,通过value创建一个固有大小的内存空间,将值存放于空间中,然后将指向空间的变量名,通过参数的形式传递给子进程,子进程修改也是直接修改对应空间内存放的值,其他进程读取也是读取对应空间的值,这样一个进程就可以影响多个进程
# 类似于强类型语言,此方法也会开辟固定空间大小
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint, (其u表示无符号)
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
类似的方式,同样可以通过Array
使用数组来创建空间,并且由于需要固定内存空间大小,所以数组内存放类型和数组长度,都是被固定死了的
from multiprocessing import Process,Array
def func(arr):
arr[0] = 66
arr[1] = 77
arr[2] = 88
if __name__ == '__main__':
arr = Array('i',[11,22,33]) # i int 这个数组只能放整形,并且数组长度只能是3
p = Process(target=func,args=(arr,))
p.start()
p.join()
print(arr[:]) # [66, 77, 88]
2.1.2 Server Process
基于manager()
来创建空间,再其范围内,属于同一个大的内存空间,在运行子进程是将内存空间变量名作为参数传入,让子进程可修改主进程的值
from multiprocessing import Process,Manager
def func(demo1,demo2):
# 子空间获得了Manager中的内存地址
# 可以修改主进程中的demo1\demo2
demo1['name'] = 'kinght'
demo1['age'] = 24
demo2.append(666)
if __name__ == '__main__':
# 在这个上下文管理器之外的变量无法获得传递
with Manager() as manager:
# 使用Manager()创建一个上下文管理,再此管理器范围内demo1,demo2内存空间可传递到子进程
demo1 = manager.dict() # 创建一个字典空间
demo2 = manager.list() # 创建一个列表空间
p = Process(target=func,args=(demo1,demo2))
p.start()
p.join()
print(demo1)
print(demo2)
'''输出
{'name': 'kinght', 'age': 24}
[666]
'''
2.2 交换
2.2.1 Queue
使用multiprocessing.Queue
创建一个队列,使用put
将数据放入队列,使用get
将数据取出
队列:先进先出
import multiprocessing
def task(queue):
for i in range(10):
queue.put(i) # 3.put将数据放入队列
if __name__ == '__main__':
queue = multiprocessing.Queue() # 1.定义队列
p = multiprocessing.Process(target=task,args=(queue,)) # 2.将队列作为参数传递给子进程
p.start()
p.join()
print('主进程') # 主进程
print(queue.get()) # 0 # 4.使用get依次取出数据
print(queue.get()) # 1
print(queue.get()) # 2
print(queue.get()) # 3
print(queue.get()) # 4
print(queue.get()) # 5
print(queue.get()) # 6
同样,主进程的操作会影响到子进程
import multiprocessing
def task(queue):
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=task,args=(queue,))
for i in range(10):
queue.put(i)
p.start()
p.join()
2.2.2 Pipes
Pipes可以被认为是种管道,它是一种双向的队列
A -> B
A <- B
它的数据交换方式和网络编程有相似的地方
import time
import multiprocessing
def task(conn):
time.sleep(1)
conn.send([111,222,333]) # 3.子进程通过管道向主进程发送数据
data = conn.recv() # 6.阻塞,等待管道主进程的数据
print(data)
if __name__ == '__main__':
parent_conn,child_conn = multiprocessing.Pipe() # 1.定义管道 parent_conn,child_conn为管道的两头
p = multiprocessing.Process(target=task,args=(child_conn,)) # 2.将管道的一头发送给子进程
p.start()
info = parent_conn.recv() # 4.阻塞,等待管道子进程的数据
print("主进程接受:",info)
time.sleep(1)
parent_conn.send('主进程已接受') # 5.主进程向子进程发送数据
上诉都是Python内部提供的进程之间数据共享和交换的机制,在项目开发中很少使用,后期项目中一般会借助第三方来做资源共享,例如Mysql、redis等
3 进程锁
如果多个进程抢占式的去做某些操作/共享同一个资源的时候,为了防止操作出问题,可以通过进程锁来避免
数据资源冲突
import time
from multiprocessing import Process,Value
def func(n):
n.value = n.value + 1
if __name__ == '__main__':
num = Value('i',0)
for i in range(100):
p = Process(target=func,args=(num,))
p.start()
time.sleep(3)
print(num.value)
执行此案例,可能出现99或者100等多个结果
文件资源冲突
import time
import multiprocessing
def task():
with open('demo.txt',mode='rt',encoding='utf-8') as read_file:
current_num = int(read_file.read())
print('余票剩下:',current_num,'抢购中...')
time.sleep(2)
current_num -= 1
with open('demo.txt',mode='wt',encoding='utf-8') as write_file:
write_file.write(str(current_num))
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=task)
p.start()
time.sleep(1)
'''运行结果
余票剩下: 9 抢购中...
余票剩下: 9 抢购中...
余票剩下: 8 抢购中...
余票剩下: 8 抢购中...
余票剩下: 7 抢购中...
余票剩下: 7 抢购中...
余票剩下: 6 抢购中...
余票剩下: 6 抢购中...
余票剩下: 5 抢购中...
余票剩下: 5 抢购中...
'''
额外提示:如果使用数据交换的模式(Queue/Pipes)进行数据共享,由于先入先出的机制,不会导致数据混乱
3.1 进程锁使用
进程锁和线程锁使用方式几乎一样,不过由于进程间数据不互通的情况,需要使用传参的方式进行传递到各个进程中间
import time
import multiprocessing
def task(lock): # 3.通过传参传进程锁
lock.acquire() # 4.申请锁
# 以下代码不会再冲突
with open('demo.txt',mode='rt',encoding='utf-8') as read_file:
current_num = int(read_file.read())
print('余票剩下:',current_num,'抢购中...')
time.sleep(2)
current_num -= 1
with open('demo.txt',mode='wt',encoding='utf-8') as write_file:
write_file.write(str(current_num))
lock.release() # 5.运行完成解除锁
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
lock = multiprocessing.RLock() # 1.定义进程锁
for i in range(10):
p = multiprocessing.Process(target=task,args=(lock,)) # 2.创建进程时通过传参传递锁
p.start()
TIPS:前文提到过,进程不能传递锁,那里特指线程锁
线程锁不能通过子进程参数传递,进程锁可以通过子进程参数传递
在部分系统执行spawn
模式的时候,需要在主线程添加等待代码,等待子进程全部运行完成才不会报错
process_list = [] # 创建一个空列表
for i in range(10):
p = multiprocessing.Process(target=task,args=(lock,))
p.start()
process_list.append() # 将每个进程添加到列表中
# 在所有进程开启完成后,监听保证每个进程都运行完成再结束主进程
for item in process_list:
item.join() # 保证每个进程都运行完成
4 进程池
在开发过程中,如果无限制创建进程、线程,都会导致程序的效率降低,尤其是进程
我们之所以创建进程是为了CPU的多核优势,让他帮我们做并行计算,比如来了200个任务,但CPU只有4个逻辑处理器,我们需要创建的其实是4个进程,让计算机四个四个的处理,所以python为我们提供了进程池
TIPS:在python2中,没有线程池只有进程池、
python3有了线程池和进程池概念,并且将其放于concurrent.futures模块中
import time
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def task(num):
print('执行',num)
time.sleep(1)
if __name__ == '__main__':
# 修改进程模式
multiprocessing.set_start_method('spawn')
# 创建进程池,规定4个进程
pool = ProcessPoolExecutor(4)
# for循环将任务提交给进程池
for i in range(10):
pool.submit(task,i) # task为方法 i为参数
# 主进程等待进程池中所有任务执行完成后,再继续往下执行
pool.shutdown(True)
与线程池相同,for循环会一次性提交10个进程给进程池,有进程池自行等待排序运行
4.1 add_done_callback()
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def task(num):
print('执行',num) # 2.执行task代码返回num
time.sleep(2)
return num
def done(res):
print('done函数',multiprocessing.current_process().pid) # 查看当前运行进程的pid
time.sleep(1)
print(res.result()) # 查看res对象中的值 即 task返回值num
time.sleep(1)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(50):
fur = pool.submit(task,i) # 1.申请线程运行,获取对象
fur.add_done_callback(done) # 3.线程对象作为参数,运行done方法
print('主进程',multiprocessing.current_process().pid) # 查看当前运行进程的pid
pool.shutdown() # 等待线程池所有子进程运行完成后再往下执行主进程
进程池 add_done_callback(done)
和线程池.add_done_callback(done)
几乎相同,进程池也是将子进程的对象作为参数交给done
方法运行
唯一的区别是,线程池的done方法是子线程运行,而进程池是主进程运行,从运行结果可以看出
主进程 27620
执行 0
执行 1
执行 2
执行 3
执行 4
done函数 27620
....省略后续
4.2 进程池加锁
进程池加锁需要基于Manager中的Lock和RLock实现
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
def task(lock):
print('开始')
with lock:
with open('demo.txt', mode='rt', encoding='utf-8') as read_file:
current_num = int(read_file.read())
print('余票剩下:', current_num, '抢购中...')
time.sleep(2)
current_num -= 1
with open('demo.txt', mode='wt', encoding='utf-8') as write_file:
write_file.write(str(current_num))
if __name__ == '__main__':
pool = ProcessPoolExecutor(5)
# lock = multiprocessing.RLock() # 不能使用
manager = multiprocessing.Manager() # 只能使用manager
lock = manager.RLock()
for i in range(10):
pool.submit(task,lock)
5 进程综合案例
需求:现有多个日志文件,需计算每日访问总量以及每日访问IP数量
文件名:
20210322.log
20210323.log
20210324.log
20210325.log
20210326.log
20210327.log
20210328.log
20210329.log
代码处理:
import os,time
from concurrent.futures import ProcessPoolExecutor
def task(file_name):
# 5.定义一个空元组存放IP数量 定义一个total计算总数
ip_set = set()
total = 0
# 6.打开文件进行处理
file_path = os.path.join('files',file_name)
with open(file_path,mode='rt',encoding='utf-8') as file_object:
for rank in file_object:
total += 1 # 计算有多少行
ip = rank.strip().split(" - - ")[0] # 注意:由于计算量太大,不能使用解包
ip = ip.split(',')[0]
ip_set.add(ip) # 将提取出来的IP放入元组,元组会自动过滤重复的
ip_len = len(ip_set) # 计算IP元组的长度得到IP数
time.sleep(1)
return {'total':total,'ip':ip_len} # 7.将结果组合返回
def outer(info,filename): # 9.outer同时接受空字典、文件名
def done(res,*args,**kwargs):
info[filename] = res.result() # 10.组合字典,res.result()得到返回值{'total':total,'ip':ip_len}
return done # 10.done返回给步骤8执行
if __name__ == '__main__':
info = {} # 1.定义一个空字典 存放结果
pool = ProcessPoolExecutor(4) # 2.定义进程池
file_list = os.listdir('files') # 3.获取files下所有的文件名
# 让每个进程独立处理一个文件
for filename in file_list:
fur = pool.submit(task,filename) # 4.发送到进程池
fur.add_done_callback(outer(info,filename)) # 8.将结果交给outer处理
print(filename)
pool.shutdown(True) # 11.等待所有子进程完成
for k, v in info.items():
print(k, v) # 12.字典解包输出
6 协程
暂时以理解为主,将会在后续Web框架课程中详细介绍
在计算机中,其实只提供了线程、进程用于实现并发编程,而协程是程序员通过代码搞出来的一个东西
官方定义为:协程也可以被称为微线程,是一种用户态内的上下文切换技术
简而言之,就是一个线程实现代码块的相互切换执行
案例
def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()
上述代码是普通的函数定义和执行,按流程分别执行两个函数中的代码,并先后会输出:1、2、3、4
但如果介入协程技术那么就可以实现函数见代码切换执行,最终输入:1、3、2、4
6.1 实现函数运行跳转的办法
6.1.1 greenlet
注意:未知错误,python解释器无法连接到已通过pip install greenlet
下载的 from greenlet import greenlet
中,所以并未能浮现该代码
from greenlet import greenlet
def func1():
print(1) # 第1步:输出 1
gr2.switch() # 第3步:切换到 func2 函数
print(2) # 第6步:输出 2
gr2.switch() # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
def func2():
print(3) # 第4步:输出 3
gr1.switch() # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
print(4) # 第8步:输出 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去执行 func1 函数
6.1.2 yield
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
虽然上述两种都实现了协程,但这种编写代码的方式没啥意义
这种来回切换执行,可能反倒让程序的执行速度更慢了(相比较于串行)
6.2 协程如何才能更有意义呢?
不需要用户手动切换,而是程序到达IO操作的等待时间,能够自动切换就有意义了
Python在3.4之后推出了asyncio模块 + Python3.5推出async、async语法 ,内部基于协程并且遇到IO请求自动化切换
import asyncio
async def func1():
print(1) # 4.首先运行func1
await asyncio.sleep(1) # 5.遇到IO阻塞或者等待的时候
print(2) # 7.在IO阻塞或者等待结束后会自动切换回func1
async def func2():
print(3) # 6.会自动跳转到列表中的第二个func2运行
await asyncio.sleep(2) # 8.由于func1运行完成,所以不会再跳转
print(4) # 9.运行结束
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
] # 1.创建任务列表
loop = asyncio.get_event_loop() # 2.创建协程对象
loop.run_until_complete(asyncio.wait(tasks)) # 3.开始运行列表中的方法
使用爬虫案例来详细了解
"""
需要先安装:pip3 install aiohttp
"""
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response: # 3.发生阻塞,自动跳转到列表下一个元素进行运行
content = await response.content.read() # 4.等待接受到数据,再转回进行数据处理
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] # 1.列表生成式创建协程任务列表
await asyncio.wait(tasks) # 2.开始运行tasks任务列表
if __name__ == '__main__':
asyncio.run(main())
通过上述内容发现,在处理IO请求时,协程通过一个线程就可以实现并发的操作
7 协程、线程、进程的区别?
线程,是计算机中可以被cpu调度的最小单元。
进程,是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。
由于CPython中GIL的存在:
- 线程,适用于IO密集型操作。
- 进程,适用于计算密集型操作。
协程,协程也可以被称为微线程,是一种用户态内的上下文切换技术,在开发中结合遇到IO自动切换,就可以通过一个线程实现并发操作。
通常情况下,只让协程做网络IO的请求,在处理IO操作时,协程比线程更加节省开销(协程的开发难度大一些),返回之后的数据,一般选择放进文件、队列、数据库等之后再交由其他的代码进行处理(毕竟协程只有一个主进程)
现在很多Python中的框架都在支持协程,比如:FastAPI、Tornado、Sanic、Django 3、aiohttp等,企业开发使用的也越来越多(目前不是特别多)
关于协程,目前先了解这些概念即可,更深入的开发、应用 暂时不必过多了解,等学了Web框架和爬虫相关知识之后,再来学习和补充效果更佳。有兴趣想要研究的同学可以参考路飞学城ALEX老师写的文章和专题视频:
- 文章
https://pythonav.com/wiki/detail/6/91/
https://zhuanlan.zhihu.com/p/137057192
- 视频
https://www.bilibili.com/video/BV1NA411g7yf
8 并发编程和网络编程结合
我们之前学习的网络编程,一个服务端只能同时处理一个客户端的连接,另外一个客户端想要连接需要等待前一个客户端断开连接,哪怕后面学习了IO多路复用,允许客户端再没有进行收发的时候,服务端让另一个客户端进行连接,但本质还是只能同时处理一个客户端的数据
8.1 案例:多线程socket服务端
service.py
import socket
import threading
def task(conn):
conn.sendall('服务器已连接'.encode('utf-8'))
while True:
client_data = conn.recv(1024) # 单次最大接受1024字节
data = client_data.decode('utf-8') # 编码转换
if data == 'Network_connection_is_disconnected':
print('客户端已断开')
conn.close()
break
print('收到客户端消息:', data)
conn.sendall('已收到信息'.encode('utf-8'))
def run():
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 创建TCP服务端
sock.bind(('127.0.0.1',8001)) # 监听127.0.0.1 8001端口
sock.listen(5) # 允许5个请求等待
while True: # 死循环
# 等待客户端来连接
conn,addr = sock.accept()
# 创建子线程
t = threading.Thread(target=task,args=(conn,))
t.start() # 将接收到的客户端连接交给子线程处理
sock.close()
if __name__ == '__main__':
run()
8.2 案例:多进程socket服务端
import socket
import multiprocessing
def task(conn):
conn.sendall('服务器已连接'.encode('utf-8'))
while True:
client_data = conn.recv(1024) # 单次最大接受1024字节
data = client_data.decode('utf-8') # 编码转换
if data == 'Network_connection_is_disconnected':
print('客户端已断开')
conn.close()
break
print('收到客户端消息:', data)
conn.sendall('已收到信息'.encode('utf-8'))
def run():
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.bind(('127.0.0.1',8001))
sock.listen(5)
while True:
conn,addr = sock.accept()
p = multiprocessing.Process(target=task,args=(conn,))
p.start()
if __name__ == '__main__':
run()
8.3 客户端
client.py 客户端代码由于不需要与多个服务端连接,所以不需要做改动
import socket
client = socket.socket()
client.connect(('127.0.0.1',8001)) # 向服务端发送连接请求
message = client.recv(1024).decode('utf-8')
while True:
content = input('请输入发送内容(Q退出):').strip()
if content.upper() == "Q":
client.sendall('Network_connection_is_disconnected'.encode('utf-8'))
break
client.sendall(content.encode('utf-8'))
reply = client.recv(1024)
print(reply.decode('utf-8'))
client.close()
9 并发和并行
如何来理解这些概念呢?
-
串行,多个任务排队按照先后顺序逐一去执行
-
并发,假设有多个任务,只有一个CPU,那么在同一时刻只能处理一个任务,为了避免串行,可以让将任务切换运行(每个任务运行一点,然后再切换),达到并发效果(看似都在同时运行)
并发在Python代码中体现:协程、多线程(由CPython的GIL锁限制,多个线程无法被CPU调度)
-
并行,假设有多个任务,有多个CPU,那么同一时刻每个CPU都是执行一个任务,任务就可以真正的同时运行
并行在Python代码中的体现:多进程
10 基于模块导入实现单例模式
例如,我们要将一个函数做成单例模式
# utils.py
class Singleton:
def __init__(self):
self.name = "kinght"
single = Singleton()
使用之前学到的方法__new__
进行
class Singleton:
cls.instance = None
def __init__(self):
self.name = "kinght"
def __new__(cls,*args,**kwargs):
if cls.instance:
return cls.instance
cls.instacne = __new__(cls)
return cls.instance
但其实可以通过模块导入机制来简单实现,我们将需要单例模式的类做成一个独立的模块
# utils.py
class Singleton:
def __init__(self):
self.name = "kinght"
single = Singleton()
在模块进行导入的时候,系统会自动将其进行实例化,方便使用
from utils import Singleton
def run():
print(Singleton) # <class 'utils.Singleton'>
if __name__ == '__main__':
run()
当一个模块被重复导入时,不会重新加载,而是直接使用第一次导入的实例化结果
也就是说只需要再创建一个模块
# xx
from utils import Singleton
就可以达到单例模式的效果了
from utils import Singleton
import xx
def run():
print(Singleton)
print(xx.Singleton)
if __name__ == '__main__':
run()
'''输出结果
<class 'utils.Singleton'>
<class 'utils.Singleton'>
'''