python並列化プログラミングの最小例

/ python

https://docs.python.org/ja/3/library/concurrent.futures.html

星の数ほどありそうなPythonによる並列化プログラミングのコード例を再生産しておく。

ThreadPoolExecutor

スレッドによる並列化を提供するThreadPoolExecutorによって並列化してもGlobal Interpreter LockによってCPU使用率は100%を超えないことはよく知られている。I/O処理とのオーバーラップが主たる恩恵のため、ブロッキングI/O処理が発生しない関数を並列化してもあまり意味はないだろう。

from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

filenames = {
    'heavy_data.xml': 3,
    'super_data.xml': 6,
    'hyper_data.xml': 2,
    'huge_data.xml': 4,
}

def initializer(string):
    print(f'{string} init thread!')

def worker(data):
    sleep(filenames[data]) # heavy task !
    return f'I understand {filename} components.'

with ThreadPoolExecutor(max_workers=4, initializer=initializer, initargs=('pool',)) as executor:
    futures = []
    for filename in filenames.keys():
        # with open(filename) as file: data = file.read()
        data = filename
        futures.append(executor.submit(worker, data))

    for future in as_completed(futures):
        print(future.result())

これを実行すると次のような結果が得られるだろう。

pool init thread!
pool init thread!
pool init thread!
pool init thread!
I understand hyper_data.xml components.
I understand heavy_data.xml components.
I understand huge_data.xml components.
I understand super_data.xml components.

このプログラム例は複数のファイル全体をメモリ上にロードして何かしらの重い処理をすることを想定している。

並列化を行わない場合ファイル読出しのためのブロッキングと重い処理を交互に繰り返す。この例だと重い処理(に見立てたsleep)だけで15秒かかり、なおかつこの例ではコメントアウトしているため0秒だが実際にはI/OブロッキングでCPUがほとんど動いていない時間が加わりそれ以上の時間がかかる。

一般的にI/O処理にはCPU負荷がかからないためこの裏で重い処理をさせられると効率が良くなり、I/Oによるブロック中に裏でCPU動かした分だけプログラムの終了時間を短縮することができる。

いま重い処理に見立てたsleepは本当の重い処理と違ってCPU使っていないのでこのプログラムは15秒よりも早く終了してしまうが、I/O抜きで直列実行15秒かかる処理をThreadPoolExecutorで並列化しても15秒以上はかかる。Global Interpreter LockによってCPU1個分しか使わないのであくまで短縮するのはI/O処理とオーバーラップした分だけ。

ThreadPoolExecutorによる並列化はこういう点を理解している玄人向けだという印象。マシン全体を重くしないので行儀が良い並列化。

ProcessPoolExecutor

Global Interpreter Lockを回避してCPU使用率100%以上使用にすることができる。使い方はほぼ変わらない。

from time import sleep
from concurrent.futures import ProcessPoolExecutor, as_completed

filenames = {
    'heavy_data.xml': 3,
    'super_data.xml': 6,
    'hyper_data.xml': 2,
    'huge_data.xml': 4,
}

def initializer(string):
    print(f'{string} init thread!')

def worker(filename):
    # with open(filename) as file: data = file.read()
    data = filename
    sleep(filenames[data]) # heavy task !
    return f'I understand {filename} components.'

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4, initializer=initializer, initargs=('pool',)) as executor:
        futures = []
        for filename in filenames.keys():
            futures.append(executor.submit(worker, filename))

        for future in as_completed(futures):
            print(future.result())

注意点は__name__チェックによってプログラム全体をモジュールとしてロード可能なようにしておくこと。これは通常のPythonプログラムでもよく見かけるがProcessPoolExecutorを用いるプログラムでは必須である。これを省略するとThreadPoolExecutorとは違い正しく動作しない(やってみるといい)。

面倒なので例のプログラムは変えていないが、CPU何個も使えるからといってProcessPoolExecutorでこの例を並列化するのは良い例とは言えない。なぜならプロセス間で受け渡しするデータ量と処理量の関係によってはむしろオーバーヘッドが大きくなるからだ。ドキュメントによるとプロセス間ではpickle化(=直列化)できるオブジェクトしかworkerに渡したりreturnで戻したりできないとある。

適当に組んでもオーバーヘッド込みでProcessPoolExecutorでGlobal Interpreter Lockをバイパスする方が早くなってしまうことが往々にしてあるのだが、ThreadPoolExecutorを使うかProcessPoolExecutorによってプログラムの方法を変えていくことが必要だ。Processベースで並列化する際には入出力データ量は小さく、処理量は大きくするプログラム構造を心がけるとよいだろう。dataに見立てたfilenameを渡していたのを、filenameを渡してプロセス内で読ませるように変更しているのはそのためだ。

おわりに

データの共有とかは知らん。例がやや微妙かもしれん