【模板】Python多进程文件处理

人生苦短,所以用了Python,但是Python慢啊,所以依旧苦短。好在可以开n倍加速早点阶数这无聊的等待时间。

在NLP任务中经常会遇到对大文本进行处理的任务,这些任务包括但不限于:

  • 分词
  • 词性标注
  • 词频统计(大名鼎鼎的WordCount)
  • 关键词提取
  • 大小写转换(当然这个bash命令更方便了,放在这里只是为了凑数:)

为了方便描述,就用task(line)笼统地表示处理这些任务的函数吧,line是每一行的句子。

在常规使用Pythn的时候,我们会打开一个文件然后一行一行地处理:

1
2
3
with open(file, encoding="utf8") as f:
for line in f:
task(line)

但是这个方式的处理速度是十分感人的,单进程的处理使得本身就已经很慢的Python只能在处理小文本的时候发挥优势,真正做到了一核有难,八核围观。

那么还有一种方法就是使用多核加速文件读取速度了。在C++、Java等语言中发挥多核优势可以选择用多线程或多进程,特别是多线程能共享内存,只要保证了线程安全就是一种加轻量化的加速方案。

但是Python一般会因为GIL(Global Interpreter Lock)导致多线程速度反而更慢,那么就只剩下多进程了。

经过一段时间的摸索核实践,我总结出一个使用多进程加速处理文件的模板,在实践中已经验证了其可行性。主要思想和Map-Reduce类似。

需要用到的库有这些:

1
2
3
import logging          # 非必须,如果不嫌麻烦可以用print代替
import multiprocessing # Python自带的多进程库
from itertools import islice # 便于一次性读取多行

准备好了库后,就可以给出主要的操作流程了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def multiprocess_task(file, workers:int, batch:int):
"""
参数:
file: 需要处理的文件名
workers: 进程数量
batch: 一次处理的行数
"""
results = []

x = 0 # 行数统计
fp = open(corpus, encoding="utf8")
pool = multiprocessing.Pool(workers) if workers > 1 else None

try:
while True:
lines = list(islice(fp, batch))
if not lines:
break

# 多核处理
if workers > 1:
# 切分数据块
minibatch = len(lines) // workers
datas = [lines[i*minibatch: (i+1)*minibatch] for i in range(workers)]

# 使用pool.starmap并行执行task,相当于Map
result = pool.starmap(task, datas)

# 合并处理结果,相当于Reduce
results = merge(results, result)

# 单核处理
else:
results = merge(results, task(lines))

x += len(lines)
logging.info(f"{x} lines read.")

except Exception as e:
# 异常需谨慎处理,不建议用Exception直接捕获
logging.info(e)
return None

finally:
if pool:
pool.close()
fp.close()

return results

当workers大于1时,上面的代码在pool.starmap阶段会将数据分发到不同的子进程中处理,返回的结果在merge中合并。

讲道理这种实验是要对比结果的,不过之前使用的时候没记录……反正快了不少就是了(手动狗头)