1.python进程
demo:
import multiprocessing
def func1(x):
for i in range(1000000000):
x = x+1
x = x-1
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=6)
for _ in range(6):
pool.apply_async(func1, (1, ))
pool.close()
pool.join()
pool = multiprocessing.Pool(processes=6)建立进程池,限制进程最大数量,一般不要超过cpu核数,超出后多个进程使用同一核,进程间切换反而让效率降低。pool.apply_async(func1, (1, ))第一个参数是调用的函数名,第二个参数为函数参数。
2.python线程
from threading import Thread
def f(x):
for i in range(x)
print(i)
Thread(target=f, args=[100]).start()
Thread(target=f, args=[100]).start()
3.python进程+协程
import multiprocessing
import time
import gevent
from gevent import monkey
from pykafka import KafkaClient
def f(x):
for i in range(x):
print(i)
def func1(x):
monkey.patch_all() # 给所有的耗时操作打上补丁
gevent.joinall([gevent.spawn(f, x) for _ in range(20)])
start_time = time.time()
pool = multiprocessing.Pool(processes=5)
for _ in range(5):
pool.apply_async(func1, (10000, ))
pool.close()
pool.join()
print(time.time() - start_time)
协程使用gevent库。
4.concurrent.futures
- ThreadPoolExecutor:线程池,提供异步调用
- ProcessPoolExecutor: 进程池,提供异步调用
基本线程示例:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
def my_func(param1, timeout=0.01): # 多线程执行的函数
print(f'开始:{param1}, 时间: {datetime.now()}\n', end='')
time.sleep(timeout)
return param1
def futures_test():
with ThreadPoolExecutor(max_workers=2) as executor: # 初始化线程池,指定2线程
thread_mission_list = [] # 用来记录线程的任务对象
for _index in range(3):
run_thread = executor.submit(my_func, _index, 1) # 多个参数直接传递即可
thread_mission_list.append(run_thread)
print('after submit')
for mission in as_completed(thread_mission_list, timeout=1.5): # 等待线程执行完毕,先完成的先显示
# 这里会自动等待执行完毕
print("结束:", mission.result(), "时间:", datetime.now())
if __name__ == '__main__':
futures_test()
输出
开始:0, 时间: 2023-03-21 09:47:44.066264
开始:1, 时间: 2023-03-21 09:47:44.066264
after submit
开始:2, 时间: 2023-03-21 09:47:45.067482
结束: 0 时间: 2023-03-21 09:47:45.067686
结束: 1 时间: 2023-03-21 09:47:45.067686
TimeoutError: 1 (of 3) futures unfinished
Process finished with exit code 1
通过输出可以看到线程池线程数量限制和as_completed
的超时限制都正常起作用了
使用回调函数示例:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
def my_func(param1, timeout=0.01): # 多线程执行的函数
print(f'开始:{param1}, 时间: {datetime.now()}\n', end='')
time.sleep(timeout)
return param1
def my_callback(run_thread):
print("结束:", run_thread.result(), "时间:", datetime.now())
def futures_callback_test():
with ThreadPoolExecutor(max_workers=2) as executor: # 初始化线程池,指定2线程
for _index in range(3):
run_thread = executor.submit(my_func, _index, 1) # 多个参数像这样直接传递即可
run_thread.add_done_callback(my_callback)
print('after submit')
if __name__ == '__main__':
futures_callback_test()
输出:
开始:0, 时间: 2023-03-21 09:57:36.257870
开始:1, 时间: 2023-03-21 09:57:36.258870
after submit
结束: 1 时间: 2023-03-21 09:57:37.259444
开始:2, 时间: 2023-03-21 09:57:37.259444
结束: 0 时间: 2023-03-21 09:57:37.259640
结束: 2 时间: 2023-03-21 09:57:38.260587