NSF Back-end Dev Engineer

python多任务


1.python进程

demo:

import multiprocessing


def func1(x):
    for i in range(1000000000):
        x = x+1
        x = x-1


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=6)
    for _ in range(6):
        pool.apply_async(func1, (1, ))
    pool.close()
    pool.join()

pool = multiprocessing.Pool(processes=6)建立进程池,限制进程最大数量,一般不要超过cpu核数,超出后多个进程使用同一核,进程间切换反而让效率降低。pool.apply_async(func1, (1, ))第一个参数是调用的函数名,第二个参数为函数参数。

2.python线程

from threading import Thread


def f(x):
    for i in range(x)
        print(i)
Thread(target=f, args=[100]).start()
Thread(target=f, args=[100]).start()

3.python进程+协程

import multiprocessing
import time
import gevent
from gevent import monkey

from pykafka import KafkaClient


def f(x):
    for i in range(x):
    	print(i)


def func1(x):
    monkey.patch_all()  # 给所有的耗时操作打上补丁
    gevent.joinall([gevent.spawn(f, x) for _ in range(20)])


start_time = time.time()
pool = multiprocessing.Pool(processes=5)
for _ in range(5):
    pool.apply_async(func1, (10000, ))
pool.close()
pool.join()
print(time.time() - start_time)

协程使用gevent库。

4.concurrent.futures

  • ThreadPoolExecutor:线程池,提供异步调用
  • ProcessPoolExecutor: 进程池,提供异步调用

基本线程示例:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime


def my_func(param1, timeout=0.01):  # 多线程执行的函数
    print(f'开始:{param1}, 时间: {datetime.now()}\n', end='')
    time.sleep(timeout)
    return param1


def futures_test():
    with ThreadPoolExecutor(max_workers=2) as executor:  # 初始化线程池,指定2线程
        thread_mission_list = []  # 用来记录线程的任务对象
        for _index in range(3):
            run_thread = executor.submit(my_func, _index, 1)  # 多个参数直接传递即可
            thread_mission_list.append(run_thread)
        print('after submit')
        for mission in as_completed(thread_mission_list, timeout=1.5):  # 等待线程执行完毕,先完成的先显示
            # 这里会自动等待执行完毕
            print("结束:", mission.result(), "时间:", datetime.now())


if __name__ == '__main__':
    futures_test()

输出

开始:0, 时间: 2023-03-21 09:47:44.066264
开始:1, 时间: 2023-03-21 09:47:44.066264
after submit
开始:2, 时间: 2023-03-21 09:47:45.067482
结束: 0 时间: 2023-03-21 09:47:45.067686
结束: 1 时间: 2023-03-21 09:47:45.067686
TimeoutError: 1 (of 3) futures unfinished
Process finished with exit code 1

通过输出可以看到线程池线程数量限制和as_completed的超时限制都正常起作用了

使用回调函数示例:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime


def my_func(param1, timeout=0.01):  # 多线程执行的函数
    print(f'开始:{param1}, 时间: {datetime.now()}\n', end='')
    time.sleep(timeout)
    return param1


def my_callback(run_thread):
    print("结束:", run_thread.result(), "时间:", datetime.now())


def futures_callback_test():
    with ThreadPoolExecutor(max_workers=2) as executor:  # 初始化线程池,指定2线程
        for _index in range(3):
            run_thread = executor.submit(my_func, _index, 1)  # 多个参数像这样直接传递即可
            run_thread.add_done_callback(my_callback)
        print('after submit')


if __name__ == '__main__':
    futures_callback_test()

输出:

开始:0, 时间: 2023-03-21 09:57:36.257870
开始:1, 时间: 2023-03-21 09:57:36.258870
after submit
结束: 1 时间: 2023-03-21 09:57:37.259444
开始:2, 时间: 2023-03-21 09:57:37.259444
结束: 0 时间: 2023-03-21 09:57:37.259640
结束: 2 时间: 2023-03-21 09:57:38.260587

下一篇 aarch64+php7+svm

Comments

Content