Python协程编程

协程以及Python协程编程:挺靠谱

参考链接

协程 Python编程之----协程

基础

程序开发的一大矛盾是,你要用控制流去完成逻辑流。也就是说,你要用指令的执行来完成逻辑链条的前因后果。

在刚开始学程序的时候,往往都是从控制流等价于执行流的情况下学起,执行到哪,就意味着逻辑走到了哪。这样的程序结构清晰,可读性好。

但是问题是中间有些过程是不能立即得到结果的,程序为了等结果就会阻塞。这种情况多见于一些I/O操作,这种情况下,控制流和逻辑流就脱节了。 为了提升效率,我们可以使用异步的api,通过回调/通知函数来响应操作结果,同时接着执行下一轮的逻辑。

异步回调/通知的问题在于,它把原本统一的逻辑流拆开成了几个阶段,这样控制流和逻辑流就不等价了。为了保证逻辑数据的传递,需要自己来维护状态,阅读起来也比较头疼。状态的维护历来就是bug层出的地方,很容易掉入无效状态而死掉。同时,这种机制调试起来还特别麻烦,因为状态所能提供的信息不够,往往还得手动跟踪调用链,这是相当费精力的。

一条线程 在多个任务之间来回切换,切换这个动作是浪费时间的。对于cpu,操作系统来说,协程是不存在的,他们只管执行线程。他们不管你执行哪个任务,只管执行线程的指令。

协程和线程、进程的区别

协程是一种编译器级别的任务调度机制,它可以让你用逻辑流的顺序去写控制流,而且还不会导致操作系统级的线程阻塞。 发起异步请求、注册回调/通知器、保存状态,挂起控制流、收到回调/通知、恢复状态、恢复控制流的所有过程都能过一个yield来默默完成。

一句话说明什么是协程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

Python实现

yield实现协程操作

Key Point:yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time

# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象
# consumer通过yield拿到消息,处理,又通过yield把结果传回;
# yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
# 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
# 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
def consumer(name):
print("--->starting....")
while True:
new_baozi = yield # yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
print("[%s] is eating baozi %s" % (name, new_baozi))
time.sleep(1)

def producer():
next(con1) # 启动生成器 在python3 中next()函数,的调用方法是,把生成器的对象以参数形式传入到next(params)。
next(con2)
n = 0
while n < 5:
n += 1
print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
con1.send(n) # 一旦生产了东西,通过con1.send(n)切换到consumer执行;将'send(value)'赋值给new_baozi,重新启动生成器往下走,再执行一次next(con1), 相当于函数又返回了一次值。
con2.send(n)
#整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
if __name__ == '__main__':
con1 = consumer("c1") # 创建一个生成器对象
con2 = consumer("c2") # 创建一个生成器对象
p = producer()

Output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
--->starting....
--->starting....
[producer] is making baozi 1
[c1] is eating baozi 1
[c2] is eating baozi 1
[producer] is making baozi 2
[c1] is eating baozi 2
[c2] is eating baozi 2
[producer] is making baozi 3
[c1] is eating baozi 3
[c2] is eating baozi 3
[producer] is making baozi 4
[c1] is eating baozi 4
[c2] is eating baozi 4
[producer] is making baozi 5
[c1] is eating baozi 5
[c2] is eating baozi 5

greenlet

greenlet 机制的主要思想是:生成器函数或者协程函数中的 yield 语句挂起函数的执行,直到稍后使用 next()send() 操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet 是 python 中实现我们所谓的"Coroutine(协程)"的一个基础库。

使用 greenlet 就可以实现逻辑流的自主控制了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from greenlet import greenlet

def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()

def test2():
print(56)
gr1.switch()
print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Output

1
2
3
4
12
56
34
78

gevent

gevent支持的协程(基于greenlet的框架) Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。 gevent是第三方库,通过greenlet实现协程,其基本思想是: 当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import gevent
import time

def foo():
print("running in foo")
gevent.sleep(2)
print("switch to foo again")

def bar():
print("switch to bar")
gevent.sleep(5)
print("switch to bar again")

start=time.time()

gevent.joinall(
[gevent.spawn(foo),
gevent.spawn(bar)]
)

print(time.time()-start)

Output

1
2
3
4
5
running in foo
switch to bar
switch to foo again
switch to bar again
5.015146255493164

实际使用

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换(自动,美滋滋),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

def f(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://www.github.com/'),
gevent.spawn(f, 'https://zhihu.com/'),
])
print(time.time()-start) # 用了协程的时间

start=time.time()
for url in ['https://itk.org/', 'https://www.github.com/', 'https://zhihu.com/']:
f(url)
print(time.time()-start) # 没用协程的时间

Output

1
2
3
4
5
6
7
GET: https://itk.org/
GET: https://www.github.com/
GET: https://zhihu.com/
28793 bytes received from https://zhihu.com/.
82700 bytes received from https://www.github.com/.
12299 bytes received from https://itk.org/.
4.142592430114746

另一个示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from gevent import monkey
monkey.patch_all() # 把文件中的阻塞事件都检测到,并打包,也就是他认识文件中的打包方式
import time # time里面的sleep方法gevent模块就认识了
import gevent
def eat():
print('eating 1')
time.sleep(5)
print('eating 2')
return 'eat finished'
def play():
print('playing 1')
time.sleep(5)
print('playing 2')
return 'play finished'

g1 = gevent.spawn(eat) # 自动检测阻塞事件,遇见阻塞了就会进行切换。有些阻塞它不认识。
g2 = gevent.spawn(play)
# g1.join() # 阻塞直到g1结束
# g2.join() # 阻塞直到g2结束
# 或
gevent.joinall([g1, g2]) # 相当于完成列表中全部阻塞了
print(g1.value) # 获取一个协程的返回值
print(g2.value)

Output

1
2
3
4
5
6
eating 1
playing 1
eating 2
playing 2
eat finished
play finished

总结

优点

无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控制流,简化编程模型 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点

无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!