[ 
https://issues.apache.org/jira/browse/SPARK-34033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Howland updated SPARK-34033:
--------------------------------
    Description: 
Provide a way for users to initialize the sparkR daemon before it forks.

I'm a contractor to Target, where we have several projects doing ML with 
sparkR. The changes proposed here result in over a hundred hours of 
compute time saved on every run:

(40000 partitions) * (5 seconds to load our R libraries) * (2 calls to gapply 
in our app) / 60 / 60 = 111 hours.

(from 
[docs/sparkr.md|https://github.com/WamBamBoozle/spark/blob/daemon_init/docs/sparkr.md#daemon-initialization])
h3. Daemon Initialization

If your worker function has a lengthy initialization, and your
 application has lots of partitions, you may find you are spending weeks
 of compute time repeatedly doing something that should have taken a few
 seconds during daemon initialization.

Every Spark executor spawns a process running an R daemon. The daemon
 forks a copy of itself whenever Spark finds work for it to do. It may
 be applying a predefined method such as "max", or it may be applying
 your worker function. SparkR::gapply arranges things so that your worker
 function is called once per group, where a group is a key together with
 the Seq[Row] of rows sharing that key. In the absence of partitioning,
 the daemon forks for every group found. With partitioning, the daemon
 forks for every partition found, and a partition may contain several
 groups.
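
Inside a SparkR session, a minimal gapply call looks like this (the
mtcars data and the schema are purely illustrative):

    df <- createDataFrame(mtcars)
    # The worker runs once per group (or once per partition, when partitioned);
    # key holds the grouping values, sdf is an ordinary R data.frame of the rows.
    result <- gapply(
        df,
        "cyl",
        function(key, sdf) {
          data.frame(cyl = key[[1]], max_mpg = max(sdf$mpg))
        },
        structType("cyl double, max_mpg double"))
    head(collect(result))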

All the initialization and library loading your worker function performs
 is thrown away when the fork exits. Every fork has to be initialized
 anew.
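
Concretely, without daemon initialization every fork pays the loading
cost itself. A sketch of the costly pattern (wow and its crunch()
function are hypothetical stand-ins for a slow-loading package):

    # Paid on every fork: each group or partition re-runs library(wow).
    slow_worker <- function(key, sdf) {
      library(wow)                      # ~5 seconds, repeated per fork
      data.frame(out = wow::crunch(sdf))
    }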

The configuration {{spark.r.daemonInit}} provides a way to avoid reloading
 packages every time the daemon forks by having the daemon pre-load
 them. You do this by providing R code that initializes the daemon for
 your application.
h4. Examples

Suppose we want library(wow) to be pre-loaded for our workers.

{{sparkR.session(spark.r.daemonInit = 'library(wow)')}}
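
With the daemon pre-loaded, the worker function no longer has to load
the package itself. A sketch (wow and crunch() are again hypothetical):

    sparkR.session(spark.r.daemonInit = 'library(wow)')
    fast_worker <- function(key, sdf) {
      # No library(wow) here: the daemon loaded it once, before forking.
      data.frame(out = wow::crunch(sdf))
    }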

Of course, that would only work if we knew that library(wow) was on our
 path and available on the executor. If we have to ship the library, we
 can use YARN:

    sparkR.session(
       master = 'yarn',
       spark.r.daemonInit = '.libPaths(c("wowTarget", .libPaths())); library(wow)',
       spark.submit.deployMode = 'client',
       spark.yarn.dist.archives = 'wow.zip#wowTarget')

YARN creates a directory for the new executor, unzips 'wow.zip' in some
 other directory, and then provides a symlink to it called
 ./wowTarget. When the executor starts the daemon, the daemon loads
 library(wow) from the newly created wowTarget.
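
One way to confirm the archive actually landed on the daemon's search
path is to return .libPaths() from a worker. A sketch, reusing the df
from the earlier example:

    paths <- gapply(
        df,
        "cyl",
        function(key, sdf) data.frame(paths = paste(.libPaths(), collapse = ":")),
        structType("paths string"))
    head(collect(paths))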

Warning: if your initialization takes longer than 10 seconds, consider
 increasing the configuration [spark.r.daemonTimeout|configuration.md#sparkr].
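
For example, assuming the proposed spark.r.daemonTimeout accepts Spark's
usual time strings (the 60s value here is arbitrary):

    sparkR.session(
       spark.r.daemonInit = 'library(wow)',   # suppose this takes ~15 seconds
       spark.r.daemonTimeout = '60s')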



> SparkR Daemon Initialization
> ----------------------------
>
>                 Key: SPARK-34033
>                 URL: https://issues.apache.org/jira/browse/SPARK-34033
>             Project: Spark
>          Issue Type: Improvement
>          Components: R, SparkR
>    Affects Versions: 3.2.0
>         Environment: tested on centos 7 & spark 2.3.1 and on my mac & spark 
> at master
>            Reporter: Tom Howland
>            Priority: Major
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>


