[ https://issues.apache.org/jira/browse/SPARK-34033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tom Howland updated SPARK-34033:
--------------------------------
Description:

Provide a way for users to initialize the SparkR daemon before it forks.

I'm a contractor to Target, where we have several projects doing ML with SparkR. The changes proposed here result in weeks of compute time saved with every run:

(40000 partitions) * (5 seconds to load our R libraries) * (2 calls to gapply in our app) / 60 / 60 = 111 hours.

(from [docs/sparkr.md|https://github.com/WamBamBoozle/spark/blob/daemon_init/docs/sparkr.md#daemon-initialization])

h3. Daemon Initialization

If your worker function has a lengthy initialization and your application has many partitions, you may find you are spending weeks of compute time repeatedly doing something that should have taken a few seconds during daemon initialization.

Every Spark executor spawns a process running an R daemon. The daemon forks a copy of itself whenever Spark finds work for it to do. That work may be a predefined method such as "max", or it may be your worker function. SparkR::gapply arranges for your worker function to be called once per group, where a group is a Key-Seq[Row] pair. In the absence of partitioning, the daemon forks for every group found; with partitioning, it forks for every partition, and a partition may contain several groups.

All the initialization and library loading your worker function performs is thrown away when the fork concludes, so every fork has to be initialized anew. The configuration spark.r.daemonInit avoids reloading packages every time the daemon forks by having the daemon pre-load them: you supply R code that initializes the daemon for your application.

h4. Examples

Suppose we want library(wow) to be pre-loaded for our workers:

{{sparkR.session(spark.r.daemonInit = 'library(wow)')}}

Of course, that only works if we know library(wow) is on the library path and available on the executor.
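To make the payoff concrete, here is a minimal sketch of the pattern, assuming the proposed spark.r.daemonInit configuration; the data frame, the grouping column "key", the worker body, and the package name "wow" are all illustrative placeholders, not part of the proposal:

{code:r}
library(SparkR)

# Pre-load library(wow) in the daemon, so every forked worker inherits it
# already loaded instead of paying the library() cost once per group/partition.
sparkR.session(spark.r.daemonInit = 'library(wow)')

df <- createDataFrame(data.frame(key = c("a", "a", "b"), x = c(1, 2, 3)))

result <- gapply(
  df,
  "key",
  function(key, rows) {
    # Without daemonInit, library(wow) would have to run here, inside every
    # fork, and its effect would be discarded when the fork exits.
    data.frame(key, n = nrow(rows))
  },
  structType(structField("key", "string"), structField("n", "int")))

head(collect(result))
{code}

With 40000 partitions and a 5-second library load, moving that one line out of the worker and into daemonInit is where the 111-hour figure above comes from.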
If we have to ship the library, we can use YARN:

{{sparkR.session(}}
{{  master = 'yarn',}}
{{  spark.r.daemonInit = '.libPaths(c("wowTarget", .libPaths())); library(wow)',}}
{{  spark.submit.deployMode = 'client',}}
{{  spark.yarn.dist.archives = 'wow.zip#wowTarget')}}

YARN creates a directory for the new executor, unzips 'wow.zip' in some other directory, and then provides a symlink to it called ./wowTarget. When the executor starts the daemon, the daemon loads library(wow) from the newly created wowTarget.

Warning: if your initialization takes longer than 10 seconds, consider increasing the configuration [spark.r.daemonTimeout|configuration.md#sparkr].

> SparkR Daemon Initialization
> ----------------------------
>
>                 Key: SPARK-34033
>                 URL: https://issues.apache.org/jira/browse/SPARK-34033
>             Project: Spark
>          Issue Type: Improvement
>          Components: R, SparkR
>    Affects Versions: 3.2.0
>         Environment: tested on centos 7 & spark 2.3.1 and on my mac & spark at master
>            Reporter: Tom Howland
>            Priority: Major
>   Original Estimate: 0h
>   Remaining Estimate: 0h