須通り
Sudo Masaaki official site
For the reinstatement of
population ecology.

purrr::map2 の公式ヘルプに書いてあるように、purrr の並列化は、for文のように明らかな逐次処理コードを書かなくて済むという意味である。でも目の前のパソコンに CPU は幾つある?2つかもしれないし4つ、それ以上かもしれない。使えるものは使い倒そう。

ホーム | 統計 Top | Tidyverseによるデータフレーム加工(07)グループごとに一括処理する:その 4 マルチコアによる真の並列化のための furrr::future_map 系関数

今回のシリーズでは dplyr や purrr といった Tidyverse のパッケージを利用して、巨大なデータフレームに含まれている、ある変数列の水準別に、一括処理を行う手順を解説している。前回までにグループ化/階層化した tibble のグループごとに処理を適用し、計算結果を nest された tibble に格納したり、unnest したり hoist したりして計算結果を取り出す方法を検討した。

単に purrr::map を噛ますだけだと CPU は一つしか使われないのだが、R Studio の Davis Vaughan らが開発した furrr というパッケージの future_map() 関数を使うと簡単に、マルチコアを用いた真の並列計算が約束される(少々のトラブルシューティング知識は必要:この少々が曲者なんだなぁ)。変な名前のパッケージだが furrr = future + purrr だ。R でクラスタの管理を行う future パッケージと、上記 purrr を合体させ、map() 系の関数を簡単にマルチスレッド化できる。

Future パッケージ自体は、parallel パッケージを用いた古典的な(R version 2.x 世代の)並列化スキームを部分的に取り入れつつ、非同期プログラミングの文脈で古くから知られてきた、future というデザインパターンを R で実現する。これにより、ユーザーがクラスタの低レベルな構造を意識せずに、並列計算のコードを書いたりテストしたりすることが可能である。なお future の設計概念は少し複雑なので、同時公開される別のページに説明を譲る。

目次

本記事は以下の環境で検証している。


> library(furrr)
    要求されたパッケージ future をロード中です 
    警告メッセージ: 
1:  パッケージ ‘furrr’ はバージョン 3.6.3 の R の下で造られました  
2:  パッケージ ‘future’ はバージョン 3.6.3 の R の下で造られました  

> library(tidyverse)
-- Attaching packages --------------------------------- tidyverse 1.3.0 --
√ ggplot2 3.3.2     √ purrr   0.3.4
√ tibble  3.0.3     √ dplyr   1.0.0
√ tidyr   1.1.0     √ stringr 1.4.0
√ readr   1.3.1     √ forcats 0.5.0
-- Conflicts ------------------------------------ tidyverse_conflicts() --
x dplyr::filter() masks stats::filter()
x dplyr::lag()    masks stats::lag()
    警告メッセージ: 
1:  パッケージ ‘tidyverse’ はバージョン 3.6.2 の R の下で造られました  
2: replacing previous import ‘vctrs::data_frame’ by ‘tibble::data_frame’ when loading ‘dplyr’ 
3:  パッケージ ‘ggplot2’ はバージョン 3.6.3 の R の下で造られました  
4:  パッケージ ‘tibble’ はバージョン 3.6.3 の R の下で造られました  
5:  パッケージ ‘tidyr’ はバージョン 3.6.3 の R の下で造られました  
6:  パッケージ ‘purrr’ はバージョン 3.6.3 の R の下で造られました  
7:  パッケージ ‘dplyr’ はバージョン 3.6.3 の R の下で造られました  
8:  パッケージ ‘forcats’ はバージョン 3.6.3 の R の下で造られました  

> sessionInfo()
R version 3.6.1 (2019-07-05)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 10 x64 (build 19042)

Matrix products: default

locale:
[1] LC_COLLATE=Japanese_Japan.932  LC_CTYPE=Japanese_Japan.932   
[3] LC_MONETARY=Japanese_Japan.932 LC_NUMERIC=C                  
[5] LC_TIME=Japanese_Japan.932    

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
 [1] forcats_0.5.0   stringr_1.4.0   dplyr_1.0.0     purrr_0.3.4    
 [5] readr_1.3.1     tidyr_1.1.0     tibble_3.0.3    ggplot2_3.3.2  
 [9] tidyverse_1.3.0 furrr_0.1.0     future_1.18.0  

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.6       cellranger_1.1.0 pillar_1.4.6     compiler_3.6.1  
 [5] dbplyr_1.4.4     tools_3.6.1      digest_0.6.25    lubridate_1.7.9 
 [9] jsonlite_1.7.0   lifecycle_0.2.0  gtable_0.3.0     pkgconfig_2.0.3 
[13] rlang_0.4.7      reprex_0.3.0     cli_2.0.2        rstudioapi_0.11 
[17] DBI_1.1.0        parallel_3.6.1   haven_2.3.1      withr_2.2.0     
[21] xml2_1.3.2       httr_1.4.2       fs_1.4.2         hms_0.5.3       
[25] generics_0.0.2   vctrs_0.3.4      globals_0.12.5   grid_3.6.1      
[29] tidyselect_1.1.0 glue_1.4.2       listenv_0.8.0    R6_2.4.1        
[33] fansi_0.4.1      readxl_1.3.1     modelr_0.1.8     blob_1.2.1      
[37] magrittr_1.5     backports_1.1.9  scales_1.1.1     codetools_0.2-16
[41] ellipsis_0.3.1   rvest_0.3.6      assertthat_0.2.1 colorspace_1.4-2
[45] stringi_1.4.6    munsell_0.5.0    broom_0.7.0      crayon_1.4.1    

但し書き:本記事でそれなりに突っ込んだ使い方をしている future パッケージについて、クラスタの作成や制御に関わる一部の関数が parallelly という独立したパッケージにスピンオフした。具体的にいうと makeClusterPSOCK() 等である。当分は future::makeClusterPSOCK() の名前空間でも使えるが、これからの開発案件では parallelly::makeClusterPSOCK() へと移行することになる。このパッケージの一部機能については本記事でも解説するので参照されたい。

一応、本記事のスコープとしては同一のローカルマシン上で並列計算を行う。makeClusterPSOCK() では、他のマシン上のプロセスを混ぜてクラスタを形成することも可能である。この場合は SSH 等で接続を確立するのだが、背景知識ががらりと変わってくるので別の機会に説明する。

いきなり使ってみる

今回は筆者の実際の研究ネタである、freqpcr パッケージを教材に使う。freqpcr() は自作の統計ツールで、生物の個体群における対立遺伝子頻度を、定量 PCR の ΔΔCq 値から推定する。プレプリントはこちらから読める。パッケージ内でダミーデータを合成して、推定精度をシミュレーションするためのラッパー関数が用意されているため、今回の並列計算テストにうってつけである。なおパラメータの決め方次第で、0.1 秒程度で推定が完了する領域から 10 分以上を要する領域まで、様々な計算負荷を掛けられる(上記プレプリントの supplementary figures S5--S7 を参照)。


# 初めてならば freqpcr をインストール
library(remotes) # cran 以外のリポジトリからパッケージを入れるためのツール
install_github("sudoms/freqpcr")

# もしも Error: (converted from warning) package 'cubature' was built under R version 3.6.3 
# と出たら → 厳格すぎるルールのせいで警告がエラー扱いされているので、環境設定を緩める
Sys.setenv(R_REMOTES_NO_ERRORS_FROM_WARNINGS="true")
install_github("sudoms/freqpcr")

# もしも以下のエラーが出たら、
Downloading GitHub repo sudoms/freqpcr@master
Error in utils::download.file(url, path, method = method, quiet = quiet,  : 
   URL 'https://api.github.com/repos/sudoms/freqpcr/tarball/master' を開けません

# まず devtools と remotes をアップデートする。
# それでもダメなら、お使いの R でファイルをダウンロードするためのプロキシ設定を見直す。
options(download.file.method = "libcurl") # Mac の場合

library(freqpcr)
packageVersion("freqpcr")

# パッケージの使用法とパラメータの生物学的意味は以下のページから PDF ヘルプを参照
# https://github.com/sudoms/freqpcr/releases/latest

purrr の map() 関数によるシングルススレッドの一括処理

無事インストールできたら手始めに、定量 PCR 分析の結果として得られるであろう Cq 値のダミーデータセットを生成する。害虫の体重分布、集団中の対立遺伝子頻度、一度に分析される個体数といったパラメータを組み合わせた tibble である "om" を、tidyr::expand_grid() を用いて作る。その後 om の各行=各パラメータ領域について、freqpcr パッケージの make_dummy() 関数を用いてダミーデータ生成を行う。


library(tidyverse)
library(parallel) # >= R 2.14.0
library(future)
library(furrr) # install.packages("furrr")
library(installr) # install.packages("installr") # ただし Windows 専用。R 4.x に未対応
sessionInfo()
library(freqpcr); packageVersion("freqpcr");

# パラメータセットを設定
P <- c(0.01, 0.1) # 地域の害虫集団中の殺虫剤抵抗性遺伝子の頻度
K <- 1 # 害虫 1 個体から抽出される DNA 収量が従うガンマ分布の shape パラメータ
npertrap <- 2^seq(3, 6, by=1) # 1 回の定量 PCR サンプルに混合される害虫個体数
ntrap <- 2^seq(3, 6, by=1) # 定量 PCR 分析の反復数
diploid <- TRUE # 対象の害虫種が2倍体であるか

# om: パラメータの組み合わせを格納する tibble
om <- tidyr::expand_grid(P, K, ntrap, npertrap, replicate=c(1:5), diploid) %>%
    dplyr::mutate(  scaleDNA=(1/K)*1e-06, targetScale=1.2, baseChange=0.24,
                    EPCR=0.97, zeroAmount=1.6e-03, sdMeasure=0.2  ) %>% 
    dplyr::mutate(ntotal=ntrap*npertrap) %>%
    dplyr::filter(ntotal<=128, ntotal>=4, ntrap>=2) %>%
    dplyr::mutate(LID=1:nrow(.)) %>%
    dplyr::select(LID, everything())
set.seed(1729)
om$rseed <- sample(nrow(om), replace=FALSE) # 手動ロードバランシングのためランダム順ソート。
set.seed(NULL)
print(om)

# ダミーデータ生成
# 18000 regions -> 17 sec, 78,000 -> 271 sec, 156,000 -> 1668 sec
om <- om %>%
    dplyr::group_by(LID) %>%
    tidyr::nest() %>%
    dplyr::mutate(  DNA=purrr::map( .x=data,
                                    .f=~make_dummy( rand.seed=.$rseed, P=.$P, K=.$K,
                                                    ntrap=.$ntrap, npertrap=.$npertrap, 
                                                    scaleDNA=.$scaleDNA,
                                                    targetScale=.$targetScale, baseChange=.$baseChange,
                                                    EPCR=.$EPCR, zeroAmount=.$zeroAmount, 
                                                    sdMeasure=.$sdMeasure, diploid=.$diploid ) )  ) %>%
    tidyr::unnest(cols=data) %>%
    dplyr::ungroup() %>%
    dplyr::arrange(rseed) # 手動ロードバランシングのためランダム順ソート。
print(om)

om[1, "DNA"][[1]][[1]]; # ダミーデータの形を見たいときはリストを取り出す

なお RNG のシードを管理する都合上、ダミーデータを作成するプロセスはマルチスレッド化があまり簡単ではない。なので教育的意義と、実用上の要請の両面から、make_dummy() は purrr::map() で 1 CPU での並列処理とした。ダミーデータは各種データ形式のスロットが埋め込まれた S4 クラスのリストなので、map の戻り値も list 形式である。

furrr の future_map() 関数によるマルチスレッド化

いよいよダミーデータの組を用いて、母集団の対立遺伝子頻度を推定してみる。まずはクラスタの準備である。


# ログ出力先のファイルを(現在のセッション上に)作成
# https://stat.ethz.ch/R-manual/R-devel/library/base/html/connections.html
fout <- file(paste("otest", "210401", "txt", sep="."), "w")

# 利用可能なスレッド数マイナス 1 でクラスタを作成。たとえば 4 コア 8 スレッドの CPU なら 7 に。
cl <- future::makeClusterPSOCK(availableCores()-1)

# 既知の問題:MacOS の新しいバージョン (Catalina, Big Sur) では、future のインストール直後に 
# makeClusterPSOCK() すると管理者パスワードを要求され、入力しても反応せずにハングアップすることがある。
# 一旦 R セッションを、それでもダメならマシンを再起動するべし。

# cl.pid という名前で、クラスタが初期化した子プロセスの名前を記録しておく。何気に超重要(後記)
cl.pid <- purrr::map_int(cl, ~pluck(., "session_info", "process", "pid"))
print(cl.pid) # クラスタで初期化された Rscript.exe のプロセス ID 一覧

# クラスタ上の R プロセスにライブラリをロード
invisible(clusterEvalQ(cl, library(freqpcr)))

# future パッケージの plan() 関数で、先ほどのクラスタを使うよう設定
future::plan(cluster, workers=cl)
nbrOfWorkers() # 使用コア数が増えていることを確認

ここから、furrr で並列化した実際の推定プロセス。多分 1 分くらいで終わる。タスクマネージャの「詳細」タブで Rscript.exe の挙動を見ながら操作するといい。


ptime0 <- proc.time()
e1 <- try( {
cat(paste("Calculation started in ", Sys.time(), "\n", sep="")); flush.console();
capture.output( temp <- om %>%
        dplyr::group_by(LID) %>%
        tidyr::nest() %>%
        furrr::future_map(  .x=.$data,
                            .f=~sim_dummy(  CqList=pluck(.$DNA, 1), EPCR=.$EPCR, 
                                            zeroAmount=.$zeroAmount, K=.$K, 
                                            beta=TRUE, diploid=.$diploid,
                                            maxtime=120, print.level=1, 
                                            aux=c(  replicate=.$replicate, rseed=.$rseed,
                                                    P=.$P, K=.$K, ntrap=.$ntrap, 
                                                    npertrap=.$npertrap, beta=TRUE  )  )  ),
    file=fout, append=FALSE, type=c("output") ) # "NUL" or /dev/null (unix) to supress the output
    cat(paste("Calculation ended in   ", Sys.time(), "\n", sep=""))
}, silent=FALSE )
print(proc.time()-ptime0)

# 後始末の定形処理
try(close(fout)) # ログの書き出し接続を閉じる。既に閉じられている場合エラーになるので、try() で囲む。

future::plan(sequential) # future パッケージによるクラスタ使用を停止する(直列処理に戻す)
nbrOfWorkers() # ワーカー数が 1 に戻っていることを確認

# ※上記の後始末は「future と cl クラスタの繋がりを断った」だけであることに注意。
# cl 自体の後始末も必要であり、その手順は下で解説する。

長い計算ではプロセス間のコネクションが切れて、計算結果の受取に失敗することがあるので、try() 文で挟んである。

クラスタの終了処理

さて、クラスタを立ち上げて並列計算を行ったら、結果が成功であれ失敗であれ、使ったリソースは片付けねばならない。基本的には parallel::stopCluster(cl) を実行して、クラスタを閉じればリソースも解放される。あるいはシンプルに Rgui を閉じるだけでも良く、クラスタ cl と Rscript 間のコネクションが生きていれば、内部で立ち上げられた子プロセスも自動で閉じる。

問題は、並列計算中にコネクションエラーとなって future_map() が止まったとき。ローカルマシンであれば、無視するのも 1つの手ではある。だが制御を離れた Rscript が計算リソースを食い潰していて、次のジョブが投入できないならば、以下のように手動で停止させる。ここで、cl の立ち上げ時に記録しておいたプロセス一覧こと cl.pid が役に立つ。


# クラスタの終了処理(2020 年版)

# まず、起動中の cl を停止。失敗の危険性を考慮し try() で囲んでおく。
stopcl <- try( parallel::stopCluster(cl) )

上記のように connection failure が発生していた場合、stopCluster(cl) がエラーになる。
Error in serialize(data, node$con) ← このメッセージでコネクションエラーと分かる。
システム上にゴミを残さないためには、自分が cl 内で作った Rscript.exe のプロセスたちを
責任を持ってブッコロする必要がある。

# 方法 1:先に記録しておいた一覧である cl.pid を参照して、該当する ID のプロセスを停止する
installr::kill_pid(cl.pid)

# 方法 2:システム上に存在する Rscript.exe という名前のプロセスを全停止する
installr::kill_process("Rscript")

# 後は定形で、コネクションエラーの有無に関わらず、現在の R が持つ外部プロセスへのコネクションを全て閉じる。
closeAllConnections() 
# ただし、closeAllConnections() で止まるのはユーザーが作ったコネクションである。
# stdin, stdout, stderr はシステムが持っているので、止まることはない。

# 最後にクラスタ関連情報 → クラスタ自体を削除。
rm(stopcl) 
rm(cl.pid)
rm(cl) # stopCluster(cl) で外部 Rscript は終了するが、クラスタの残骸は R 上に残っているので。

gc(); gc(); # 申し訳程度にガベージコレクション

方法 2 は自分や他のユーザーが作ったものを問わず、マシンで動作中の R スクリプトを無差別に消してしまうので、通常は方法 1 を用いること。なお installr が使えなければ R 上からのシステムコマンド操作、たとえば system2() 関数等を使う手もある。あるいは cl.pid とにらめっこしつつ、タスクマネージャで手動削除してもいい。

クラスタの自動削除

なお parallelly パッケージでクラスタを管理する場合は、auto stop という機能が便利だ。今後はこちらへの移行を推奨する。以下のような使い方をする。


library(parallelly)
library(furrr)
# ちょっと前の方法
#cl <- parallelly::makeClusterPSOCK(availableCores()-1, timeout=10) # クラスタを初期化
#cl <- parallelly::autoStopCluster(cl) # 作成したクラスタに auto stop をセット

# なお、makeClusterPSOCK() 本体にも autoStop=TRUE をオプション指定できるようになった。
# 今後はこちらを使うほうが便利だろう。
cl <- parallelly::makeClusterPSOCK(availableCores()-1, timeout=10, autoStop=TRUE)

future::plan(cluster, workers=cl)
temp <- furrr::future_map(  .x=c(8:11),
                            .f=~Sys.sleep(.x)  ) # 実際のジョブ
future::plan(sequential) 

showConnections()

rm(list="cl") # R 環境から cl を削除する。「コネクションの切断」もここで起こる。
showConnections()
gc() # システム上に残っている野良 Rscript がここで削除される
showConnections()

何ができるかというと、計算が終わった後で R 環境中のクラスタオブジェクト cl を削除し、gc() でガベージコレクションを実施すると、auto stop がセットされているクラスタの Rscript を自動で停止・削除してくれる。

この auto stop には利点があって、クラスタの終了処理がエラーを吐いて、解析パイプラインが途中で止まるリスクを、最小限に押さえられる。たとえば上のコード例では timeout が 10 秒と短く設定されているため、計算が終了した暁には一部ノードの Rscript が落ちることになる(タイムアウトについては下のセクションを読んでほしい)。この状態から parallel::stopCluster(cl) などとしてクラスタを手動終了しようとすると、(実世界に存在しない Rscript プロセスにアクセスを試みて)コネクションエラーが発生する。しかし予め auto stop をセットしておけばノードの生死に関わらず、最初の rm(list="cl") で R 環境内から、次いで gc() でシステム上から、いずれもエラーを出さずに削除してくれる。

future_map() について

今回使ったデータセットでは、パラメータセットが om の各行のトップ階層に存在し、さらに make_dummy() で生成したダミーデータ(S4 クラス CqList に属する)が om$DNA のリスト要素として埋め込まれている。sim_dummy() は、先に作ったダミーデータを第一引数に取り、内部的に freqpcr() を呼び出して推定した結果のオブジェクト(S4 クラス CqFreq)を返す。

さて、この furrr::future_map() は何をやっているのか。最初にダミーデータ生成で使った purrr::map() のアナロジーであることがお分かりだろう。purrr::map() を用いて(シングルスレッドで)並列処理する方法は、当サイトで既に「Tidyverseによるデータフレーム加工(05)グループごとに一括処理する:その 2 nest および map 系関数」というページの「nest + mutate + map(+ ラムダ式)による方法」という節で解説した。要するに、group_by(LID) でユニーク行を抽出、nest() で環境分離。得られた 1 行 n 列の nested tibble から取り出した、$DNA 列にあるダミーデータを sim_dummy() に放り込んで、推定プロセスを実行する(このとき map 系関数では処理をラムダ式で括る必要がある)。

先の記事では mutate を用いて、戻り値を元の tibble の新規列に代入していた。しかし今回は計算結果を加工してから om に代入したいので、temp という名前で置いておく。

ログの扱いについて(初級編)

ダミーデータに基づく推定用の関数が sim_dummy()(freqpcr のラッパー)だが、こいつには動作ログを標準出力へ出す機能がある。実験に使ったパラメータの値はもちろん freqpcr 関数には受け渡されないので、各パラメータ領域のログを保存する必要があるのだ。クラスタからの一番簡単なログ出力の実装方法として、とりあえず各ノードで future_map() が走らせる処理の全メッセージを親の R 環境に返させておき、これを外部ファイルに流すことにした。

そのために処理全体を capture.output(処理, file=fout, append=FALSE, type=c("output")) という関数で挟んでいる。上のチャンクで fout <- file("ファイル名.txt", "w") という形で、ローカル環境に予め、書き込み可能なテキストファイルを作成したことに注意されたい。このローカルファイルには、R 環境上では fout という名前でアクセスできる。fout は、書き込み可能な外部ファイルへの「コネクション」である。コネクションを閉じるまでの間、R 環境から fout 宛に流したデータが、外部に記録されるわけだ。なお type=c("output") というフラグで、標準出力だけをファイルへ書き出し、標準エラー出力 strerr() は R コンソールへ出すようにしている。

(蛇足)base R の file() 関数についての公式ヘルプは Functions to Manipulate Connections (Files, URLs, ...) というページにある。だが R file でググると File Manipulation という、ローカルディレクトリにあるファイルやフォルダを一覧操作する関数群の情報が出てくる。紛らわしいのである。

なお、このログの扱いには「並列計算の最後に全部まとめて出すので、途中で 1 ノードでも止まると何も記録されない」という重大な欠点がある。対処として各ノードから個別にログをリアルタイムで吐かせるコードを書いたので、このページの最後で紹介する。

並列処理の検死報告を行う

上記の計算はデモ用に組み合わせを絞ってあるので、数十秒間で終わる。失敗しても try() で結果を受け止められるので、操作元の R のグローバル環境上ではエラーを出さずに停止する。クラスタ全体として計算が失敗したかどうかは、以下で調べる。


class(e1)

この戻り値が "try-error" という文字列だった場合、コネクションエラー等の理由で future_map() が異常終了したと考えられる。なおsim_dummy() の内部で freqpcr() の推定が失敗することもしばしばあるが、sim_dummy() そのものにも try() を用いた安全装置が組み込まれており、基本的にはエラーを返さないようになっている。

クラスタに含まれる Rscript.exe を調べる

何らかの理由で e1 がエラーになったとする。まず我々ができることは、クラスタ内で初期化された子プロセス(Rscript)の各々が生きているか、死んでいるかの調査である。このセクションでは以下、筆者の環境で過去に生じたエラーメッセージを表示している。残念なことに「確実にエラーを出す」条件が、並列処理では再現困難であるため、見せるだけになってしまうが堪忍してほしい。

まず、計算前にクラスタを立ち上げたときの情報。立ち上げたクラスタに含まれている Rscript の プロセス ID を見るには、cl オブジェクトの $session_info$process$pid を見る。クラスタの内外に関わらず、現在のマシン上で動いている Rscript を全て見るには、Windows なら installr パッケージの get_Rscript_PID() 関数を使うと楽。なお installr は R version 4 系列では、2021 年 4 月現在で未サポート。


# 現在マシン内で動いている、全ての Rscript.exe のプロセス ID を見る
print(installr::get_Rscript_PID())

# 操作元の R 上で、cl ことクラスタオブジェクトが現在認識している子プロセスの ID を見る
print(purrr::map_int(cl, ~pluck(., "session_info", "process", "pid")))

たとえば最初に 16 スレッドで cl を立ち上げてから、計算前に 
installr::get_Rscript_PID() 
もしくは 
purrr::map_int(cl, ~pluck(., "session_info", "process", "pid")) 
した場合には、どちらも同じく、16 個の子プロセス全ての process ID が表示される。

[1] 31336 32552  3204 31908 37512 30604 11520 34992 37576 37368 25468 30224 24220 31956 26356 28760

さて、この状態から始めた並列処理の内部で、何らかのエラーが出たとする。このエラーに付随して親の R プロセスと、クラスタ内のあるノード、すなわち子プロセスとのコネクションが計算中に閉じてしまった場合、何が起こるか。当該の子プロセスは、親側の cl 内では依然「あるもの」として認識されているが、実際のマシン上では既に、当該 ID を持つ Rscript.exe が終了してしまって存在しない状態になる。そのせいで、典型的には以下のようなエラーメッセージとともに親側の処理も中断してしまう。


furrr::future_map() で一括計算中にコネクションが切れると、以下のようなエラーが出る。

Error in unserialize(node$con) : 
Failed to retrieve the value of ClusterFuture () from cluster SOCKnode #4 (PID 31908 on localhost ‘localhost’). The reason reported was ‘ コネクションからの読み取りエラーが発生しました ’. Post-mortem diagnostic: No process exists with this PID, i.e. the localhost worker is no longer alive.

この状態に陥ったら、まず process ID を再確認すると良い。


class(e1) == "try-error" だったときの動作解析

# この状態で installr::get_Rscript_PID() すると
[1] 31336 32552  3204 37512 30604 11520 34992 37576 37368 25468 30224 24220 31956 26356 28760

# この状態で purrr::map_int(cl, ~pluck(., "session_info", "process", "pid")) すると
[1] 31336 32552  3204 31908 37512 30604 11520 34992 37576 37368 25468 30224 24220 31956 26356 28760

もうお分かりであろう。ID = 31908 の Rscript.exe がタイムアウトで消滅しているのに、cl 側からは依然存在するものと認識されており、番町皿屋敷の 1 枚足りなぁい状態である。

コネクションエラーを回避する方法は?

ここまで furrr の大雑把な使い方と、エラーが出たときの片付け方法を解説してきた。だが重要なのは、いかにしてエラー箇所を特定し、正しいコードに書き直して大規模計算を完了させるかである。困ったことに、(ノード上の標準エラー出力を完全には回収できないため)親の R セッションに事後表示されるエラーメッセージだけでは、コネクションエラーの原因は絞り込めないらしい。トルストイの言葉を借りると、計算が完了する過程はどれも似たものだが、エラーを出す過程は色々あるのだ。一応、原因特定に役立ちそうな情報を置いておく。

ノードにエクスポート出来ないオブジェクト

細かいことはこちらの vignette "A Future for R: Non-Exportable Objects" および future パッケージの issue 330 を読んでほしい。そこそこ長い vignette なので要点だけメモっておくと、

  1. 一部タイプのオブジェクトは、それが生成された R のセッションに紐付いており、並列計算用に新しく作ったノードに「エクスポート」できない。future package では “non-exportable objects” と呼ばれている。
  2. たとえばローカルファイルへ動作ログを吐かせたいとき、親セッション中で file("output.log", open="wb") として作成したファイルへのコネクションは、external pointer の実体を持つ。これは典型的な non-exportable objects である。
  3. つまりクラスタへ投げる処理のコード中では、グローバル変数のうち non-exportable なものを呼び出すことは原則 NG。External pointer だからといって常に子ノードからアクセスできないわけでもないが、仮に子ノードがメモリ上のアクセス不可能な、あるいは禁止されている場所へアクセスを試みてしまうと、ノードが突然死する原因となる。
  4. これを明示的に禁止するには options(future.globals.onReference = "error") というコマンドを、予め親セッション上で実行しておくべし。(2020年現在の)環境変数 future.globals.onReference のデフォルトは "ignore" で、non-exportable でもお目溢しする。親のセッション中に存在する non-exportable objects を都度全て調査するとオーバーヘッドが発生するのと、常に失敗するわけでもないので、一律に禁止すると不便だそうな。
  5. 他の典型的な non-exportable objects が vignette に列挙されている。そのほとんどが external pointer タイプのデータだ。

須藤自身がよく経験するのが、xml2 パッケージでウェブスクレイピングするときの xml_document オブジェクトである。これは R では珍しい参照渡しのオブジェクトだ(xml2 は近いうちに解説シリーズを始めると思う)。要するに、巨大な画像やテキストやデータベースをメモリ上に展開して、コピー操作抜きに中身を加工したい場合に頻出する。

また Rcpp の sourceCpp() 関数を使って、R セッション内でインスタントに作成した C++ の関数も、同様に external pointer で保持されている。ベストプラクティスは、自作関数はパッケージに固めてから読み込ませること。次点は clusterEvalQ() を使って、各ノード上でも sourceCpp() する。もちろんオーバーヘッドがあるので、個々のタスクが小さい時は効率が悪い。

コネクションのタイムアウト

複数のノード(子プロセス)に仕事を割り振って計算をヨーイドンで始めるのだが、先に自分の割当分を全て計算してしまったノードは待機状態になる。他のノードが全て仕事を終えてから、あらためて全ノードの計算結果を回収するのである。だが、ここで親の R セッションとのプロセス間通信が一定時間行われないと、ノードが勝手に終了してしまうらしい。

タイムアウトまでの時間を制御したければ、plan(multisession) で暗示的にワーカーを作るのではなく、future::makeClusterPSOCK() で明示的に作成したクラスターに plan(cluster, workers=cl) で仕事を割り当てる。この際 makeClusterPSOCK(..., timeout=10) などとすれば、親と子の間の通信が 10 秒間発生しなかったタイミングで自動的に、子のプロセスが終了するようになる。


# わざとタイムアウトを短くしてコネクションエラーを発生させるコード
cl <- future::makeClusterPSOCK(availableCores()-1, connectTimeout=10, timeout=20)
cl.pid <- purrr::map_int(cl, ~pluck(., "session_info", "process", "pid"))
invisible(clusterEvalQ(cl, { sink(paste("otest", Sys.getpid(), "txt", sep="."), append=TRUE) }))

invisible(clusterEvalQ(cl, { print("aaa") })) # sink test
Sys.sleep(5)
invisible(clusterEvalQ(cl, { print("bbb") })) # sink test after 5 second interval.
Sys.sleep(15)
invisible(clusterEvalQ(cl, { print("ccc") })) # sink test after 15 second interval.
Sys.sleep(25)
invisible(clusterEvalQ(cl, { print("ddd") })) # sink test after 25 second interval.

上のコードを実行すると分かるが、20秒間待ってやるとの約束で作ったクラスタなので、Sys.sleep(20) までは大丈夫であり、"aaa" と "bbb" と "ccc" は外部テキストファイルへ書き込まれる。しかし 25 秒は待ってくれない。タスクマネージャを開いて観察すると、"ccc" が書き込まれた 20 秒後に Rscript.exe が全てバルスされ消滅。その 5 秒後に R コンソールへ「serialize(data, node$con, xdr = FALSE) でエラー: コネクションへの書き込みエラーが発生しました」というメッセージが出ることが分かる。

より正確に調査して判明したタイムアウトの挙動は「makeClusterPSOCK(..., timeout = s) で 指定した s 秒よりも長い時間、親子間の通信が発生しなかったノードは、すでに自ノードへ割り当てられたジョブがあれば全うするが、それらの結果を親へ返した直後に終了する」であるらしい。以下のコードでタイムアウトまでの秒数を変えて実験してみた。


library(parallel)
library(parallelly)
library(furrr)
library(tidyverse)
myfunc <- function(x, s) {
    cat(paste("Calculation started in ", Sys.time(), "\n", sep=""))
    print(paste("Wait ", s, " seconds and return ", x, ".", sep=""))
    flush.console()
    Sys.sleep(s)
    x
} 

cl <- parallelly::makeClusterPSOCK(3, connectTimeout=10, timeout=8, autoStop=TRUE)
invisible(clusterEvalQ(cl, { sink(paste("otest", Sys.getpid(), "txt", sep="."), append=TRUE) }))
future::plan(cluster, workers=cl)

dat <- data.frame(x=1:10, s=rep(10, 10))
e1 <- try( {
    temp <- dat %>%
        dplyr::group_by(gr=1:nrow(.)) %>% 
        tidyr::nest() %>%
        furrr::future_map(  .x=.$data,
                            .f=~myfunc(.$x, .$s), .options=furrr_options(stdout=NA)  )
}, silent=FALSE )

invisible(clusterEvalQ(cl, { sink() }))
future::plan(sequential)
rm(cl)
gc()

########
結果:
timeout = 80, s = 10: 各ノードのログ記録、計算後の親への結果クエリともに成功
timeout = 50, s = 10: 各ノードのログ記録、計算後の親への結果クエリともに成功
timeout = 25, s = 10: 各ノードのログ記録、計算後の親への結果クエリともに成功

timeout =  8, s = 10: 各ノードのログ記録、計算後の親への結果クエリともに成功するが、
先に計算が終わって結果を返したノードから順にプロセスが閉じてしまう。
そのため計算後の invisible(clusterEvalQ(cl, { sink() })) で以下のエラーメッセージが出た。

 serialize(data, node$con, xdr = FALSE) でエラー: 
   コネクションへの書き込みエラーが発生しました 

つまり没交渉な時間が timeout の条件を満たすと、満たした直後に子プロセスが死ぬのではなく、割り当てられたジョブを終えて結果を親に返した直後のタイミングで子が死ぬのである。

また、future_map() において 1 領域計算が終わるごとに、親子間の通信が発生しているようだ。さもないと timeout = 25 の組でもエラーが出るはずだからである。

これらの挙動は、クラスタ間で計算の所要時間が極端に異なる場合、終了処理に支障をきたすリスクとなる。特に autoStop=TRUE を付けずにクラスタを手動管理している場合が問題で、というのも future::plan(sequential) は cl が閉じていても正常に動作するが、stopCluster(cl) は cl の全ノードが生きていないと、コネクションエラー(Error in serialize(data, node$con, xdr = FALSE))を返すのだ。一応 makeClusterPSOCK() のデフォルトは timeout = 30*24*60*60 つまり 1 ヶ月なので、最初にゴールインしたノードと最後まで掛かるノードの差が数日程度であれば、実用上は問題ないと思う。

コネクション数の欠乏

伝統的に R と外部プロセス間のコネクションは、システムが常に確保している stdin, stdout, stderr に加え、ユーザーが作成可能な最大 125 個までと決められている。つまり、125スレッドを超える並列計算は何らかのカスケード構造を作らない限りは実行できず、これが future の能力を制約する因子となってきた。なお本制約は、R のソースに #define NCONNECTIONS 128 としてハードコーディングされているため、当該部分のソースを書き換えて R を自力ビルドすれば撤廃できる。

(調査中)parallelly::isConnectionValid() によるコネクションのチェック

実は parallelly そのものが 2020 年秋に初めて CRAN へ掲載された新しいパッケージなので、本記事の草案時点では使えなかった方法だが、isConnectionValid() という関数が実装された。こいつは R 環境上から、先に作ったクラスタの各ノードへのコネクションが、現在でも利用可能か否かを調べられる。コネクションが使えない場合は、原因をある程度特定して報告してくれる → はずなのだが、なんか上手く動かないので調査中。

この関数を使うには、調査対象のコネクションオブジェクトの指定(クラスタの名称ではない)が必要。たとえば以下のような感じ。


library(parallelly)
library(furrr)
library(tidyverse)
cl <- makeClusterPSOCK(2, timeout=10, autoStop=TRUE)
cl2 <- makeClusterPSOCK(2, timeout=1000, autoStop=TRUE)

future::plan(cluster, workers=cl)
temp <- furrr::future_map(  .x=c(11:12),
                            .f=~Sys.sleep(.x)  ) # cl だけにジョブをぶち込む
future::plan(sequential) 

# 現在存在するコネクションを情報表示(all: ユーザー以外が作ったものも含む)
base::showConnections(all=TRUE)

# こんな感じ
> base::showConnections(all=TRUE)
  description               class      mode  text     isopen   can read can write
0 "stdin"                   "terminal" "r"   "text"   "opened" "yes"    "no"     
1 "stdout"                  "terminal" "w"   "text"   "opened" "no"     "yes"    
2 "stderr"                  "terminal" "w"   "text"   "opened" "no"     "yes"    
3 "<-DESKTOP-6KOQKLF:11928" "sockconn" "a+b" "binary" "opened" "yes"    "yes"    
4 "<-DESKTOP-6KOQKLF:11928" "sockconn" "a+b" "binary" "opened" "yes"    "yes"    
5 "<-DESKTOP-6KOQKLF:11356" "sockconn" "a+b" "binary" "opened" "yes"    "yes"    
6 "<-DESKTOP-6KOQKLF:11356" "sockconn" "a+b" "binary" "opened" "yes"    "yes"    

# まず、表記が簡単な標準エラー出力の情報を isConnectionValid() で表示してみよう
parallelly::isConnectionValid(stderr()) # [1] TRUE

# また connectionId() で、コネクションの番号が分かる。
connectionId(stderr()) # [1] 2

# 続いて、ユーザーが作ったコネクションの調査方法。
parallelly::isConnectionValid(base::getConnection(3)) # [1] TRUE
parallelly::isConnectionValid(base::getConnection(4)) # [1] TRUE
parallelly::isConnectionValid(base::getConnection(5)) # [1] TRUE
parallelly::isConnectionValid(base::getConnection(6)) # [1] TRUE
parallelly::isConnectionValid(base::getConnection(7)) # base::getConnection(7) でエラー:  7 というコネクションはありません

# あれ? 2 つは内部の Rscript が死んでいるはずなのに、全部 Valid となる。

# 未検討の問題: cl や cl2 といったクラスタ名から、子飼いのコネクションを調べる方法。


# 後始末
rm(list="cl")
rm(list="cl2")
gc()

その他のエラー

Future パッケージの基本設計では、future 内部の処理において発生したエラーは、そのままの形で親の R 環境へエラーとして出力される。これは、future 以前の snow や parallel といった並列化スキームの挙動を再現しており、ユーザーの学習・再実装コストをなるべく抑えるための配慮である。一方、ワーカーの Rscript が死んだとかコネクションがタイムアウトになったとか、future パッケージそのものに責任があるエラーは、 FutureError という特別なクラスでマークされる。こちらのプレプリントによれば FutureError の存在をトリガーとして、失敗したジョブの自動再投入なども原理的に可能であるが、まだ開発が追いついていないらしい。

また R そのものの仕様の限界により、子の各ノードで生じた標準エラー出力の結果を、全く取りこぼすこと無く親へリレーすることは、future では困難らしい。特にコネクションエラーが発生した場合は大変だ。トラブルシューティングに十分な情報を集めようと思うと、突然死してしまった Rscript から、死ぬまでの状況を逐一ログとして書き出させる必要がある。次の節では、ログ出力を子自身に行わせる方法を検討する。

furrr(future)による並列計算時のログをリアルタイムで書き出す

最初に出したコード例では、capture.outoput() を並列処理(temp <- furrr::future_map(処理) の部分)の内側ではなく、外側に配置した。しかしながらログの書き出し単位が全ノードの和になるため、並列処理中にとりあえず計算できたところまでのログを閲覧できなかったり、途中でノードが死ぬとログが取り出せなくなったり、メモリが枯渇しやすくなったりする問題が残る。

これまでの説明から分かるように、動作ログを都度書き出すようなコードは並列化では一工夫を要する。安全と思われる方法の 1 つは、それぞれの子プロセス内で自分の Process ID なんかを使って、ユニークな名前のログファイルを生成させることだ。どうしても親プロセスからログを使う必要があれば、全ての計算が終了した後、名前でローカルファイルにアクセスさせる。ログをクラスタ内で生成させるコードを以下に紹介しておく。


# sink 命令 + Tee 演算子を使い、クラスタの各ノードから個別にログを書き出す
library(magrittr)

cl <- future::makeClusterPSOCK(availableCores()-1, connectTimeout=10, timeout=20)
cl.pid <- purrr::map_int(cl, ~pluck(., "session_info", "process", "pid"))
print(cl.pid)
invisible(clusterEvalQ(cl, library(freqpcr)))

# 各クラスタの出力を個別のテキストファイルへ sink する
invisible(clusterEvalQ(cl, { sink(paste("otest", Sys.getpid(), "txt", sep="."), append=TRUE) }))
invisible(clusterEvalQ(cl, { print("aaaaaaaaa") })) # sink test

future::plan(cluster, workers=cl)
nbrOfWorkers() # クラスタのコア数が増えていることを確認

ptime0 <- proc.time()
e1 <- try( {
    cat(paste("Calculation started in ", Sys.time(), "\n", sep="")); flush.console();
    temp <- om %>%
        dplyr::group_by(LID) %>%
        tidyr::nest() %>%
        furrr::future_map(  .x=.$data,
                            .f=~sim_dummy(  CqList=pluck(.$DNA, 1), EPCR=.$EPCR, 
                                            zeroAmount=.$zeroAmount, K=.$K, 
                                            beta=TRUE, diploid=.$diploid,
                                            maxtime=120, print.level=1, 
                                            aux=c(  replicate=.$replicate, rseed=.$rseed,
                                                    P=.$P, K=.$K, ntrap=.$ntrap, 
                                                    npertrap=.$npertrap, beta=TRUE  )  ) %T>%
                                print, .options=furrr_options(stdout=NA)  );
    cat(paste("Calculation ended in   ", Sys.time(), "\n", sep="")); flush.console();
}, silent=FALSE )
print(proc.time()-ptime0)

invisible(clusterEvalQ(cl, { sink() })) # sink は使ったら戻す。

future::plan(sequential) 
nbrOfWorkers() 
(stopcl <- try( parallel::stopCluster(cl) )) # NULL なら成功

上記の挙動は「各ノードの標準出力 → すぐさまノード内の sink に出力されて個別のログファイルに書き込み」。だが、これを future のデフォルト設定でやろうとすると実際には、future_map( ) 内部の出力は全て、並列計算の完了を待ってから親の R コンソールに出てしまう。Future パッケージの vignette 2 によると、計算後に結果のクエリが行われた時点で stdout がノードから再リレーされて、初めてメッセージが表に出てくるのが仕様。つまりノード内にリアルタイム書き込み処理を入れるためには、このリレー機能を一時的に潰す必要がある。

というわけで furrr::future_map() の制御パラメータとして、.options = furrr_options(stdout=NA) を指定した。なおデフォルトは stdout=TRUE で、上記の通り子プロセスの stdout が親にリレーされる(さらに FALSE というオプションもあり、出力が null デバイスに送られて消滅する)。※ furrr v 1.0 では future_options() という名前だった。

さらに Tee 演算子 というものを使って、sim_dummy() 関数の標準出力を、まず内部ノードの標準出力へ送る分岐を作る(これがないと、最終的には親の標準出力へログが吐かれてしまう)。Tee は magrittr で追加定義される、特殊なパイプ演算子の一種だ(原義は英語の T を指すらしい)。lhs %>% rhs で繋いでいく通常のパイプ演算子は、lhs(左辺)から来たデータを、rhs(右辺)に配置した処理関数の第一引数に放り込む。この処理関数が返す結果を、後続のパイプライン中で入力として使いつつ、R のコンソール画面上やグラフ上にも情報を示したい場合、どうするか。つまり lhs %>% 処理1 %>% 処理2 %>% (以下、処理3, 4, ...)があったとして、処理 2 以降の計算もやりつつ、処理 1 の途中経過を見ておきたい。

こんなとき、Tee 演算子を使うと lhs %>% 処理1 %T>% print %>% 処理2 %>% ... や、lhs %>% 処理1 %T>% plot %>% 処理2 %>% ... といった書き方ができる。つまり、処理 1 の結果をそのまま処理 2 へ送るラインを維持しつつ、途中に 1 回脇道として表示関数を挟めるのだ。なお Tee のある位置に通常のパイプ演算子を入れると、計算結果が全部ログに流れて、親に返る結果が null になってしまう。

手元でテストした限りでは、あえてログファイルの名前を全ノード共通にすることも可能であった。ただし積極的には勧めない。各ノードの計算が 1 領域だけ進むごとに、当該部分のログが書き出される。だが、1 つの書き込みが完了する前に、別のノードからの書き込みセッションが割り込んでしまい、ログが切断されてしまうことがまれにあった。

capture.output() を furrr::future_map() 内部で使えるか?

なお、元のコードは capture.output() を使っていたので、そちらも試してみたが色々問題があり、実用的でない。


# capture.output() + Tee 演算子を使う案 → コレは上手く行かない!
library(magrittr)

cl <- future::makeClusterPSOCK(availableCores()-1, connectTimeout=24*60*60, timeout=24*60*60)
cl.pid <- purrr::map_int(cl, ~pluck(., "session_info", "process", "pid"))
print(cl.pid)
invisible(clusterEvalQ(cl, library(freqpcr)))

# 各クラスタ内で capture.output() を使う方法
invisible(clusterEvalQ(cl, { fout <- file(paste("otest", Sys.getpid(), "txt", sep="."), "w") }))
clusterEvalQ(cl, { fout }) # 各クラスタの fout の実体(connection class object)を見ておこう
# 各クラスタに存在する fout への接続を試す
clusterEvalQ(cl, { capture.output(Sys.getpid(), file=fout, append=TRUE) }) 

future::plan(cluster, workers=cl)
nbrOfWorkers() # クラスタのコア数が増えていることを確認

ptime0 <- proc.time()
e1 <- try( {
    cat(paste("Calculation started in ", Sys.time(), "\n", sep="")); flush.console();
    temp <- om %>%
        dplyr::group_by(LID) %>%
        tidyr::nest() %>%
        furrr::future_map(  .x=.$data,
                            .f=~sim_dummy(  CqList=pluck(.$DNA, 1), EPCR=.$EPCR, 
                                            zeroAmount=.$zeroAmount, K=.$K, 
                                            beta=TRUE, diploid=.$diploid,
                                            maxtime=120, print.level=1, 
                                            aux=c(  replicate=.$replicate, rseed=.$rseed,
                                                    P=.$P, K=.$K, ntrap=.$ntrap, 
                                                    npertrap=.$npertrap, beta=TRUE  )  ) %T>%
                                capture.output( file=fout, append=TRUE )
                            , .options=furrr_options(stdout=NA)  );
    cat(paste("Calculation ended in   ", Sys.time(), "\n", sep="")); flush.console();
}, silent=FALSE )
print(proc.time()-ptime0)

# 以下のように叱られる。
Error in capture.output(., file = fout, append = FALSE) : 
   オブジェクト 'fout' がありません 

# 仕方ないので、いったんファイル名を直打ちさせてみる
ptime0 <- proc.time()
e1 <- try( {
    cat(paste("Calculation started in ", Sys.time(), "\n", sep="")); flush.console();
    temp <- om %>%
        dplyr::group_by(LID) %>%
        tidyr::nest() %>%
        furrr::future_map(  .x=.$data,
                            .f=~sim_dummy(  CqList=pluck(.$DNA, 1), EPCR=.$EPCR, 
                                            zeroAmount=.$zeroAmount, K=.$K, 
                                            beta=TRUE, diploid=.$diploid,
                                            maxtime=120, print.level=1, 
                                            aux=c(  replicate=.$replicate, rseed=.$rseed,
                                                    P=.$P, K=.$K, ntrap=.$ntrap, 
                                                    npertrap=.$npertrap, beta=TRUE  )  ) %T>%
                                capture.output( file=paste("otest", Sys.getpid(), "txt", sep="."), append=TRUE)
                            , .options=furrr_options(stdout=NA)  );
    cat(paste("Calculation ended in   ", Sys.time(), "\n", sep="")); flush.console();
}, silent=FALSE )
print(proc.time()-ptime0)

# 一応書き込みは発生するようになるが、今度は message が落ちて output しか出なくなる。
# 他に試してみた方法として、future_map() そのものを capture.output() で包む → 計算は進むがログが出ない。

invisible(clusterEvalQ(cl, try(close(fout)) ))

future::plan(sequential) 
nbrOfWorkers() 
(stopcl <- try( parallel::stopCluster(cl) )) # NULL なら成功

まず作ったはずの fout へ、future_map() 内の captue.output() からアクセスできずに躓いた。これは capture.output() が動作するとき、一時的に別環境が作られているためらしい。公式の capture.output のヘルプによれば、capture.output と sink の関係は with(data, expr, ...) と attach(what, ...) の関係らしい。つまり、attach は第一引数で指定したデータフレームから、各要素を一時的に現在のサーチパスへ持ってくる操作である。一方 with は、第一引数で指定したデータの直下に仮の環境を作り、その中でエクスプレッションを評価する。指定したデータフレームの列に、裸のオブジェクト名でアクセス可能となる点では同じだが、動作する環境が違うのだ。

さらにいうと、capture.output() そのものが sink() の機能を経由して書き込みを実現しているので、回りくどいことをせず直接 sink() を使うべきだろう。