3种方式实现python多线程并发处理

最优线程数

  • Ncpu=CPU的数量

  • Ucpu=目标CPU使用率

  • W/C=等待时间与计算时间的比率
    为保持处理器达到期望的使用率,最优的线程池的大小等于
    $$Nthreads=NcpuUcpu(1+W/C)$$

  • cpu密集型任务,即$W<<C$,则$W/C≈0$,则$Nthreads=Ncpu*Ucpu$
    如果希望CPU利用率为100%,则$Nthreads=Ncpu$

  • IO密集型任务,即系统大部分时间在跟I/O交互,而这个时间线程不会占用CPU来处理,即在这个时间范围内,可以由其他线程来使用CPU,因而可以多配置一些线程。

  • 混合型任务,二者都占有一定的时间

线程池

对于任务数量不断增加的程序,每有一个任务就生成一个线程,最终会导致线程数量的失控。对于任务数量不端增加的程序,固定线程数量的线程池是必要的。

方法一:使用threadpool模块

threadpool是一个比较老的模块了,支持py2 和 py3 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threadpool
import time

def sayhello (a):
print("hello: "+a)
time.sleep(2)

def main():
global result
seed=["a","b","c"]
start=time.time()
task_pool=threadpool.ThreadPool(5)
requests=threadpool.makeRequests(sayhello,seed)
for req in requests:
task_pool.putRequest(req)
task_pool.wait()
end=time.time()
time_m = end-start
print("time: "+str(time_m))
start1=time.time()
for each in seed:
sayhello(each)
end1=time.time()
print("time1: "+str(end1-start1))

if __name__ == '__main__':
main(

方法二:使用concurrent.futures模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from concurrent.futures import ThreadPoolExecutor
import time

import time
from concurrent.futures import ThreadPoolExecutor, wait, as_completed

ll = []
def sayhello(a):
print("hello: "+a)
ll.append(a)
time.sleep(0.8)

def main():
seed=["a","b","c","e","f","g","h"]
start1=time.time()
for each in seed:
sayhello(each)
end1=time.time()
print("time1: "+str(end1-start1))
start2=time.time()
with ThreadPoolExecutor(2) as executor:
for each in seed:
executor.submit(sayhello,each)
end2=time.time()
print("time2: "+str(end2-start2))

def main2():
seed = ["a", "b", "c", "e", "f", "g", "h"]
executor = ThreadPoolExecutor(max_workers=10)
f_list = []
for each in seed:
future = executor.submit(sayhello, each)
f_list.append(future)
wait(f_list)
print(ll)
print('主线程结束')


def main3():
seed = ["a", "b", "c", "e", "f", "g", "h"]
with ThreadPoolExecutor(max_workers=2) as executor:
f_list = []
for each in seed:
future = executor.submit(sayhello, each)
f_list.append(future)
wait(f_list,return_when='ALL_COMPLETED')
print(ll)
print('主线程结束')

if __name__ == '__main__':
main3()

方法三:使用vthread模块

参考:https://pypi.org/project/vthread/

demo1

1
2
3
4
5
6
7
8
9
import vthread

@vthread.pool(6)
def some(a,b,c):
import time;time.sleep(1)
print(a+b+c)

for i in range(10):
some(i,i,i)

demo2:分组线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import vthread
pool_1 = vthread.pool(5,gqueue=1) # open a threadpool with 5 threads named 1
pool_2 = vthread.pool(2,gqueue=2) # open a threadpool with 2 threads named 2

@pool_1
def foolfunc1(num):
time.sleep(1)
print(f"foolstring1, test3 foolnumb1:{num}")

@pool_2
def foolfunc2(num):
time.sleep(1)
print(f"foolstring2, test3 foolnumb2:{num}")

@pool_2
def foolfunc3(num):
time.sleep(1)
print(f"foolstring3, test3 foolnumb3:{num}")

for i in range(10): foolfunc1(i)
for i in range(4): foolfunc2(i)
for i in range(2): foolfunc3(i)