須通り
Sudo Masaaki official site
For the reinstatement of
population ecology.

離れ離れのプロセスを繋ぐ future は逝ってしまったね

ホーム | 統計 Top | R の並列化のためのモダン環境:future パッケージ

前回の記事furrr というパッケージの future_map 関数を用いて、マルチコアを用いた並列計算を簡単に行う方法を紹介した。この furrr = future + purrr であり、R でクラスタの管理を行う future パッケージと、以前に紹介した purrr を合体させ、map() 系の関数を簡単にマルチスレッド化できるのが売りである。しかし future 自体の設計概念については少し複雑なので、以下、記事を分けて説明することにした。

目次

future パッケージの基本

筆者自身もまだ十分理解できているとは言い難いが、公式サイト future: Unified Parallel and Distributed Processing in R for Everyone および開発者 Henrik Bengtsson による vignettes の内容を、かいつまんで日本語化しておく(訳文ではなく勝手な解釈であることに注意)。なお、Bengtsson 本人がパッケージ紹介論文のプレプリントを公開しており、英語が分かる人はそちらを読むのが最もストレートに、パッケージの設計理念を理解できる。

パッケージの説明

まず、公式にあるパッケージそのものの説明文(version 1.21.0)を訳しておく。

このパッケージの目的は、R のエクスプレッションを「future」を経由して順次、あるいは並列して処理するための、軽量かつ統一された Future API を提供することである。最も単純な並列化の方法は plan(multiprocess) というコマンドを打った後、x %<-% { expression } という文法で、処理を実行することである。本パッケージでは sequential, multicore, multisession, cluster という種類の future が実装されている。これらを用いて R のエクスプレッションをローカルマシンで評価することもできるし、ローカルないしリモートの複数台のマシンを混ぜた、分散処理システム上で評価することもできる。本パッケージを拡張する形で、future を処理するための追加のバックエンド、たとえばスケジューラなどを経由するものも開発されている。統一された API のおかげで、1 台のローカルマシン上の順次処理から、マシンクラスタ上での並列処理まで、コードを書き換えずに対処可能である。もう一つの強みが、グローバル環境上の変数および利用可能な関数が、自動的に特定され必要に応じてクラスタへ送られることであり、既にあるコードを future で使うための調整方法が分かりやすいものとなっている。

Vignette: A Comprehensive Overview

次に A Future for R: A Comprehensive Overview である。こちらは長いので、重要そうな部分だけをメモっておく。

  1. そもそも future って何さ。「将来のある時点で利用可能になる可能性のある値の抽象化 In programming, a future is an abstraction for a value that may be available at some point in the future. 」であると書かれている。うん、分からん。
  2. 並列プログラミングでは future パターンというものは、割と一般的な概念らしい。つまり何らかの大きな(並列化したい)処理をパイプラインの中に位置づけるとき、単なるシーケンシャルなプログラミングであれば、前の関数の処理結果を入力に受け取った関数が処理を行い、出力値が出てきた段階で即座に次の関数に受け渡す、といった感じで粛々と処理すればいい。並列化が入ると、パイプラインの途中で複数のノードに分散してタスクが実行されるようになる。また多くのパイプラインでは、最後にレポートを返すような部分では直列に戻る。
  3. つまり途中だけが並列化された処理には、複数のノードから結果を受け取り、全ての結果が取得されたことを確認してから戻り値を生成し、次の式へ受け渡す過程が必要なはずだ。これを実現するのが future デザインパターンである。x %<-% { expresion } の左辺(これを暗示的な future という)や、x <- future({ expression }) 関数の戻り値 x(明示的な future という)は、いずれも future と呼ばれるオブジェクトである。
  4. Future は unresolved 未解決と resolved 解決済みの 2 状態を持つ。明示的であれ暗示的であれ、代入した直後に future オブジェクトこと x 自体が生成されるが、この x の状態は最初は unresolved である。Expression 部分の評価は即座に eagerly (= instantaneously) 開始される。Future が unresolved である間、親プロセスはブロックされており、パイプラインは次の段階へ移らずに待機する。すなわち、クラスタの全てのノードの計算が完了して結果が取得された段階で、future の状態が resolved になってブロックが解除される。
  5. なお、必ずしも cluster を作らなくても non-parallel に future を使うことが可能である。これが同期評価(synchronous)で、plan(sequential)、あるいは類似の plan(transparent) として指定する。この場合、単に処理待ちの状態が(feature は既に作られているが)unresolved になるだけである。特に利点はないようにも見えるが、最初にローカルマシン上で sequential な future のコードを書いてから大規模な並列環境へ持っていく場合に、コードを修正&再検証する手間が省けて便利。なお、並列化されている場合を asynchronous といい、典型的には plan(multisession) でクラスタを作る。
  6. 上の説明を正確にすると、Future が unresolved である間、「値を取り出そうと試みると」親プロセスはブロックされる。暗示的に future を作成&評価すると、自動的にこの条件を満たすためブロックされる。一方明示的な future では、能動的に value(x) などで値を取り出そうとした時点でブロックされる。
  7. ちなみに同一マシン上のマルチスレッド化手法として plan(multicore) というものもある。(read only な)メモリが親 R プロセスと共有されるので、立ち上げ時にグローバル環境を個々のバックグラウンドプロセスへコピーする必要がない。オーバーヘッドが発生しない点で有利だが、メモリ共有という言葉を聞いて危険な雰囲気を読み取る人もいるだろう。そういうことだ。
  8. まあ実際のコードを書く際には色々難しいこともあるけど、どのような plan() を使っても基本的には同様の処理結果を得られることを保証すべし、というのが future の設計哲学なのだよ。

future の解決と親プロセスのブロック

解決状況を親環境から確認する resolved() 関数

親がブロックされるか否かの条件は、実際はもう少し厄介であり、事は future という戦略の根幹に関わる。なので、各自で実験して確かめてほしい。Vignette に載っている例をアレンジして掲載しておく。


library("future")
plan(multisession)

# futue を明示的に作成する。デフォルトでは即座に評価が開始される。
fa <- future({ 
    cat("Hello world!\n")
    print(Sys.getpid())
    flush.console()
    Sys.sleep(10)
    42L
})

# resolved(fa) を連打して、future() の評価開始から約 10 秒後に TRUE になることを確認せよ。
resolved(fa) # バックグラウンドの future の状態を調べる関数が resolved()

# futue を明示的に作成し、かつ即時評価しない。
flazy <- future({ 
    cat("Hello world!\n")
    print(Sys.getpid())
    flush.console()
    Sys.sleep(1)
    42L
}, lazy=TRUE)

resolved(flazy) # resolved() を一回実行すると評価が始まる。

resolved(flazy) # 1 秒以上経過後に resolved() を再度実行すると TRUE になっている。

上のコードの挙動をまとめると future() を実行した段階で、{expression} の仕事内容を持った future が明示的に作成される。内容は単に各スレッドの Rscript のプロセス ID を出してから、最後に 42 という数字を返すだけなのだが、途中にスリープを入れたので 10 秒程度かかる。この expression は、デフォルト(lazy = FALSE)では定義後、直ちにバックグラウンドで実行され始める。

さて、上記で fa <- future({ expression }) をコンソールへコピペ&エンターすると、すぐさま別のコマンドを入力して実行可能な状態に戻る。あたかも一瞬で実行が完了したように見えるが、実際はフォアグラウンドからバックグラウンドのプロセスへ仕事を投げた段階だ。レストランのホールスタッフに伝えた注文が厨房へ通って、ホールは既に次の注文を受けるべく待機していると思えばいい。

従って料理ができるまでの約 10 秒間、future (料理のオーダー)は unresolved であり、厨房で料理が完成した瞬間に resolved になる。ここで店員に、料理が完成したかを問い合わせる操作が resolved(fa) である。

resolved() 関数のイメージ
    
    マダー?(・∀・ )っ/凵⌒☆チンチン

なお future() の定義時に lazy = TRUE というオプションを指定すると、シェフが怠ける。つまり注文を受けても調理が開始されず、客が resolved(fa) で急かして初めて重い腰を上げるのだ。

なお前掲のプレプリントによると、future パッケージが R の以前の並列化スキームに比べて最も優れている点の 1 つが、この「明示的な future の作成と解決」である。すなわち、並列処理を仕込む過程と、計算完了を待って値を取得する過程がデカップルされたことにより、メインプロセスがブロックされて別の仕事ができなくなるという欠点が解消される。

また、あるデータ変数(仮に x とする)を使った future を作り、これを解決する途中で親環境(ブロックされていない)上の x の値を、別の代入操作で弄ったとする。この場合、計算中の future の内容には影響しない。なぜかというと future() を使った時点で、そのときの x の値を含む形で future オブジェクトが作成されるためである。

計算結果を取得する value() 関数

さて resolved() 関数による future の解決状況の問い合わせは、親プロセスをブロックしない点で利便性が高い一方、ユーザー側から明示的に行う必要がある。つまりバックグラウンドでいつの間にやら計算が完了していても、リアルタイムには結果が返らない。一方、future の結果を返させるのが value() 関数だ。


#(評価終了後に)内容を見られるのが value 関数。
# 評価中に value 関数を使うと、親の R コンソールがブロックされることに注意。
value(fa)

future() の処理が終わるまでの残り時間、操作を受け付けなくなる(ブロックされる)。その後、
> value(fa)
Hello world!
[1] 6788
[1] 42
が出力される。

評価の完了後に、value(fa) を再度打ち込むと、fa は resolved になっているため
全て一瞬で出力される。

# value() 関数の値を(評価終了後に) R の適当なオブジェクトに代入することも可能。
a <- value(fa)

# a を裸で打つと、これは future ではなく単なる値なので
> a
[1] 42
だけが一瞬で出力される。途中のメッセージ(stdout に出る)は含まれないことに注意。

上のコード例に示したように、value() 関数を使うと future の解決時まで親プロセスがブロックされる。レストランでいえば resolved() 関数は単なる状況の問い合わせなので、状態が unresolved であったら「まだ料理が出来上がりませんのでお待ち下さい」と回答すれば済むが、value() は結果を要求する。「次に戻って来たときに料理を持ってこなかったら許さんぞ」と強要されたホールスタッフは、料理完成まで厨房で待つ羽目になり、接客担当がいなくなってしまうのだ。迷惑な客である。

なおコードにおいて、最初に value(fa) を打った時は Hello world! みたいな文字が出たのに、最後に a <- value(fa) したときの結果である a には、expression 自体の戻り値だけが入っており、メッセージが含まれないことを不審に思った方もいるだろう。これは 「value() 関数が呼ばれるたびに、その時点で評価中のプロセスにおける stdout() の内容がリレーされて、親の出力に表示される」仕様となっているためである。つまりメッセージ類が吐き出されている場所は、親のコンソールであって value(fa) の中ではないことに注意。こうしたメッセージを記録する方法については、前回記事の「furrr(future)による並列計算時のログをリアルタイムで書き出す」という節で扱っている。


# なお明示的に作成した future オブジェクトである fa を裸で打つと、
# 解決前であっても後であっても即座に future のメタ情報が表示される。
> fa
MultisessionFuture:
Label: ‘’
Expression:
{
    cat("Hello world!\n")
    print(Sys.getpid())
    flush.console()
    Sys.sleep(10)
    42L
}
Lazy evaluation: FALSE
Asynchronous evaluation: TRUE
Local evaluation: TRUE
Environment: 
Capture standard output: TRUE
Capture condition classes: ‘condition’
Globals: 
Packages: 1 packages (‘utils’)
L'Ecuyer-CMRG RNG seed:  (seed = FALSE)
Resolved: TRUE
Value: 56 bytes of class ‘integer’
Early signaling: FALSE
Owner process: bb3412d7-4320-1c4e-2086-70cf99028e58
Class: ‘MultisessionFuture’, ‘ClusterFuture’, ‘MultiprocessFuture’, ‘Future’, ‘environment’

暗示的に作成した future に対する resolved() の挙動

さて、Future は future({expression}) で明示的に作成する方法の他に、%<-% 演算子を使って暗示的に作成する操作も可能である。


# futue を暗示的に作成する。
a %<-% { 
    cat("Hello world!\n")
    print(1:3)
    flush.console()
    Sys.sleep(10)
    42L
}

発展的内容:なお暗示的な future 作成で遅延評価をさせようとすると、%lazy%(lazy 演算子)という特殊な部品を付けて 
a %<-% { expr } %lazy% TRUE 
という書き方にする必要がある。

暗示的な future 作成でも、デフォルトの挙動として定義後直ちに評価が開始される。この場合も評価はバックグラウンドで進行するので、基本的には親はブロックされない。

ただし、計算中に親の環境上で future オブジェクトにアクセスしたときの挙動が、明示的な future とは異なってくる。具体的には以下に示す通り、単なる問い合わせであったはずの resolve() でも、親がブロックされてしまう。


# 評価開始直後に、暗示的に作成した future オブジェクトである a を裸で打つと、
a

残り時間は操作を受け付けなくなる(ブロックされる)。その後、
Hello world!
[1] 1 2 3
[1] 42
が出力される。

なおこの場合は、 
resolved(a)
を評価開始直後に打ってもブロックされる。

さらに大きな違いとして、暗示的に作成した future には value() が使えない。


また、value(a) というコマンドは、評価中でも評価後でも基本的に使えない。
> value(a)
 UseMethod("value") でエラー: 
   'value' をクラス "c('integer', 'numeric')" のオブジェクトに適用できるようなメソッドがありません 

なぜ挙動が変わるのか。暗示的に future を作成する行為は、並列処理(非同期プログラミング)の専門用語だと「future を定義してから promise を作成するまでの一括操作」に相当するらしい。この辺の説明を R 言語に即して日本語で読める文献はあまり多くないが、株式会社ホクソエム「Shinyユーザのための非同期プログラミング入門」Slideshare によくまとまっている。また用語の定義やそれらが指す範囲はプログラミング言語によっても異なるため、厳密な仕様は Future パッケージのヘルプ PDF の 7 ページ目にある future 関数の "Details" を読んでほしい。

乱暴に説明すると promise は「非同期処理の結果、将来のいずれかの時点で利用可能になる成果物」を指す概念で、レストランだと厨房で作られているであろう料理に相当する。上のコード例では a %<-% { expr } の返り値 a は、future ではなく promise オブジェクトである。調理中の料理を客に出せないのと同じ道理で、このオブジェクトは注文後直ちに概念としては存在し始めるが、future が unresolved であるうちは親プロセスからアクセスできない。

前節の明示的な future 作成のコード例においては、fa <- future() してから a <- value(fa) することで成果物を得ていた。この a は単なる成果物である。一方すぐ上のコード例だと、a %<-% { expr } した値は、future オブジェクトをすっ飛ばして promise になる。

なお断っておくと現在の future package 実装では、暗示・明示いずれの方法を経由しても、最終的にユーザーの手元に戻ってくるオブジェクト a には promise というクラスは付与されず、単なる R のデータになる。

ワーカーの厳密な扱い

ところで上の例では plan(multisession) でクラスタを暗示的に作っている。Future パッケージはこの指定で、全ての利用可能なスレッド数だけ、Rscript のバックグラウンドプロセスを立ち上げる。だがトラブルシューティング等で、もう少し厳密にプロセスを管理したいこともある。この場合、 parallel パッケージの makeCluster() で作ったクラスタを plan(cluster, workers = cl) などとして指定することが可能。細かくは「Tidyverseによるデータフレーム加工(07)グループごとに一括処理する:その 4 マルチコアによる真の並列化のための furrr::future_map 系関数」に上げたコードを読んで欲しい。

(以下、書くべき内容がまとまったら追加)

なお計算中にリアルタイムでログを出す方法も、「Tidyverseによるデータフレーム加工(07)」にまとめてある。

その他この記事で取りこぼしている話題として、こちらのプレプリントに載っている内容だと、Globals and packages はまだ十分に解説されていない。Proper parallel random number generation も、特にシミュレーション系の人はちゃんと理解してから使うべきである。