multiprocessing常用组件及功能
创建管理进程模块:
Process(用于创建进程模块)
Process模块用来创建子进程,是Multiprocessing核心模块,使用方式与Threading类似,可以实现多进程的创建,启动,关闭等操作。
Pool(用于创建管理进程池)
Pool模块是用来创建管理进程池的,当子进程非常多且需要控制子进程数量时可以使用此模块。
Queue(用于进程通信,资源共享)
Queue模块用来控制进程安全,与线程中的Queue用法一样。
Value,Array(用于进程通信,资源共享)
Pipe(用于管道通信)
Pipe模块用来管道操作。
Manager(用于资源共享)
Manager模块常与Pool模块一起使用,作用是共享资源。
同步子进程模块:
Condition
Event
用来实现进程间同步通信。
Lock
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突
RLock Semaphore
用来控制对共享资源的访问数量,例如池的最大连接数
一、创建进程
1、fork创建进程
该方法只能在unix/Linux/Mac上运行,windows不可以运行。 程序执行到fork()时,操作系统会创建一个新进程(子进程),并把父进程的所有信息赋值到子进程中。
这个方法很特殊,会有两次返回,分别在子进程和父进程返回一次,子进程永远返回0,父进程返回进程的id.
getpid():返回当前进程的id
getppid():返回当前进程父进程的id。
# -*- coding:utf-8 -*- """ @Time: 2019-12-25 @Author: kenwu @File: processtest.py """ # 第一种方式:使用OS模块中的fork方式实现多进程 import os print('current Process ({}) start ......'.format(os.getpid())) pid = os.fork() if pid < 0: print('error in fork') elif pid == 0: print('I am child process ({0}, my parent process is ({1}))'.format(os.getpid(), os.getppid())) else: print('I ({0}) created a child process ({1})'.format(os.getpid(), pid))
输出:
2、multiprocessing创建进程
Process([group,target,name,args,kwargs])
group:至今还未使用,值始终为None target:进程实例所调用的对象,一般表示子进程要调用的函数。 args:表示调用对象的参数,一般是函数的参数 kwargs:表示调用对象的关键字参数字典。 name:当前进程实例的别名
Process常用方法
p.is_alive():判断进程是否还在运行。如果还在运行,返回true,否则返回false p.join([timeout]):等待进程实例执行完毕,或等待多少秒 p.run():默认会调用target指定的对象,如果没有给定target参数,对该进程对象调用start()方法时,就会执行对象中的run()方法 p.start():启动进程实例(创建子进程),并运行子进程的run方法 p.terminate():不管任务是否完成,立即终止,同时不会进行任何的清理工作,如果进程p创建了它自己的子进程,这些进程就会变成僵尸进程,使用时特别注意,如果p保存了一个锁或者参与了进程间通信,那么使用该方法终止它可能会导致死锁或者I/O损坏。
Process类常用属性:
p.daemon:布尔值,指示进程是否是后台进程。当创建它的进程终止时,后台进程会自动终止。并且,后台进程无法创建自己的新进程。注意:p.daemon的值必须在p.start方法调用前设置。 p.exitcode:进程的整数退出指令。如果进程仍然在运行,它的值为None,如果值为负数:—N,就表示进程由信号N所终止。 p.name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数。 p.pid:当前进程实例的PID
# 第二种方法:使用multiprocessing模块创建多进程 from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Child process {0} ({1}) Running......'.format(name, os.getpid())) print('Parent Process ({}) '.format(os.getpid())) p_list = [] for i in range(5): p = Process(target=run_proc, args=(str(i),)) p_list.append(p) print('Process will start.') p_list[i].start() for p in p_list: p.join() print('Process end.')
3、进程池Pool
进程初始化时,会指定一个最大进程数量,当有新的请求需要创建进程时,如果此时进程池还没有到达设置的最大进程数,该进程池就会创建新的进程来处理该请求,并把该进程放到进程池中,如果进程池已经达到最大数量,请求就会等待,知道进程池中进程数量减少,才会新建进程来执行请求。
pool=Pool(numprocess,initializer,initargs)
numproxess:需要创建的进程个数,如果忽略将使用cpu_count()的值。即系统上的CPU数量。 nitializer:每个进程启动时都要调用的对象。 initargs:为initalizer传递的参数。
multiprocessing.Pool常用函数解析:
apply_async(要调用的方法,参数列表,关键字参数列表):使用非阻塞方式调用指定方法,并行执行(同时执行)
apply(要调用的方法,参数列表,关键字参数列表):使用阻塞方式调用指定方法,,阻塞方式就是要等上一个进程退出后,下一个进程才开始运行。
close():关闭进程池,不再接受进的进程请求,但已经接受的进程还是会继续执行。
terminate():不管程任务是否完成,立即结束。
join():主进程堵塞(就是不执行join下面的语句),直到子进程结束,注意,该方法必须在close或terminate之后使用。
pool.map(func,iterable,chunksize):将可调用对象func应用给iterable的每一项,然后以列表形式返回结果,通过将iterable划分为多块,并分配给工作进程,可以并行执行。chunksize指定每块中的项数,如果数据量较大,可以增大chunksize的值来提升性能。
pool.map_async(func,iterable,chunksize,callback):与map方法不同之处是返回结果是异步的,如果callback指定,当结果可用时,结果会调用callback。
pool.imap(func,iterable,chunksize):与map()方法的不同之处是返回迭代器而非列表。
pool.imap_unordered(func,iterable,chunksize):与imap()不同之处是:结果的顺序是根据从工作进程接收到的时间而定的。
pool.get(timeout):如果没有设置timeout,将会一直等待结果,如果设置了timeout,超过timeout将引发multiprocessing.TimeoutError异常。
pool.ready():如果调用完成,返回True
pool.successful():如果调用完成并且没有引发异常,返回True,如果在结果就绪之前调用,jiang引发AssertionError异常。
pool.wait(timeout):等待结果变为可用,timeout为等待时间。
# 第三种,使用进程池 from multiprocessing import Pool import os, time, random def run_task(name): print('Task {0} (pid = {1}) is running ......'.format(name, os.getpid())) time.sleep(random.random() * 3) print('Task {} end.'.format(name)) print('Current Process {} .'.format(os.getpid())) p = Pool(processes=3) for i in range(5): p.apply_async(run_task, args=(i,)) print('Waiting for all subprocesses done ......') p.close() p.join() print('All subprocesses done.')
# 阻塞与非阻塞对比 from multiprocessing import Pool import os import time import random # 用来生成随机数 def test1(name): print("%s运行中,pid=%d,父进程:%d" % (name, os.getpid(), os.getppid())) t_start = time.time() # random.random()会生成一个0——1的浮点数 time.sleep(random.random() * 3) t_end = time.time() print("%s执行时间:%0.2f秒" % (name, t_end - t_start)) pool = Pool(5) # 设置线程池中最大线程数量为5 for xx in range(0, 7): # 非阻塞运行 pool.apply_async(test1, ("mark" + str(id),)) print("--start1--") pool.close() # 关闭线程池,关闭后不再接受进的请求 pool.join() # 等待进程池所有进程都执行完毕后,开始执行下面语句 print("--end1--") print("*" * 30) pool = Pool(5) # 设置线程池中最大线程数量为5 for xx in range(0, 7): # 阻塞运行 pool.apply(test1, ("mark" + str(id),)) print("--start2--") pool.close() # 关闭线程池,关闭后不再接受进的请求 pool.join() # 等待进程池所有进程都执行完毕后,开始执行下面语句 print("--end2--")
4、继承Process创建线程
from multiprocessing import Process import os import time class MyProcess(Process): #重新init方法 def __init__(self,interval): #下面一句是调用父类init方法,这一本尽量不要少,因为父类还有很多事情需要在init方法内处理 Process.__init__(self) self.interval=interval #重写run方法 def run(self): print("子进程运行中,pid=%d,父进程:%d" % (os.getpid(), os.getppid())) t_start=time.time() time.sleep(self.interval) t_end=time.time() print("子进程运行结束,耗时:%0.2f秒"%(t_end-t_start)) if __name__=="__main__": t_start=time.time() print("父进程开始执行") p=MyProcess(2) p.start() p.join() t_end=time.time() print("父进程运行结束,耗时:%0.2f秒" % (t_end - t_start))
二、进程间通信
1、Queue
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码 def proc_write(q, urls): for url in urls: q.put(url) print('Put {0} to queue by Process {1}'.format(url, os.getpid())) time.sleep(random.random()) def proc_read(q): while True: url = q.get(True) print('Get {0} from queue by Process {1}'.format(url, os.getpid())) if __name__ == '__main__': # 父进程创建queue,并传给各个子进程。 q = Queue() proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3'])) proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6'])) proc_writer3 = Process(target=proc_write, args=(q, ['url_7', 'url_8', 'url_9'])) proc_reader1 = Process(target=proc_read, args=(q,)) proc_reader2 = Process(target=proc_read, args=(q,)) # 启动子进程proc_wirter,写入: proc_writer1.start() proc_writer2.start() proc_writer3.start() # 启动子进程proc_reader,读取: proc_reader1.start() proc_reader2.start() # 等待proc_writer结束: proc_writer1.join() proc_writer2.join() proc_writer3.join() # proc_reader进程是死循环,无法等待其结束,只能强行终止: proc_reader1.terminate() proc_reader2.terminate()
2、PIPE
在内存中开辟一个新的空间,对多个进程可见,在通信形式上形成一种约束
现有2个进程A和B,他们都在内存中开辟了空间,那么我们在内存中再开辟一个空间C,作用是连接这两个进程的。对于进程来说内存空间是可以共享的(任何一个进程都可以使用内存,内存当中的空间是用地址来标记的,我们通过查找某一个地址就能找到这个内存)A进程可以不断的向C空间输送东西,B进程可以不断的从C空间读取东西,这就是进程间的通信
总结:
1.向管道发送数据使用send函数,从管道接收数据使用recv()函数 2.recv()函数为阻塞函数,当管道中数据为空的时候会阻塞 3.一次recv()只能接收一次send()的内容 4.send可以发送的数据类型比较多样,字符串,数字,列表等等
# 使用管道的方式进行线程间的通信 from multiprocessing import Process, Pipe import os, time def func(name, a): time.sleep(1) a.send('测试测试: ' + str(name)) print('父进程的id为:{}'.format(os.getppid()), " ————— ", '子进程的id为:{}'.format(os.getpid())) if __name__ == '__main__': child_conn, parent_conn = Pipe() job = [] for i in range(5): p = Process(target=func, args=(i, child_conn)) job.append(p) p.start() time.sleep(2) for i in range(5): data = parent_conn.recv() print(data) for i in job: i.join()
三、进程间同步与共享数据
1、共享内存方式
Value、Array是通过共享内存的方式共享数据
# -*- coding:utf-8 -*- """ 共享内存:Value, Array 逻辑:2个进程,对同一份数据,一个做加法,一个做减法,各做10次 用法: 1、创建共享变量 o = Value('i',1000) or o = Array('i', List)指定不同的类型 2、启动子进程通过o.value or o[i]存取变量的值 3、必要使用锁操作函数o.acquire()加锁 o.release()释放锁 参考: 1、get_lock()返回共享变量使用的Rlock实例 2、get_obj()返回共享变量数据类型 """ from multiprocessing import Process, Value, Array import time import random m = Array('i', [1, 2, 3, 4, 5]) # 对共享变量加法 def save_money(money): """ 针对money.Value进行10次加操作 :param money: :return: """ money.acquire() # 申请锁 for i in range(3): time.sleep(0.3) change = random.randint(1, 200) money.value += change print('\n', money.value, +change) money.release() # 释放锁 # 对共享变量减法 def take_money(money): """ 针对money.Value进行10次减操作 :param money: :return: """ money.acquire() for i in range(3): time.sleep(0.3) change = random.randint(1, 150) money.value -= change print('\n', money.value, -change) money.release() # 对共享变量加法 def save_money2(): m.acquire() # 申请锁 m[2] = 8 print([i for i in m]) m.release() # 释放锁 # 对共享变量减法 def take_money2(): m.acquire() # 申请锁 print([i for i in m]) m.release() # 释放锁 if __name__ == '__main__': # 共享内存,可以多个进程存取,证书,变量名money,变量值1000 money = Value('i', 1000) p_add = Process(target=save_money, args=(money,)) p_sub = Process(target=take_money, args=(money,)) p_add2 = Process(target=save_money, args=(money,)) p_add.start() p_sub.start() p_add2.start() p_add.join() p_sub.join() p_add2.join() Process(target=save_money2).start() Process(target=take_money2).start()
2、共享进程方式
Manager是通过共享进程的方式共享数据。 Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。 Manager对象控制一个拥有list、dict、Lock、Condition、Event、Queue等对象的服务端进程,并且允许其他进程访问这些对象
from multiprocessing import Process, Manager def fun1(dic, lis, index): dic[index] = 'a' dic['2'] = 'b' lis.append(index) # [0,1,2,3,4,0,1,2,3,4,5,6,7,8,9] # print(l) if __name__ == '__main__': with Manager() as manager: dic = manager.dict() # 注意字典的声明方式,不能直接通过{}来定义 l = manager.list(range(5)) # [0,1,2,3,4] process_list = [] for i in range(10): p = Process(target=fun1, args=(dic, l, i)) p.start() process_list.append(p) for res in process_list: res.join() print(dic) print(l)
四、分布集群的消息传递
举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:
# task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
然后,在另一台机器上启动任务进程(本机上启动也可以):
import time, sys from multiprocessing.managers import BaseManager from multiprocessing import Queue # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n * n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')