0%

Python(十四)- 多任务:进程、线程、协程

实际开发中,经常会面临一些并发问题的方案和设计,通常会有性能优化和提升的需求。

多任务处理

多任务:指的是操作系统同时运行多个任务。 可分为并行和并发。
多任务编程的目的就是通过应用程序利用多个计算机核心达到多任务同时执行的目的,以此来提升程序执行效率。

并发:指能处理多个同时性活动的能力,并发事件之间不一定要同一时刻发生。
可以看成:一个处理器同时处理多个任务,每个任务放在不同时间片段,快速交替执行。

并行:指在同一时刻,有多个任务在多个处理器上同时执行。

多任务的处理方式通常为:多进程、多线程、进程池、线程池。

1.多进程

1.1.简介

进程:程序在一个数据集上的一次动态执行过程进程是一个动态的过程,占有CPU资源,有一定的生命周期。

程序:是一个静态的描述,不占有计算机资源。

进程的特征

  • 进程是操作系统资源分配的最小单位
  • 每个进程单独占有4G虚拟内存
  • 进程之间相互独立,运行不受影响

进程的创建流程

  1. 用户空间运行程序,发起进程创建申请
  2. 调用操作系统内核接口创建进程
  3. 分配计算机资源,确定进程状态
  4. 将新的进程提供给用户使用

1.2.进程状态:

  1. 三态
    • 就绪态:进程具备运行条件,等待处理器运行
    • 运行态:进程占有CPU,处于运行状态
    • 等待态:又称阻塞态,睡眠态,指进程暂时不具备运行的条件,需要阻塞等待(sleep accept…)
  2. 五态
    • 新建态:创建一个进程,获取资源,直接表现为运行一个程序,或者在程序中创建新的进程
    • 就绪态:同三态
    • 运行态:同三态
    • 等待态:同三态
    • 终止态:进程执行结束,资源回收过程

多个进程,如何占用CPU?

  • 同一内核同一时刻只能运行一个任务
  • 多个进程对内核资源进行争夺,操作系统决定哪个进程占有计算机核心
  • 占有计算机核心的进程,我们称为该进程占有CPU的时间片

1.3.python进程库

在Python中,有一个多进程库(multiprocessing)。

  1. 优点
  • 并行多个任务,提高运行效率
  • 空间独立,数据安全,创建方便
  1. 缺点
  • 进程创建销毁的过程中消耗较多的计算机资源

1.4.进程通信

进程间通信: 管道(Pipe)、消息队列(Queue)、共享内存、信号(signal)、信号量(Semaphore)、套接字(Socket)。

比较项 管道 消息队列 共享内存
开辟空间 内存 内存 内存
读写方式 双向/单向 先进先出 操作覆盖内存
效率 一般 一般
应用 多用于亲缘进程 方便灵活、广泛 较复杂
是否需要互斥机制 需要

1.5.进程池

multiprocessing 包是Python中的多进程管理包,可以利用 multiprocessing.Process 对象来创建进程,Process 对象拥有 is_alive()join([timeout])run()start()terminate()等方法。

进程池的使用有四种方式:apply_asyncapplymap_asyncmap

使用步骤

  • 实例进程池,传递进程池数量
  • 添加子进程到进程池中
  • 先关闭Pool,禁止进程池再接收任务
  • 等待进程池中进程完成
  • 关闭进程池

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
import multiprocessing


def show(name):
print('name: ', name, ' --- 当前进程:', os.getpid())
return 'hello : pid-%d' % os.getpid()


if __name__ == '__main__':
# 实例子进程,和CPU数量一致
pro_pool = multiprocessing.Pool(multiprocessing.cpu_count())
result = [] # 存储结果
# 提交任务,获得返回进程对象对象, 可在此提交多个进程
for i in range(10):
name = 'worker-%d' % i
r = pro_pool.apply_async(func=show, args=(name,))
result.append(r)
# 获取执行结果,状态信息
for i in result:
i.wait() # 等待进程函数执行完毕
for i in result:
if i.ready(): # 进程函数是否已经启动了
if i.successful(): # 进程函数是否执行成功
print(i.get()) # 进程函数返回值

# 关闭线程池
pro_pool.close()
# 等待子线程完成
pro_pool.join()

参考:python进程池

2.多线程

2.1.简介

Python 有一个名为 Global Interpreter Lock(GIL)的全局解释器锁。
GIL 确保每次只能执行一个 “线程”。一个线程获取 GIL 执行相关操作,然后将 GIL 传递到下一个线程。
虽然看起来程序被多线程并行执行,但它们实际上只是轮流使用相同的 CPU 核心。
所有这些 GIL 传递都增加了执行的开销。这意味着多线程并不能让程序运行的更快。

线程:操作系统中能够进行运算调度的最小单位。包含在进程之中,是进程中的实际运作单位。
线程又称为轻量级的进程,在创建和删除时消耗的计算机资源小。

在Python中,有一个多线程库(Threading),但是由于 GIL 锁的存在,用多线程来加速代码的效果并不是那么的好。

线程的通信:全局变量(需要加锁)、消息队列(Queue)、事件(event)、线程锁(Lock)、条件变量(condition)。

2.2.线程池

一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。
相比于进程,线程的创建销毁消耗比较少,而在多线程的情况下,依旧会消耗很多时间,降低效率。

简介

  • 线程池的出现大大降低了系统频繁创建销毁线程的开销,实现更好的性能和系统稳定性。
  • 多个线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务。

原理

  • 线程池首先会维护一个任务队列
  • 生成工作使用的线程
  • 线程分别从队列中取出任务并执行,一个任务执行完成需要告诉主线程完成一个任务
  • 再从任务队列中取出任务,直到所有任务为空,退出线程

使用
线程池的基类是 concurrent.futures 模块中的 ExecutorExecutor 提供了两个子类,即 ThreadPoolExecutorProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
此外,还有第三方的模块 threadpool-executor-shrink-ablethreadpool

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
import queue
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(name, q):
url = q.get()
resp = requests.get(url)
# print(f'{name}{len(resp.text)}')
return resp.text


def main():
q = queue.Queue()
urls = ['http://www.163.com'] * 10
# 向queue中增加url
for url in urls:
q.put(url)

# 使用线程池
with ThreadPoolExecutor(5) as pool:
tasks = [pool.submit(fetch, f'worker{i} ', q) for i in range(len(urls))]

for i in as_completed(tasks):
print(i.result())
print(i.done)


if __name__ == '__main__':
start = time.monotonic()
main()
print(f'使用时间:{time.monotonic() - start:.2f}s')

参考:Python 线程池原理及实现

3.协程

3.1.简介

线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。
与此对比,协程(又称微线程)是可以让程序员自己调度所写的程序,在切换去执行其他代码块的时候(通常是遇到IO操作时切换),可以减少一些不必要的上下文切换。

3.2.优缺点

优点

  • 执行效率高:由程序自身控制子程序切换不是线程切换,没有线程切换的开销。
  • 不需要多线程的锁机制:只有一个线程,也不存在共享资源争夺,只需要判断状态。
  • 适合用于高并发处理。

缺点

  • 协程是单线程执行,无法利用多核资源。(解决方案:多进程+协程)
  • I/O阻塞时,会挂起整个程序
  • Python对协程的支持和兼容有限

3.3.发展历程

早期的 生成器(generator)中的 yield 可以一定程度上实现协程。
Python3.4中内置了 asyncio 标准库,引入@asyncio.coroutine装饰器和yield from语法。
在Python3.5上又提供了 async/await 语法,通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式。
python协程库还有eventletgevent

因此,python中协程的使用,可总结为:

  • yield / yield from
  • eventlet 第三方库
  • gevent 第三方库
  • asyncio 标准库

3.4.协程的使用

  1. yield 表达式的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def test():
print("generator start")
n = 1
while n < 10:
yield_value = yield n
print("yield_value = %d" % yield_value)
n += 1


# 创建generator对象
generator = test()
print("---------------")

# 启动generator
next_result = generator.__next__()
print("next_result = %d" % next_result)

print("---------------")

# 给 yield表达式 传值
send_result = generator.send(666)
print("send_result = %d" % send_result)

可用 inspect 模块,inspect.getgeneratorstate() 方法查看协程状态。
方法说明:
__next__()方法: 作用是启动或者恢复generator的执行,相当于send(None)
send(value)方法:作用是发送值给yield表达式。

  1. yield from 的使用
1
2
3
4
def func():
yield from range(10)

print(list(func())) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  1. @asyncio.coroutine 装饰器的使用 (已弃用)
1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio


@asyncio.coroutine # 声明一个协程
def test_yield_from(n):
# ...
yield from asyncio.sleep(1)


# 是否是协程函数
print(asyncio.iscoroutinefunction(test_yield_from))
# 是否是协程对象
print(asyncio.iscoroutine(test_yield_from(3)))
  1. asyncio 库的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# 预期的输出(python3.7+):
# started at 23:32:18
# hello
# world
# finished at 23:32:21
  1. gevent 库的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
from gevent import monkey; monkey.patch_socket()
import gevent

def f(n):
for i in range(n):
print(gevent.getcurrent(), i)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()
  1. eventlet 库的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import eventlet

def test1(x):
print(f'Greenthread test1 Num:{x}')
eventlet.greenthread.sleep(1)
return x

def test2(y):
print(f'Greenthread test2 Num:{y}')
return y

x = eventlet.spawn(test1, 1)
y = eventlet.spawn(test2, 2)
x.wait()

# Greenthread test2 Num:2
# Greenthread test1 Num:1
# 1

spawn函数产生的协程可以通过wait函数来执行并获取它的返回结果

参考:
官网:协程与任务
协程与异步IO


------------- 本文已结束 感谢您的阅读! -------------
Donate comment here.

本文标题:Python(十四)- 多任务:进程、线程、协程

文章作者:Jusheng Yao

发布时间:2020年05月21日 - 23:03

最后更新:2020年06月21日 - 20:03

原始链接:http://yaojusheng.github.io/archives/46aa5d73.html

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

欢迎关注我的其它发布渠道