Python进程系列

multiprocessing常用组件及功能

创建管理进程模块:

Process(用于创建进程模块)

Process模块用来创建子进程,是Multiprocessing核心模块,使用方式与Threading类似,可以实现多进程的创建,启动,关闭等操作。

Pool(用于创建管理进程池)

Pool模块是用来创建管理进程池的,当子进程非常多且需要控制子进程数量时可以使用此模块。

Queue(用于进程通信,资源共享)

Queue模块用来控制进程安全,与线程中的Queue用法一样。

Value,Array(用于进程通信,资源共享)

Pipe(用于管道通信)

Pipe模块用来管道操作。

Manager(用于资源共享)

Manager模块常与Pool模块一起使用,作用是共享资源。

同步子进程模块:

Condition

Event

用来实现进程间同步通信。

Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

RLock Semaphore

用来控制对共享资源的访问数量,例如池的最大连接数

一、创建进程

1、fork创建进程

该方法只能在unix/Linux/Mac上运行,windows不可以运行。 程序执行到fork()时,操作系统会创建一个新进程(子进程),并把父进程的所有信息赋值到子进程中。

这个方法很特殊,会有两次返回,分别在子进程和父进程返回一次,子进程永远返回0,父进程返回进程的id.

getpid():返回当前进程的id

getppid():返回当前进程父进程的id。

# -*- coding:utf-8 -*-

"""
@Time: 2019-12-25
@Author: kenwu
@File: processtest.py
"""

# 第一种方式:使用OS模块中的fork方式实现多进程
import os

print('current Process ({}) start ......'.format(os.getpid()))
pid = os.fork()
if pid < 0:
    print('error in fork')
elif pid == 0:
    print('I am child process ({0}, my parent process is ({1}))'.format(os.getpid(), os.getppid()))
else:
    print('I ({0}) created a child process ({1})'.format(os.getpid(), pid))

输出:

2、multiprocessing创建进程

Process([group,target,name,args,kwargs])

group:至今还未使用,值始终为None target:进程实例所调用的对象,一般表示子进程要调用的函数。 args:表示调用对象的参数,一般是函数的参数 kwargs:表示调用对象的关键字参数字典。 name:当前进程实例的别名

Process常用方法

p.is_alive():判断进程是否还在运行。如果还在运行,返回true,否则返回false p.join([timeout]):等待进程实例执行完毕,或等待多少秒 p.run():默认会调用target指定的对象,如果没有给定target参数,对该进程对象调用start()方法时,就会执行对象中的run()方法 p.start():启动进程实例(创建子进程),并运行子进程的run方法 p.terminate():不管任务是否完成,立即终止,同时不会进行任何的清理工作,如果进程p创建了它自己的子进程,这些进程就会变成僵尸进程,使用时特别注意,如果p保存了一个锁或者参与了进程间通信,那么使用该方法终止它可能会导致死锁或者I/O损坏。

Process类常用属性:

p.daemon:布尔值,指示进程是否是后台进程。当创建它的进程终止时,后台进程会自动终止。并且,后台进程无法创建自己的新进程。注意:p.daemon的值必须在p.start方法调用前设置。 p.exitcode:进程的整数退出指令。如果进程仍然在运行,它的值为None,如果值为负数:—N,就表示进程由信号N所终止。 p.name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数。 p.pid:当前进程实例的PID

# 第二种方法:使用multiprocessing模块创建多进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Child process {0} ({1}) Running......'.format(name, os.getpid()))


print('Parent Process ({}) '.format(os.getpid()))
p_list = []
for i in range(5):
    p = Process(target=run_proc, args=(str(i),))
    p_list.append(p)
    print('Process will start.')
    p_list[i].start()

for p in p_list:
    p.join()

print('Process end.')

3、进程池Pool

进程初始化时,会指定一个最大进程数量,当有新的请求需要创建进程时,如果此时进程池还没有到达设置的最大进程数,该进程池就会创建新的进程来处理该请求,并把该进程放到进程池中,如果进程池已经达到最大数量,请求就会等待,知道进程池中进程数量减少,才会新建进程来执行请求。

pool=Pool(numprocess,initializer,initargs)

numproxess:需要创建的进程个数,如果忽略将使用cpu_count()的值。即系统上的CPU数量。 nitializer:每个进程启动时都要调用的对象。 initargs:为initalizer传递的参数。

multiprocessing.Pool常用函数解析:

apply_async(要调用的方法,参数列表,关键字参数列表):使用非阻塞方式调用指定方法,并行执行(同时执行)

apply(要调用的方法,参数列表,关键字参数列表):使用阻塞方式调用指定方法,,阻塞方式就是要等上一个进程退出后,下一个进程才开始运行。

close():关闭进程池,不再接受进的进程请求,但已经接受的进程还是会继续执行。

terminate():不管程任务是否完成,立即结束。

join():主进程堵塞(就是不执行join下面的语句),直到子进程结束,注意,该方法必须在close或terminate之后使用。

pool.map(func,iterable,chunksize):将可调用对象func应用给iterable的每一项,然后以列表形式返回结果,通过将iterable划分为多块,并分配给工作进程,可以并行执行。chunksize指定每块中的项数,如果数据量较大,可以增大chunksize的值来提升性能。

pool.map_async(func,iterable,chunksize,callback):与map方法不同之处是返回结果是异步的,如果callback指定,当结果可用时,结果会调用callback。

pool.imap(func,iterable,chunksize):与map()方法的不同之处是返回迭代器而非列表。

pool.imap_unordered(func,iterable,chunksize):与imap()不同之处是:结果的顺序是根据从工作进程接收到的时间而定的。

pool.get(timeout):如果没有设置timeout,将会一直等待结果,如果设置了timeout,超过timeout将引发multiprocessing.TimeoutError异常。

pool.ready():如果调用完成,返回True

pool.successful():如果调用完成并且没有引发异常,返回True,如果在结果就绪之前调用,jiang引发AssertionError异常。

pool.wait(timeout):等待结果变为可用,timeout为等待时间。

# 第三种,使用进程池

from multiprocessing import Pool
import os, time, random


def run_task(name):
    print('Task {0} (pid = {1}) is running ......'.format(name, os.getpid()))
    time.sleep(random.random() * 3)
    print('Task {} end.'.format(name))


print('Current Process {} .'.format(os.getpid()))
p = Pool(processes=3)
for i in range(5):
    p.apply_async(run_task, args=(i,))
print('Waiting for all subprocesses done ......')
p.close()
p.join()
print('All subprocesses done.')

# 阻塞与非阻塞对比
from multiprocessing import Pool
import os
import time
import random  # 用来生成随机数


def test1(name):
    print("%s运行中,pid=%d,父进程:%d" % (name, os.getpid(), os.getppid()))
    t_start = time.time()
    # random.random()会生成一个0——1的浮点数
    time.sleep(random.random() * 3)
    t_end = time.time()
    print("%s执行时间:%0.2f秒" % (name, t_end - t_start))


pool = Pool(5)  # 设置线程池中最大线程数量为5
for xx in range(0, 7):
    # 非阻塞运行
    pool.apply_async(test1, ("mark" + str(id),))
print("--start1--")
pool.close()  # 关闭线程池,关闭后不再接受进的请求
pool.join()  # 等待进程池所有进程都执行完毕后,开始执行下面语句
print("--end1--")
print("*" * 30)
pool = Pool(5)  # 设置线程池中最大线程数量为5
for xx in range(0, 7):
    # 阻塞运行
    pool.apply(test1, ("mark" + str(id),))
print("--start2--")
pool.close()  # 关闭线程池,关闭后不再接受进的请求
pool.join()  # 等待进程池所有进程都执行完毕后,开始执行下面语句
print("--end2--")

4、继承Process创建线程

from multiprocessing import Process
import os
import time
class MyProcess(Process):
    #重新init方法
    def __init__(self,interval):
        #下面一句是调用父类init方法,这一本尽量不要少,因为父类还有很多事情需要在init方法内处理
        Process.__init__(self)
        self.interval=interval

    #重写run方法
    def run(self):
        print("子进程运行中,pid=%d,父进程:%d" % (os.getpid(), os.getppid()))
        t_start=time.time()
        time.sleep(self.interval)
        t_end=time.time()
        print("子进程运行结束,耗时:%0.2f秒"%(t_end-t_start))

if __name__=="__main__":
    t_start=time.time()
    print("父进程开始执行")
    p=MyProcess(2)
    p.start()
    p.join()
    t_end=time.time()
    print("父进程运行结束,耗时:%0.2f秒" % (t_end - t_start))

二、进程间通信

1、Queue

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码
def proc_write(q, urls):
    for url in urls:
        q.put(url)
        print('Put {0} to queue by Process {1}'.format(url, os.getpid()))
        time.sleep(random.random())


def proc_read(q):
    while True:
        url = q.get(True)
        print('Get {0} from queue by Process {1}'.format(url, os.getpid()))


if __name__ == '__main__':
    # 父进程创建queue,并传给各个子进程。
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6']))
    proc_writer3 = Process(target=proc_write, args=(q, ['url_7', 'url_8', 'url_9']))
    proc_reader1 = Process(target=proc_read, args=(q,))
    proc_reader2 = Process(target=proc_read, args=(q,))

    # 启动子进程proc_wirter,写入:
    proc_writer1.start()
    proc_writer2.start()
    proc_writer3.start()

    # 启动子进程proc_reader,读取:
    proc_reader1.start()
    proc_reader2.start()

    # 等待proc_writer结束:
    proc_writer1.join()
    proc_writer2.join()
    proc_writer3.join()

    # proc_reader进程是死循环,无法等待其结束,只能强行终止:
    proc_reader1.terminate()
    proc_reader2.terminate()

2、PIPE

在内存中开辟一个新的空间,对多个进程可见,在通信形式上形成一种约束

现有2个进程A和B,他们都在内存中开辟了空间,那么我们在内存中再开辟一个空间C,作用是连接这两个进程的。对于进程来说内存空间是可以共享的(任何一个进程都可以使用内存,内存当中的空间是用地址来标记的,我们通过查找某一个地址就能找到这个内存)A进程可以不断的向C空间输送东西,B进程可以不断的从C空间读取东西,这就是进程间的通信 

总结:

1.向管道发送数据使用send函数,从管道接收数据使用recv()函数 2.recv()函数为阻塞函数,当管道中数据为空的时候会阻塞 3.一次recv()只能接收一次send()的内容 4.send可以发送的数据类型比较多样,字符串,数字,列表等等

# 使用管道的方式进行线程间的通信

from multiprocessing import Process, Pipe
import os, time


def func(name, a):
    time.sleep(1)
    a.send('测试测试: ' + str(name))
    print('父进程的id为:{}'.format(os.getppid()), " ————— ", '子进程的id为:{}'.format(os.getpid()))


if __name__ == '__main__':
    child_conn, parent_conn = Pipe()
    job = []
    for i in range(5):
        p = Process(target=func, args=(i, child_conn))
        job.append(p)
        p.start()

    time.sleep(2)
    for i in range(5):
        data = parent_conn.recv()
        print(data)

    for i in job:
        i.join()

三、进程间同步与共享数据

1、共享内存方式

Value、Array是通过共享内存的方式共享数据

# -*- coding:utf-8 -*-

"""
共享内存:Value, Array
逻辑:2个进程,对同一份数据,一个做加法,一个做减法,各做10次
用法:
    1、创建共享变量 o = Value('i',1000) or o = Array('i', List)指定不同的类型
    2、启动子进程通过o.value or o[i]存取变量的值
    3、必要使用锁操作函数o.acquire()加锁 o.release()释放锁
参考:
    1、get_lock()返回共享变量使用的Rlock实例
    2、get_obj()返回共享变量数据类型
"""

from multiprocessing import Process, Value, Array
import time
import random

m = Array('i', [1, 2, 3, 4, 5])


# 对共享变量加法
def save_money(money):
    """
    针对money.Value进行10次加操作
    :param money:
    :return:
    """
    money.acquire()  # 申请锁
    for i in range(3):
        time.sleep(0.3)
        change = random.randint(1, 200)
        money.value += change
        print('\n', money.value, +change)
    money.release()  # 释放锁


# 对共享变量减法
def take_money(money):
    """
    针对money.Value进行10次减操作
    :param money:
    :return:
    """
    money.acquire()
    for i in range(3):
        time.sleep(0.3)
        change = random.randint(1, 150)
        money.value -= change
        print('\n', money.value, -change)
    money.release()


# 对共享变量加法
def save_money2():
    m.acquire()  # 申请锁
    m[2] = 8
    print([i for i in m])
    m.release()  # 释放锁


# 对共享变量减法
def take_money2():
    m.acquire()  # 申请锁
    print([i for i in m])
    m.release()  # 释放锁


if __name__ == '__main__':
    # 共享内存,可以多个进程存取,证书,变量名money,变量值1000
    money = Value('i', 1000)
    p_add = Process(target=save_money, args=(money,))
    p_sub = Process(target=take_money, args=(money,))
    p_add2 = Process(target=save_money, args=(money,))

    p_add.start()
    p_sub.start()
    p_add2.start()
    p_add.join()
    p_sub.join()
    p_add2.join()

    Process(target=save_money2).start()
    Process(target=take_money2).start()

2、共享进程方式

Manager是通过共享进程的方式共享数据。 Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。 Manager对象控制一个拥有list、dict、Lock、Condition、Event、Queue等对象的服务端进程,并且允许其他进程访问这些对象

from multiprocessing import Process, Manager


def fun1(dic, lis, index):
    dic[index] = 'a'
    dic['2'] = 'b'
    lis.append(index)  # [0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
    # print(l)


if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()  # 注意字典的声明方式,不能直接通过{}来定义
        l = manager.list(range(5))  # [0,1,2,3,4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic, l, i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

四、分布集群的消息传递

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以):

import time, sys
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass


# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')


评论(0 ) 点赞(3)


暂未登录,请登录之后发表评论。 QQ