Python协程系列

1、概念

协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。

2、优势

执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。

说明:协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。

一、创建协程函数(async)

async def test1():
    print('1')
    print('2')


async def test2():
    print('3')
    print('4')

a = test1()
b = test2()

a.send(None)
b.send(None)

程序先执行了test1协程函数,当test1执行完时,报了StopIteration异常,这是协程函数执行完返回的一个异常,我们可以用try except捕捉,用来判断协程函数是否执行完成。

# -*- coding:utf-8 -*-

async def test1():
    print('1')
    print('2')


async def test2():
    print('3')
    print('4')


a = test1()
b = test2()

try:
    a.send(None)
except StopIteration as e:
    print(e.value)
    pass

try:
    b.send(None)
except StopIteration as e:
    print(e.value)
    pass

二、交叉执行协程函数(await)

通过以上例子,我们发现定义协程函数可以使用async关键词,执行函数可以使用send方法,那么如何实现在两个协程函数间切换执行呢?这里需要使用await关键词,修改一下代码:

# -*- coding:utf-8 -*-
import asyncio


async def test1():
    print('1')
    await asyncio.sleep(1)  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')


async def test2():
    print('3')
    print('4')


a = test1()
b = test2()

try:
    a.send(None)
except StopIteration as e:
    # print(e.value)
    pass

try:
    b.send(None)
except StopIteration as e:
    # print(e.value)
    pass

说明:程序先执行test1协程函数,在执行到await时,test1函数停止了执行(阻塞);接着开始执行test2协程函数,直到test2执行完毕。从结果中,我们可以看到,直到程序运行完毕,test1函数也没有执行完(没有执行print(“2”)),那么如何使test1函数执行完毕呢?可以使用asyncio自带的方法循环执行协程函数。

2.1、await与阻塞

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行,协程的目的也是让一些耗时的操作异步化。

注意点:await后面跟的必须是一个Awaitable对象,或者实现了相应协议的对象,查看Awaitable抽象类的代码,表明了只要一个类实现了await方法,那么通过它构造出来的实例就是一个Awaitable,并且Coroutine类也继承了Awaitable。

2.2、自动循环执行协程函数

通过前面介绍我们知道执行协程函数需要使用send方法,但一旦协程函数执行过程中切换到其他函数了,那么这个函数就不在被继续运行了,并且使用sned方法不是很高效。那么如何在执行整个程序过程中,自动得执行所有的协程函数呢,就如同多线程、多进程那样,隐式得执行而不是显示的通过send方法去执行函数。

2.2.1 事件循环方法

前面提到的问题就需要用到事件循环方法去解决,即asyncio.get_event_loop方法,修改以上代码如下:

# -*- coding:utf-8 -*-
import asyncio


async def test1():
    print('1')
    await test2()  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')


async def test2():
    print('3')
    print('4')


loop = asyncio.get_event_loop()
loop.run_until_complete(test1())

说明:asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

2.2.2 task任务

由于协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。我们也可以手动将协程对象定义成task,修改以上代码如下:

# -*- coding:utf-8 -*-
import asyncio


async def test1():
    print('1')
    await test2()  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')


async def test2():
    print('3')
    print('4')


loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)

2.2.3 获取task结果

result方法

当协程函数运行结束后,我们需要得到其返回值,第一种方式就是等到task状态为finish时,调用task的result方法获取返回值

# -*- coding:utf-8 -*-
import asyncio


async def test1():
    print('1')
    await test2()  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')


async def test2():
    print('3')
    print('4')


loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)
print(task.result())

回调函数

获取返回值的第二种方法是可以通过绑定回调函数,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

# -*- coding:utf-8 -*-
import asyncio


async def test1():
    print('1')
    await test2()  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')
    return 'STOP TEST1'


async def test2():
    print('3')
    print('4')


def callback(future):
    print('CallBack:', future.result())  # 通过future对象的result方法可以获取协程函数的返回值


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())  # 创建task,test1()是一个协程对象
task.add_done_callback(callback)  # 绑定回调函数
loop.run_until_complete(task)

如果回调函数需要接受多个参数,可以通过偏函数导入,修改代码如下:

# -*- coding:utf-8 -*-
import asyncio
import functools

async def test1():
    print('1')
    await test2()  # asyncio.sleep(1)返回的也是一个协程对象
    print('2')
    return 'STOP TEST1'


async def test2():
    print('3')
    print('4')


def callback(param1, param2, future):
    print(param1, param2)
    print('CallBack:', future.result())  # 通过future对象的result方法可以获取协程函数的返回值


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())  # 创建task,test1()是一个协程对象
task.add_done_callback(functools.partial(callback, 'PARAM1', 'PARAM2'))  # 绑定回调函数
loop.run_until_complete(task)

2.3、协程停止

future对象有几个状态:Pending、Running、Done、Cancelled。创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,可以使用asyncio.Task获取事件循环的task。

import asyncio


async def test1():
    print('1')
    await asyncio.sleep(3)
    print('2')
    return 'TEST STOP'

tasks = [
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
]

loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    for task in  asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。 coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。 task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。 future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别 async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

2.4 协程并发

并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。用多线程、多进程、协程来说,协程实现并发,多线程与多进程实现并行。

import asyncio


async def test1(name):
    print('{} : -- 1'.format(name))
    await asyncio.sleep(3)
    print('{} : -- 2'.format(name))
    return 'TEST STOP'

tasks = [
    asyncio.ensure_future(test1('a')),
    asyncio.ensure_future(test1('b')),
    asyncio.ensure_future(test1('c')),
]

loop = asyncio.get_event_loop()


loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print("task result is ", task.result())

说明:代码先是定义了三个协程对象,然后通过asyncio.ensure_future方法创建了三个task,并且将所有的task加入到了task列表,最终使用loop.run_until_complete将task列表添加到事件循环中。


评论(0 ) 点赞(4)


暂未登录,请登录之后发表评论。 QQ