サブロウ丸

主にプログラミングと数学

【python】 multiprocessing.Queue を用いた並列化 (サンプルコードあり)

multiprocessing.Poolの功罪

multiprocessing.Poolはpythonにおけるプロセス並列処理を簡潔に書ける点で非常に使い勝手が良いです. しかし, 以下のようなシチュエーションでは, 並列化を行わない方が高速になり得ます.

data = xxx # ものすごく重たいオブジェクト

def func(data, i, j):
    return data[i] + data[j]

args = [
    (data, i, j) for i in range(10) for j in range(10)
]

with multiprocess.Pool as pool:
    pool.map(func, args)

multiprocess.Pool然り, multiprocess.Process然り, プロセス立ち上げの際には関数から, 関数への入力値から全てシリアライズ(pickle化)されます. ですので, 何か重たい変数(オブジェクト)が関数の引数に含まれる場合は, そのシリアライズ処理がオーバーヘッドになり, 並列化の恩恵が全くないということが発生します.


上記の例では, argsの各tupleオブジェクトが全てシリアライズされたのち, 各プロセスに渡されます. (逆に, シリアライズに時間がかからないような, 小さなオブジェクトであれば全く問題にはなりません)


プロセス並列をする上で, 上記例のdata変数を各プロセスに渡すためのシリアライズは必要不可避であるため,シリアライズする回数を減らすことができれば, 効率を上げられそうです.

解決策1

その部分は, 自分で書けばある程度解決できます. 親プロセスでdataをシリアライズしておき, 子プロセスの中でそれを読み込むようにすれば良いですが 複数子プロセスを立ち上げる場合は, 複数のプロセスが同じファイルに同時にアクセスしてしまわないように工夫をする必要があります.

multiprocessing.Pool + multiprocessing.Queue による解決策

そこで multiprocessing.Queue による解決方法を紹介します.

流れとしては,

  1. 関数への可変な入力値, 上記例だと(i, j)をQueueに格納.
  2. 子プロセスを一つずつ立ち上げる ( ここで, dataのpickle化処理が入るので, 立ち上げに時間がかかる )
  3. 立ち上がった子プロセスはQueueから変数(i, j)を受け取り, 処理
  4. 子プロセスが処理が終わると, 次の変数をQueueから受け取る


例えるならば, Poolだと "指示があるたびに, 現地に社員を派遣する" というのを毎回行っていたのに対し, Process + Queue では "携帯を持たせた社員を現地に向かわせ, 社員は指示をこなしたら, 電話をかけて新しい指示を受け取る" といった感じでしょうか.


となります. これにより, dataのpickle化は各子プロセスに対し, 1回のみ行われるので, 上記のPoolを用いる場合より効率的です. さらに, 子プロセスが立ち上がったとは, 親プロセスも処理に参加することができます.

また, queueへ格納する, 関数への入力を生成するプロセスと queueから値を取り出して処理を行うプロセスを同時に動かすこともmultiprocessing.Queueを用いれば可能です.

簡単な実験

一番下にサンプルコードを載せています.

  • main_queue() が上記で言及している Queueを用いたwork()関数の並列化
  • main_pool() が Poolを用いたwork()関数の並列化実装 になります.

重たいオブジェクトとして, 長さNのリスト(data変数)を作成しています. 3プロセス並列で簡単なテストを実行したところ 僕の環境では, 実行時間の比較, ある程度Nが大きい(objectが重たい)と, Proecss + Queueの方が優位になりました. Queueによるデータの受け渡しにもオーバーヘッドがあるため, オブジェクがある程度大きくないと優位性がでないようです.

Process + Queue Pool
N = int(1e6) 2.38 s 0.42 s
N = int(1e7) 4.44 s 3.36 s
N = int(1e8) 27.97 s 62.93 s

それぞれ, 下記のサンプルコードの実行時間.

サンプルコードのコアの部分の説明ですが, Process + Queue事項の場合, worker_queue関数が子プロセスで動く処理になります. q.get(timeout=timeout)でqueueからデータを取り出しますが, 指定したtimeoutでデータを取り出せない場合は queue.Empty エラーを返すので, それを受け取ったら終了します.

データを取り出せない場合はすでにqueueが空になっているケースがほとんどです.

def worker_queue(worker_index, data, q, rq):
    """calculate b ** 2

    Parameters
    ----------
    worker_index : int
    data : list
    q : multiprocessing.Queue
        queue for get input
    rq : multiprocessing.Queue
        aueue for store results
    """
    print(f'start worker_index {worker_index}')
    timeout = 1
    while True:
        try:
            i, j = q.get(timeout=timeout)
            _, _, result = work(data, i, j)
            rq.put((i, j, result))
        except queue.Empty:
            break
    return

サンプルコード

gist.github.com