サブロウ丸

主にプログラミングと数学

【python】 multiprocessing.Queue を用いた並列化 (サンプルコードあり)

multiprocessing.Poolの功罪

multiprocessing.Poolはpythonにてプロセス並列処理を簡潔に書けるので便利です. しかし, 以下のようなシチュエーションでは, 並列化を行うと逆に遅くなることがあります.

data = xxx # ものすごく重たいオブジェクト (巨大なリストなど)

def func(data, i, j):
    """並列化したい関数: dataとi,jの3つの引数を受け取って, 何か処理をする関数"""
    return data[i] + data[j]

args = [
    (data, i, j) for i in range(10) for j in range(10)
]

with multiprocess.Pool as pool:
    pool.map(func, args)

multiprocess.Poolやmultiprocess.Processではプロセスの立ち上げの際に 関数への入力値が全てシリアライズ(pickle化)されます. なので, 何かデータ量の多いオブジェクトが関数の引数に含まれる場合は, そのシリアライズ処理がオーバーヘッドになり並列化の恩恵が得られないということが発生します.


上記の例では, argsのtupleオブジェクトが全てシリアライズされたのち, 各プロセスに渡されます. (逆に, シリアライズに時間がかからないような, 小さなオブジェクトであれば全く問題にはなりません)


プロセス並列をする上で, 上記例のdata変数を各プロセスに渡すためのシリアライズは必要不可避であるため, その回数を減らすことができれば, 効率を上げられます.

multiprocessing.Pool + multiprocessing.Queue による解決策

そこで multiprocessing.Queue による解決方法を紹介します.

流れとしては,

  1. 関数への可変な入力値, 上記例だと(i, j)の組みをQueueに格納.
  2. 子プロセスを一つずつ立ち上げる ( ここで, dataのpickle化処理が入るので, 立ち上げに時間がかかる )
  3. 立ち上がった子プロセスはQueueから変数(i, j)を受け取り, 処理
  4. 子プロセスが処理を終えると, 次の変数をQueueから受け取る


例えるならば, Poolだと "指示があるたびに, 現地に社員を派遣する"(毎回全ての変数をシリアライズ) というのを毎回行っていたのに対し, Process + Queue では "携帯を持たせた社員を現地に向かわせ, 社員は指示をこなしたら, 電話をかけて新しい指示を受け取る"(1回だけ思い変数のみシリアライズ+その他の軽い変数を毎回受け取り) といった感じでしょうか.

また, queueへ格納する, 関数への入力を生成するプロセスと queueから値を取り出して処理を行うプロセスを同時に動かすこともmultiprocessing.Queueを用いれば可能です.

1. 変数をQueueに格納

# queue for input
M = 10
q = multiprocessing.Queue()
for i in range(M):
    for j in range(M):
        q.put((i, j))

# queue for output ( result queue )
rq = multiprocessing.Queue()

2. 子プロセスの立ち上げ

# heavy object
data = [i for i in range(N)]

# worker factory
def create_worker(worker_index):
    return multiprocessing.Process(
            target=worker_queue,
            args=(worker_index, data, q, rq))

# create workers
processes = list()
worker_index = 1
for worker_index in range(1, num_worker):
    process = create_worker(worker_index)
    process.start()
    processes.append(process)

3. 立ち上がった子プロセスはQueueから変数(i, j)を受け取り, 処理 + 子プロセスが処理を終えると, 次の変数をQueueから受け取る

def worker_queue(worker_index, data, q, rq):
    """calculate i + j of (i, j) in queue q
    Parameters
    ----------
    worker_index : int
    data : list
    q : multiprocessing.Queue
        queue for get input
    rq : multiprocessing.Queue
        queue for store results
    """
    print(f'start worker_index {worker_index}')
    timeout = 1
    while True:
        try:
            i, j = q.get(timeout=timeout)  # 変数の受け取り
            _, _, result = work(data, i, j)  # 処理
            rq.put((i, j, result))  # 結果の格納
        except queue.Empty:  # queueが空になったら終了
            break
    return

簡単な実験

一番下にサンプルコードを載せています.

  • main_queue() が上記で言及している Queueを用いたwork()関数の並列化
  • main_pool() が Poolを用いたwork()関数の並列化実装 になります.

重たいオブジェクトとして, 長さNのリスト(data変数)を作成しています. 3プロセス並列で簡単なテストを実行したところ 僕の環境では, 実行時間の比較, ある程度Nが大きい(objectが重たい)と, Proecss + Queueの方が優位になりました. Queueによるデータの受け渡しにもオーバーヘッドがあるため, オブジェクがある程度大きくないと優位性がでないのは当然です.

Process + Queue Pool
N = int(1e6) 2.38 s 0.42 s
N = int(1e7) 4.44 s 3.36 s
N = int(1e8) 27.97 s 62.93 s

それぞれ, 下記のサンプルコードの実行時間.

サンプルコードのコアの部分の説明ですが, Process + Queue事項の場合, worker_queue関数が子プロセスで動く処理になります. q.get(timeout=timeout)でqueueからデータを取り出しますが, 指定したtimeoutでデータを取り出せない場合は queue.Empty エラーを返すので, それを受け取ったら終了します.

データを取り出せない場合はすでにqueueが空になっているケースがほとんどです.

def worker_queue(worker_index, data, q, rq):
    """calculate b ** 2

    Parameters
    ----------
    worker_index : int
    data : list
    q : multiprocessing.Queue
        queue for get input
    rq : multiprocessing.Queue
        aueue for store results
    """
    print(f'start worker_index {worker_index}')
    timeout = 1
    while True:
        try:
            i, j = q.get(timeout=timeout)
            _, _, result = work(data, i, j)
            rq.put((i, j, result))
        except queue.Empty:
            break
    return

サンプルコード

gist.github.com