Python开发-036_多进程与协程开发

本章节主要讲诉线程开发,协程将会在后续WEB框架章节进行详细介绍

1 多进程开发

进程是计算机中分配资源的最小单位,一个进程中可以有多个线程,并且,同一个进程中的线程共享资源,而进程与进程之间相互隔离,Python中可以通过多进程利用CPU的多核优势,进行计算密集型操作

注意:由于python创建进程需要基于forkspawnforkserver等模式,所以将其放于if __name__ == '__main__'中,才能让代码在跨平台的时候不出问题

import multiprocessing
def task():
    pass
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task)
    p1.start()

只要多进程的启动代码是从__main__启动就可以

from multiprocessing import Process
# 这里选择导入multiprocessing模块的process类
def task(arg):
    pass
def run():
    p = Process(target=task)
    p.start()
if __name__ == '__main__':
    run()

1.1 进程模式简介

在python的官方文档提到,根据平台不同,想要通过multiprocessing模块操作进程,可使用三种模式

  • fork,【“拷贝”几乎所有资源】【unix】【任意位置开始】【快】

    • 在创建子进程的时候,会将父进程的所有资源全部拷贝一份,子进程在开始的时候于父进程相同,但开始运行后,资源相互隔离

      import multiprocessing,time
      
      def task():
          print(name) # 2.子进程能获得主进程定义的name列表并输出
          name.append(123) # 3.子进程对列表添加123
          print(name) # 4.子进程列表输出[123]
      
      if __name__ == '__main__':
          multiprocessing.set_start_method("fork")
          name = [] # 1.主进程定义了name列表
      	
          # p1定义进程,拷贝主进程上方所有的资源
          p1 = multiprocessing.Process(target=task)
          p1.start()
      
          time.sleep(2) # 5.避免影响实验结果
          print(name) # [] 6.但是子进程对name列表的操作,无法影响主进程
      
    • 支持文件对象/线程锁等传参

      import multiprocessing,threading
      
      def task(lk):
          print(file_object)
          print(lk)
      
      if __name__ == '__main__':
          multiprocessing.set_start_method("fork")
          file_object = open("demo.txt",mode='rt',encoding='utf-8') # 支持自动拷贝
          lock = threading.RLock()
          p1 = multiprocessing.Process(target=task,args=lock) # 也支持手动传递
          p1.start()
      
    • 支持linux和mac系统,不支持windows系统

  • spawn,【run参数传必备资源】【unix、win】【main代码块开始】【慢】

    • 在创建子进程的时候,会创建一个新的python解释器去运行子进程的代码

    • 不支持文件对象/线程锁等传参

    • 支持Linux、Mac系统和windows系统

      import multiprocessing,time
      
      def task():
          print(name) # 报错:找不到name
          name.append(123)
          print(name)
      
      if __name__ == '__main__':
          multiprocessing.set_start_method("spawn")
          name = []
      
          p1 = multiprocessing.Process(target=task)
          p1.start()
      
          time.sleep(2)
          print(name)
      

      输出结果直接报错:NameError: name 'name' is not defined

      原因:拷贝的时候拷贝了python解释器,并没有拷贝程序定义的name列表,可以通过参数的形式进行传递name值,但是传递完成之后依然时分别独立的两份数据

      import multiprocessing,time
      
      def task(name):
          print(name)
          name.append(123)
          print(name) # name = [123]
      
      if __name__ == '__main__':
          multiprocessing.set_start_method("spawn")
          name = []
      
          p1 = multiprocessing.Process(target=task,args=name) # 使用args将列表传入task
          p1.start()
      
          time.sleep(2)
          print(name) # name = [] 传递完成之后依然时分别独立的两份数据
      
  • forkserver,【run参数传必备资源】【不支持文件对象/线程锁等传参】【部分unix】【main代码块开始】

    • 在开始运行主进程的主线程时,会创建一个模板(类似虚拟机的快照),需要创建子进程时,使用模板进行创建

    • 不支持文件对象/线程锁等传参

    • 由于模板是在程序运行之初完成的创建,其本质也就是一个python解释器,所以和spawn相同的处理机制

      import multiprocessing,time
      
      def task(name):
          print(name)
          name.append(123)
          print(name) # name = [123]
      
      if __name__ == '__main__':
          multiprocessing.set_start_method("forkserver")
          name = []
      
          p1 = multiprocessing.Process(target=task,args=name) # 使用args将列表传入task
          p1.start()
      
          time.sleep(2)
          print(name) # name = [] 传递完成之后依然时分别独立的两份数据
      

1.2 进程模式易出问题

1.2.1 子进程和主进程的数据不互通

import multiprocessing

def task():
    file_object.write('kinght\n') # 6.在内存内部写入kinght\n
    file_object.flush() # 7.将内存中的数据 aym\n kinght\n 写入硬盘
    
if __name__=="__main__":
    multiprocessing.set_start_method("fork") # 1.多线程定义为fork模式
    
    file_object = open("demo.txt",mode='a+',encoding='utf-8') # 2.获取到文件句柄
    file_object.write("aym\n") # 3.文件句柄在内存中写入aym
    
    p1 = multiprocessing.Process(target=task) # 4.创建多进程p1,使用函数task
    p1.start() # 5.启动p1 将file_object以及其内部的aym传递给task
    
# 8.子进程完成,主进程结束时,发现主进程中还有一个aym\n

'''最后输出结果
aym
kinght
aym
'''

由于主进程和子进程内存空间相互独立,而子进程又拷贝了内存空间中的aym\n,所以子进程写入硬盘的时候,写入的是aym\n kinght\n,但是子进程完成后,主进程中还有一个aym\n没有写入硬盘,系统会再次将主进程的数据写入硬盘,达到现在的结果

修改代码,提前刷入硬盘

import multiprocessing

def task():
    file_object.write('kinght\n') # 2.这里不会再继承主线程的aym
    file_object.flush() # 3.只刷入子进程自己的kinght

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    file_object = open('demo.txt',mode='wt',encoding='utf-8')
    file_object.write("aym\n")
    file_object.flush() # 1.由于已经提前刷入硬盘,故内存中没有了aym
    
    p1 = multiprocessing.Process(target=task)
    p1.start()
    
'''最后输出结果
aym
kinght
'''

1.2.2 主进程锁会被子进程继承

import threading,multiprocessing

def func():
    lock.acquire() # 4.申请lock锁,但是锁被p1子进程的主线程申请走了,此线程暂停
    print('func')
    lock.release()

def task():
    print(lock)
    # <locked _thread.RLock object owner=8663369216 count=1 at 0x7fc830183fc0> 锁已被申请
    # 2.子进程中,锁同样存在,这里是被转化为了子进程的主线程申请
    lock.acquire() # 3.这里锁是被子进程的主进程再次申请,由于是Rlock,所以还能够申请第二次
    print(666) # 666能够被输出
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    name = []
    lock = threading.RLock()
    lock.acquire() # 1.锁被主进程的主线程申请走了
    p1 = multiprocessing.Process(target=task) # fork创建子线程时,会将锁同样拷贝一份给子进程
    p1.start()

子进程继承锁之后,锁的申请会自动转换到,子进程的主线程,不影响主进程的主线程

1.3 进程常见方法

进程的常见方法:

1.3.1 p.start()

当前进程准备就绪,等待被CPU调度(工作单元为进程中的线程)

import multiprocessing
def task():
    pass
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task)
    p1.start()

1.3.2 p.join()

等待当前进程的任务执行完成后在往下执行

import multiprocessing
import time
from multiprocessing import Process

def task(arg):
    print('执行中')
    time.sleep(2)
    
if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    p = Process(target=task,args=('demo',))
    p.start()
    p.join() # 等待p子进程所有代码执行完成后继续执行主进程
    print('主进程继续执行')

1.3.3 p.daemon=布尔值

守护进程,必须放在start之前

  • p.daemon=True,设置为守护进程,主进程执行完毕,不等待子进程,直接结束
  • p.daemon=False,默认状态,设置为非守护进程,主进程执行完成后,等待子进程执行完成再结束
import multiprocessing
import time

def task(arg):
    print('子进程开始执行...')
    time.sleep(2)
    print('子进程执行完毕')

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    p = multiprocessing.Process(target=task,args=(112,))
    p.daemon = True # 子进程的第二条输出语句由于主进程关闭而无法执行
    # p.daemon = False  # 子进程的第二条输出语句由于主进程等待而获得执行
    p.start()

    time.sleep(1)
    print('主进程即将结束')

1.3.4 进程名称的设置和获取

import multiprocessing
def task(arg):
    print('当前进程名称为',multiprocessing.current_process().name) # 获取当前进程名称
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    p = multiprocessing.Process(target=task,args=(11,))
    p.name = '哈哈哈哈哈' # 设置进程名称
    p.start()

1.3.5 进程的PID获取

相信熟悉Linux的大家对于PID这个名称都不陌生,每个在系统中运行的进程都有一个PID号

import os
os.getpid() # 获取当前进程的PID
os.getppid() # 获取当前进程的父进程PID

代码中演示:

import multiprocessing,os
def task(arg):
    print('子进程PID为',os.getpid()) # 获取子进程pid
    print('父进程PID为',os.getppid()) # 获取父进程pid
if __name__ == '__main__':
    print('主进程PID为', os.getpid())  # 获取主进程pid
    multiprocessing.set_start_method('spawn')
    p = multiprocessing.Process(target=task,args=(11,))
    p.start()
    
'''输出结果
主进程PID为 16280
子进程PID为 2088
父进程PID为 16280
'''

1.3.6 获取进程中的线程

import threading
import multiprocessing
import time

def func():
    time.sleep(2)
def task(args):
    for i in range(3):
        t = threading.Thread(target=func)
        t.start()
    thread_list = threading.enumerate() # 获取线程列表
    thread_len = len(thread_list) # 获取有多少个线程
    print('线程:',thread_list)
    print('数量:',thread_len)
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    p = multiprocessing.Process(target=task,args=('xxx',))
    p.start()
  
'''输出结果 主线程+3个子线程
线程: [<_MainThread(MainThread, started 5332)>, <Thread(Thread-1, started 2692)>, <Thread(Thread-2, started 8072)>, <Thread(Thread-3, started 24736)>]
数量: 4
'''

1.3.7 自定义进程类

import multiprocessing

class MyProcess(multiprocessing.Process):
    def run(self):
        print('执行此进程',self._args)
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    p = MyProcess(args=('xxx',))
    p.start()

1.3.8 CPU核数

进程不是越多越好,并不推荐进程数大于CPU的逻辑处理器数量

import multiprocessing
count = multiprocessing.cpu_count() # cpu逻辑处理器
print(count) # 12核 实际电脑是 6核CPU虚拟出12个逻辑处理器

通过CPU逻辑处理器数来创建多进程

import multiprocessing

def func():
    pass
if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    for i in range(count-1): # 还有主进程需要占用1个处理器
        p = multiprocessing.Process(target=func)
        p.start()

2 进程之间数据共享

在前文,我们通过大量篇幅证实了进程和进程之间互不通信这件事情,无数次的声明,进程是资源分配的最小单位,每个进程中都维护自己独立的数据

import multiprocessing

def task(data):
    data.append(666)
    print('task_data = ',data)

if __name__ == '__main__':
    data = []
    p = multiprocessing.Process(target=task,args=(data,))
    p.start()
    print('主进程data = ',data)

'''运行结果
主进程data =  []
task_data =  [666]
'''

2.1 共享

2.1.1 Shared memory

关于进程间的数据通信,python基于C语言的底层逻辑,实现了一个value的方法

from multiprocessing import Process,Value

def func(n,m1,m2):
    n.value = 888
    m1.value = 'a'.encode('utf-8') # char必须encode
    m2.value = '超'

if __name__ == '__main__':
    num = Value('i',666) # 类似C语言定义 int num = 666
    v1 = Value('c') # 类似C语言定义 char v1 = None
    v2 = Value('u') # 类似C语言定义 char v2 = None # 只不过可以是中文字符

    p = Process(target=func,args=(num,v1,v2))
    p.start()
    p.join() # 一定要等待子进程运行完成,再进行接下来的调用

    print(num.value) # 888
    print(v1.value) # b'a'
    print(v2.value) # 超

类似于强类型语言的做法,通过value创建一个固有大小的内存空间,将值存放于空间中,然后将指向空间的变量名,通过参数的形式传递给子进程,子进程修改也是直接修改对应空间内存放的值,其他进程读取也是读取对应空间的值,这样一个进程就可以影响多个进程

# 类似于强类型语言,此方法也会开辟固定空间大小
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte, 
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,  (其u表示无符号)
    'l': ctypes.c_long,  'L': ctypes.c_ulong, 
    'f': ctypes.c_float, 'd': ctypes.c_double

类似的方式,同样可以通过Array使用数组来创建空间,并且由于需要固定内存空间大小,所以数组内存放类型和数组长度,都是被固定死了的

from multiprocessing import Process,Array

def func(arr):
    arr[0] = 66
    arr[1] = 77
    arr[2] = 88
if __name__ == '__main__':
    arr = Array('i',[11,22,33]) # i int 这个数组只能放整形,并且数组长度只能是3

    p = Process(target=func,args=(arr,))
    p.start()
    p.join()

    print(arr[:]) # [66, 77, 88]

2.1.2 Server Process

基于manager()来创建空间,再其范围内,属于同一个大的内存空间,在运行子进程是将内存空间变量名作为参数传入,让子进程可修改主进程的值

from multiprocessing import Process,Manager

def func(demo1,demo2):
    # 子空间获得了Manager中的内存地址
    # 可以修改主进程中的demo1\demo2
    demo1['name'] = 'kinght' 
    demo1['age'] = 24
    demo2.append(666)

if __name__ == '__main__':
    # 在这个上下文管理器之外的变量无法获得传递
    with Manager() as manager:
        # 使用Manager()创建一个上下文管理,再此管理器范围内demo1,demo2内存空间可传递到子进程
        demo1 = manager.dict() # 创建一个字典空间
        demo2 = manager.list() # 创建一个列表空间
        p = Process(target=func,args=(demo1,demo2))
        p.start()
        p.join()

        print(demo1)
        print(demo2)

'''输出
{'name': 'kinght', 'age': 24}
[666]
'''

2.2 交换

2.2.1 Queue

使用multiprocessing.Queue创建一个队列,使用put将数据放入队列,使用get将数据取出

队列:先进先出

import multiprocessing

def task(queue):
    for i in range(10):
        queue.put(i) # 3.put将数据放入队列
if __name__ == '__main__':
    queue = multiprocessing.Queue() # 1.定义队列
    p = multiprocessing.Process(target=task,args=(queue,)) # 2.将队列作为参数传递给子进程
    p.start()
    p.join()

    print('主进程') # 主进程
    print(queue.get()) # 0 # 4.使用get依次取出数据
    print(queue.get()) # 1
    print(queue.get()) # 2
    print(queue.get()) # 3
    print(queue.get()) # 4
    print(queue.get()) # 5
    print(queue.get()) # 6

同样,主进程的操作会影响到子进程

import multiprocessing

def task(queue):
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())
if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=task,args=(queue,))
    for i in range(10):
        queue.put(i)
    p.start()
    p.join()

2.2.2 Pipes

Pipes可以被认为是种管道,它是一种双向的队列

A -> B
A <- B 

它的数据交换方式和网络编程有相似的地方

import time
import multiprocessing

def task(conn):
    time.sleep(1)
    conn.send([111,222,333]) # 3.子进程通过管道向主进程发送数据
    data = conn.recv() # 6.阻塞,等待管道主进程的数据
    print(data)

if __name__ == '__main__':
    parent_conn,child_conn = multiprocessing.Pipe() # 1.定义管道 parent_conn,child_conn为管道的两头
    p = multiprocessing.Process(target=task,args=(child_conn,)) # 2.将管道的一头发送给子进程
    p.start()

    info = parent_conn.recv() # 4.阻塞,等待管道子进程的数据
    print("主进程接受:",info)
    time.sleep(1)
    parent_conn.send('主进程已接受') # 5.主进程向子进程发送数据

上诉都是Python内部提供的进程之间数据共享和交换的机制,在项目开发中很少使用,后期项目中一般会借助第三方来做资源共享,例如Mysql、redis等

3 进程锁

如果多个进程抢占式的去做某些操作/共享同一个资源的时候,为了防止操作出问题,可以通过进程锁来避免

数据资源冲突

import time
from multiprocessing import Process,Value

def func(n):
    n.value = n.value + 1
if __name__ == '__main__':
    num = Value('i',0)
    for i in range(100):
        p = Process(target=func,args=(num,))
        p.start()
    time.sleep(3)
    print(num.value)

执行此案例,可能出现99或者100等多个结果

文件资源冲突

import time
import multiprocessing

def task():
    with open('demo.txt',mode='rt',encoding='utf-8') as read_file:
        current_num = int(read_file.read())
    print('余票剩下:',current_num,'抢购中...')
    time.sleep(2)
    current_num -= 1
    with open('demo.txt',mode='wt',encoding='utf-8') as write_file:
        write_file.write(str(current_num))

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=task)
        p.start()
        time.sleep(1)
        
'''运行结果
余票剩下: 9 抢购中...
余票剩下: 9 抢购中...
余票剩下: 8 抢购中...
余票剩下: 8 抢购中...
余票剩下: 7 抢购中...
余票剩下: 7 抢购中...
余票剩下: 6 抢购中...
余票剩下: 6 抢购中...
余票剩下: 5 抢购中...
余票剩下: 5 抢购中...
'''

额外提示:如果使用数据交换的模式(Queue/Pipes)进行数据共享,由于先入先出的机制,不会导致数据混乱

3.1 进程锁使用

进程锁和线程锁使用方式几乎一样,不过由于进程间数据不互通的情况,需要使用传参的方式进行传递到各个进程中间

import time
import multiprocessing

def task(lock): # 3.通过传参传进程锁
    lock.acquire() # 4.申请锁
    
    # 以下代码不会再冲突
    with open('demo.txt',mode='rt',encoding='utf-8') as read_file:
        current_num = int(read_file.read())
    print('余票剩下:',current_num,'抢购中...')
    time.sleep(2)
    current_num -= 1
    with open('demo.txt',mode='wt',encoding='utf-8') as write_file:
        write_file.write(str(current_num))
        
    lock.release() # 5.运行完成解除锁

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    lock = multiprocessing.RLock() # 1.定义进程锁
    for i in range(10):
        p = multiprocessing.Process(target=task,args=(lock,)) # 2.创建进程时通过传参传递锁
        p.start()

TIPS:前文提到过,进程不能传递锁,那里特指线程锁

线程锁不能通过子进程参数传递,进程锁可以通过子进程参数传递

在部分系统执行spawn模式的时候,需要在主线程添加等待代码,等待子进程全部运行完成才不会报错

process_list = [] # 创建一个空列表
for i in range(10):
    p = multiprocessing.Process(target=task,args=(lock,))
    p.start()
    process_list.append() # 将每个进程添加到列表中
    
# 在所有进程开启完成后,监听保证每个进程都运行完成再结束主进程
for item in process_list:
    item.join() # 保证每个进程都运行完成

4 进程池

在开发过程中,如果无限制创建进程、线程,都会导致程序的效率降低,尤其是进程

我们之所以创建进程是为了CPU的多核优势,让他帮我们做并行计算,比如来了200个任务,但CPU只有4个逻辑处理器,我们需要创建的其实是4个进程,让计算机四个四个的处理,所以python为我们提供了进程池

TIPS:在python2中,没有线程池只有进程池、

python3有了线程池和进程池概念,并且将其放于concurrent.futures模块中

import time
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

def task(num):
    print('执行',num)
    time.sleep(1)
if __name__ == '__main__':
    # 修改进程模式
    multiprocessing.set_start_method('spawn')
    
    # 创建进程池,规定4个进程
    pool = ProcessPoolExecutor(4) 
    # for循环将任务提交给进程池
    for i in range(10):
        pool.submit(task,i) # task为方法 i为参数
        
    # 主进程等待进程池中所有任务执行完成后,再继续往下执行
    pool.shutdown(True)

与线程池相同,for循环会一次性提交10个进程给进程池,有进程池自行等待排序运行

4.1 add_done_callback()

import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def task(num):
    print('执行',num) # 2.执行task代码返回num
    time.sleep(2)
    return num

def done(res):
    print('done函数',multiprocessing.current_process().pid) # 查看当前运行进程的pid
    time.sleep(1)
    print(res.result()) # 查看res对象中的值 即 task返回值num
    time.sleep(1)

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    for i in range(50):
        fur = pool.submit(task,i) # 1.申请线程运行,获取对象
        fur.add_done_callback(done) # 3.线程对象作为参数,运行done方法

    print('主进程',multiprocessing.current_process().pid) # 查看当前运行进程的pid
    pool.shutdown() # 等待线程池所有子进程运行完成后再往下执行主进程

进程池 add_done_callback(done)和线程池.add_done_callback(done)几乎相同,进程池也是将子进程的对象作为参数交给done方法运行

唯一的区别是,线程池的done方法是子线程运行,而进程池是主进程运行,从运行结果可以看出

主进程 27620
执行 0
执行 1
执行 2
执行 3
执行 4
done函数 27620
....省略后续

4.2 进程池加锁

进程池加锁需要基于Manager中的Lock和RLock实现

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def task(lock):
    print('开始')
    with lock:
        with open('demo.txt', mode='rt', encoding='utf-8') as read_file:
            current_num = int(read_file.read())
        print('余票剩下:', current_num, '抢购中...')
        time.sleep(2)
        current_num -= 1
        with open('demo.txt', mode='wt', encoding='utf-8') as write_file:
            write_file.write(str(current_num))

if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)
    # lock = multiprocessing.RLock() # 不能使用
    manager = multiprocessing.Manager() # 只能使用manager
    lock = manager.RLock()
    for i in range(10):
        pool.submit(task,lock)

5 进程综合案例

需求:现有多个日志文件,需计算每日访问总量以及每日访问IP数量

image-20220905161241318

文件名:

20210322.log
20210323.log
20210324.log
20210325.log
20210326.log
20210327.log
20210328.log
20210329.log

代码处理:

import os,time
from concurrent.futures import ProcessPoolExecutor


def task(file_name):
    # 5.定义一个空元组存放IP数量 定义一个total计算总数
    ip_set = set()
    total = 0

    # 6.打开文件进行处理
    file_path = os.path.join('files',file_name)
    with open(file_path,mode='rt',encoding='utf-8') as file_object:
        for rank in file_object:
            total += 1 # 计算有多少行
            ip = rank.strip().split(" - - ")[0] # 注意:由于计算量太大,不能使用解包
            ip = ip.split(',')[0]
            ip_set.add(ip) # 将提取出来的IP放入元组,元组会自动过滤重复的
        ip_len = len(ip_set) # 计算IP元组的长度得到IP数
    time.sleep(1)
    return {'total':total,'ip':ip_len} # 7.将结果组合返回

def outer(info,filename): # 9.outer同时接受空字典、文件名
    def done(res,*args,**kwargs):
        info[filename] = res.result() # 10.组合字典,res.result()得到返回值{'total':total,'ip':ip_len}
    return done # 10.done返回给步骤8执行

if __name__ == '__main__':
    info = {} # 1.定义一个空字典 存放结果
    pool = ProcessPoolExecutor(4) # 2.定义进程池
    file_list = os.listdir('files') # 3.获取files下所有的文件名

    # 让每个进程独立处理一个文件
    for filename in file_list:
        fur = pool.submit(task,filename) # 4.发送到进程池
        fur.add_done_callback(outer(info,filename)) # 8.将结果交给outer处理
        print(filename)

    pool.shutdown(True) # 11.等待所有子进程完成

    for k, v in info.items():
        print(k, v) # 12.字典解包输出

6 协程

暂时以理解为主,将会在后续Web框架课程中详细介绍

在计算机中,其实只提供了线程、进程用于实现并发编程,而协程是程序员通过代码搞出来的一个东西

官方定义为:协程也可以被称为微线程,是一种用户态内的上下文切换技术

简而言之,就是一个线程实现代码块的相互切换执行

案例

def func1():
    print(1)
    ...
    print(2)
    
def func2():
    print(3)
    ...
    print(4)
    
func1()
func2()

上述代码是普通的函数定义和执行,按流程分别执行两个函数中的代码,并先后会输出:1、2、3、4

但如果介入协程技术那么就可以实现函数见代码切换执行,最终输入:1、3、2、4

6.1 实现函数运行跳转的办法

6.1.1 greenlet

注意:未知错误,python解释器无法连接到已通过pip install greenlet下载的 from greenlet import greenlet中,所以并未能浮现该代码

from greenlet import greenlet

def func1():
    print(1)  # 第1步:输出 1
    gr2.switch()  # 第3步:切换到 func2 函数
    print(2)  # 第6步:输出 2
    gr2.switch()  # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行

def func2():
    print(3)  # 第4步:输出 3
    gr1.switch()  # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
    print(4)  # 第8步:输出 4

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()  # 第1步:去执行 func1 函数

6.1.2 yield

def func1():
    yield 1
    yield from func2()
    yield 2
    
def func2():
    yield 3
    yield 4
    
f1 = func1()
for item in f1:
    print(item)

虽然上述两种都实现了协程,但这种编写代码的方式没啥意义

这种来回切换执行,可能反倒让程序的执行速度更慢了(相比较于串行)

6.2 协程如何才能更有意义呢?

不需要用户手动切换,而是程序到达IO操作的等待时间,能够自动切换就有意义了

Python在3.4之后推出了asyncio模块 + Python3.5推出async、async语法 ,内部基于协程并且遇到IO请求自动化切换

import asyncio


async def func1():
    print(1) # 4.首先运行func1
    await asyncio.sleep(1) # 5.遇到IO阻塞或者等待的时候
    print(2) # 7.在IO阻塞或者等待结束后会自动切换回func1


async def func2():
    print(3) # 6.会自动跳转到列表中的第二个func2运行
    await asyncio.sleep(2) # 8.由于func1运行完成,所以不会再跳转
    print(4) # 9.运行结束


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
] # 1.创建任务列表
loop = asyncio.get_event_loop() # 2.创建协程对象
loop.run_until_complete(asyncio.wait(tasks)) # 3.开始运行列表中的方法

使用爬虫案例来详细了解

"""
需要先安装:pip3 install aiohttp
"""

import aiohttp
import asyncio

async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response: # 3.发生阻塞,自动跳转到列表下一个元素进行运行
        content = await response.content.read() # 4.等待接受到数据,再转回进行数据处理
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)
            
async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] # 1.列表生成式创建协程任务列表
        await asyncio.wait(tasks) # 2.开始运行tasks任务列表
if __name__ == '__main__':
    asyncio.run(main())

通过上述内容发现,在处理IO请求时,协程通过一个线程就可以实现并发的操作

7 协程、线程、进程的区别?

线程,是计算机中可以被cpu调度的最小单元。
进程,是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。

由于CPython中GIL的存在:
- 线程,适用于IO密集型操作。
- 进程,适用于计算密集型操作。

协程,协程也可以被称为微线程,是一种用户态内的上下文切换技术,在开发中结合遇到IO自动切换,就可以通过一个线程实现并发操作。

通常情况下,只让协程做网络IO的请求,在处理IO操作时,协程比线程更加节省开销(协程的开发难度大一些),返回之后的数据,一般选择放进文件、队列、数据库等之后再交由其他的代码进行处理(毕竟协程只有一个主进程)

现在很多Python中的框架都在支持协程,比如:FastAPI、Tornado、Sanic、Django 3、aiohttp等,企业开发使用的也越来越多(目前不是特别多)

关于协程,目前先了解这些概念即可,更深入的开发、应用 暂时不必过多了解,等学了Web框架和爬虫相关知识之后,再来学习和补充效果更佳。有兴趣想要研究的同学可以参考路飞学城ALEX老师写的文章和专题视频:

- 文章
https://pythonav.com/wiki/detail/6/91/
https://zhuanlan.zhihu.com/p/137057192
- 视频
https://www.bilibili.com/video/BV1NA411g7yf

8 并发编程和网络编程结合

我们之前学习的网络编程,一个服务端只能同时处理一个客户端的连接,另外一个客户端想要连接需要等待前一个客户端断开连接,哪怕后面学习了IO多路复用,允许客户端再没有进行收发的时候,服务端让另一个客户端进行连接,但本质还是只能同时处理一个客户端的数据

8.1 案例:多线程socket服务端

service.py

import socket
import threading

def task(conn):
    conn.sendall('服务器已连接'.encode('utf-8'))
    while True:
        client_data = conn.recv(1024) # 单次最大接受1024字节
        data = client_data.decode('utf-8') # 编码转换
        if data == 'Network_connection_is_disconnected':
            print('客户端已断开')
            conn.close()
            break
        print('收到客户端消息:', data)
        conn.sendall('已收到信息'.encode('utf-8'))
def run():
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 创建TCP服务端
    sock.bind(('127.0.0.1',8001)) # 监听127.0.0.1 8001端口
    sock.listen(5) # 允许5个请求等待
    while True: # 死循环
        # 等待客户端来连接
        conn,addr = sock.accept()
        # 创建子线程
        t = threading.Thread(target=task,args=(conn,))
        t.start() # 将接收到的客户端连接交给子线程处理
    sock.close()
if __name__ == '__main__':
    run()

8.2 案例:多进程socket服务端

import socket
import multiprocessing

def task(conn):
    conn.sendall('服务器已连接'.encode('utf-8'))
    while True:
        client_data = conn.recv(1024)  # 单次最大接受1024字节
        data = client_data.decode('utf-8')  # 编码转换
        if data == 'Network_connection_is_disconnected':
            print('客户端已断开')
            conn.close()
            break
        print('收到客户端消息:', data)
        conn.sendall('已收到信息'.encode('utf-8'))

def run():
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.bind(('127.0.0.1',8001))
    sock.listen(5)
    while True:
        conn,addr = sock.accept()
        p = multiprocessing.Process(target=task,args=(conn,))
        p.start()

if __name__ == '__main__':
    run()

8.3 客户端

client.py 客户端代码由于不需要与多个服务端连接,所以不需要做改动

import socket

client = socket.socket()
client.connect(('127.0.0.1',8001)) # 向服务端发送连接请求

message = client.recv(1024).decode('utf-8')
while True:
    content = input('请输入发送内容(Q退出):').strip()
    if content.upper() == "Q":
        client.sendall('Network_connection_is_disconnected'.encode('utf-8'))
        break
    client.sendall(content.encode('utf-8'))
    reply = client.recv(1024)
    print(reply.decode('utf-8'))
client.close()

9 并发和并行

如何来理解这些概念呢?

  • 串行,多个任务排队按照先后顺序逐一去执行

  • 并发,假设有多个任务,只有一个CPU,那么在同一时刻只能处理一个任务,为了避免串行,可以让将任务切换运行(每个任务运行一点,然后再切换),达到并发效果(看似都在同时运行)

    并发在Python代码中体现:协程、多线程(由CPython的GIL锁限制,多个线程无法被CPU调度)
    
  • 并行,假设有多个任务,有多个CPU,那么同一时刻每个CPU都是执行一个任务,任务就可以真正的同时运行

    并行在Python代码中的体现:多进程
    

10 基于模块导入实现单例模式

例如,我们要将一个函数做成单例模式

# utils.py

class Singleton:
    def __init__(self):
        self.name = "kinght" 
        
single = Singleton()

使用之前学到的方法__new__进行

class Singleton:
    cls.instance = None
    def __init__(self):
        self.name = "kinght" 
    def __new__(cls,*args,**kwargs):
        if cls.instance:
            return cls.instance
        cls.instacne = __new__(cls)
        return cls.instance

但其实可以通过模块导入机制来简单实现,我们将需要单例模式的类做成一个独立的模块

# utils.py

class Singleton:
    def __init__(self):
        self.name = "kinght" 
        
single = Singleton()

在模块进行导入的时候,系统会自动将其进行实例化,方便使用

from utils import Singleton

def run():
   print(Singleton) # <class 'utils.Singleton'>

if __name__ == '__main__':
    run()

当一个模块被重复导入时,不会重新加载,而是直接使用第一次导入的实例化结果

也就是说只需要再创建一个模块

# xx
from utils import Singleton

就可以达到单例模式的效果了

from utils import Singleton
import xx
def run():
    print(Singleton)
    print(xx.Singleton)
if __name__ == '__main__':
    run()
'''输出结果
<class 'utils.Singleton'>
<class 'utils.Singleton'>
'''

Python开发-036_多进程与协程开发
http://localhost:8080/archives/jhdMKz5g
作者
kinght
发布于
2024年11月11日
更新于
2024年11月11日
许可协议