Re: [Rd] Wish: a way to track progress of parallel operations

2024-04-09 Thread Ivan Krylov via R-devel
Dear Henrik (and everyone else):

Here's a patch implementing support for immediateConditions in
'parallel' socket clusters. What do you think?

I've tried to make the feature backwards-compatible in the sense that
an older R starting a newer cluster worker will not pass the flag
enabling condition passing and so will avoid being confused by packets
with type = 'CONDITION'.
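To make the backwards-compatibility argument concrete, here is a minimal sketch of the worker side, written for illustration only. The worker-side part of the patch is cut off below, so parse_forward_flag(), eval_task() and the send() callback are hypothetical names, not code from the patch or from 'parallel'. The sketch assumes the worker forwards only immediateConditions, wrapped as list(type = "CONDITION", value = cond) packets, and only when FORWARDCONDITIONS=TRUE was passed on its command line.

```r
## Hypothetical helpers; names are illustrative, not part of 'parallel'.

## Scan KEY=VALUE worker arguments for FORWARDCONDITIONS. The FALSE
## default is what keeps an older master, which never passes the flag,
## from receiving CONDITION packets it would not understand.
parse_forward_flag <- function(args) {
    forward <- FALSE
    for (a in args) {
        pos <- regexpr("=", a, fixed = TRUE)
        if (pos < 1L) next
        name <- substr(a, 1L, pos - 1L)
        value <- substr(a, pos + 1L, nchar(a))
        if (name == "FORWARDCONDITIONS")
            forward <- as.logical(value)
    }
    isTRUE(forward)
}

## Evaluate a task; only when forwarding is enabled, pass every
## immediateCondition to send() wrapped as a CONDITION packet.
eval_task <- function(fun, args, send, forward) {
    if (!forward)
        return(do.call(fun, args))
    withCallingHandlers(
        do.call(fun, args),
        immediateCondition = function(cond)
            send(list(type = "CONDITION", value = cond))
    )
}

## Usage: a task that reports each step, and a send() that only records.
task <- function(n) {
    for (i in seq_len(n))
        signalCondition(structure(
            list(message = paste("step", i), call = NULL),
            class = c("immediateCondition", "condition")))
    n * 10
}
box <- new.env()
box$sent <- list()
send <- function(msg) box$sent[[length(box$sent) + 1L]] <- msg

flag <- parse_forward_flag(c("XDR=TRUE", "FORWARDCONDITIONS=TRUE"))
res <- eval_task(task, list(3), send, forward = flag)
```

With forward = FALSE the same task runs unchanged and nothing is sent, which is the behaviour an older master would get.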

In order to propagate the conditions in a timely manner, all 'parallel'
functions that currently use recvData() on individual nodes will have
to switch to calling recvOneData(). I've already adjusted
staticClusterApply(), but e.g. clusterCall() would still postpone
immediateConditions from nodes later in the list (should they appear).
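A rough sketch of what the master side has to do, with the socket I/O replaced by a pre-recorded list so that it runs on its own. The packet layout (type, tag, value) is a simplification assumed for illustration, and collect_results() and note() are hypothetical. It shows the two ideas behind the staticClusterApply() change: CONDITION packets are re-signalled as soon as they arrive, and results are stored by tag rather than by arrival order.

```r
## Hypothetical stand-in for successive recvOneData() results: each
## element is list(node = , value = packet), where a packet is either a
## CONDITION or a tagged VALUE, arriving in arbitrary order.
collect_results <- function(incoming, n) {
    val <- vector("list", n)
    remaining <- n
    for (msg in incoming) {
        packet <- msg$value
        if (identical(packet$type, "CONDITION")) {
            ## Re-signal in the main session as soon as it arrives.
            signalCondition(packet$value)
            next
        }
        ## Place the result by its tag, not by arrival order.
        val[packet$tag] <- list(packet$value)
        remaining <- remaining - 1L
        if (remaining == 0L) break
    }
    val
}

note <- function(text)
    structure(list(message = text, call = NULL),
              class = c("immediateCondition", "condition"))

incoming <- list(
    list(node = 2L, value = list(type = "CONDITION", value = note("node 2: halfway"))),
    list(node = 2L, value = list(type = "VALUE", tag = 2L, value = "b")),
    list(node = 1L, value = list(type = "VALUE", tag = 1L, value = "a"))
)

seen <- character()
out <- withCallingHandlers(
    collect_results(incoming, 2L),
    immediateCondition = function(cond) seen <<- c(seen, conditionMessage(cond))
)
```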

If this is deemed a good way forward, I can prepare a similar patch for
the MPI and socket clusters implemented in the 'snow' package.

-- 
Best regards,
Ivan
Index: src/library/parallel/R/clusterApply.R
===
--- src/library/parallel/R/clusterApply.R	(revision 86373)
+++ src/library/parallel/R/clusterApply.R	(working copy)
@@ -28,8 +28,12 @@
 end <- min(n, start + p - 1L)
 	jobs <- end - start + 1L
 for (i in 1:jobs)
-sendCall(cl[[i]], fun, argfun(start + i - 1L))
-val[start:end] <- lapply(cl[1:jobs], recvResult)
+sendCall(cl[[i]], fun, argfun(start + i - 1L),
+ tag = start + i - 1L)
+for (i in 1:jobs) {
+d <- recvOneResult(cl)
+val[d$tag] <- list(d$value)
+}
 start <- start + jobs
 }
 checkForRemoteErrors(val)
Index: src/library/parallel/R/snow.R
===
--- src/library/parallel/R/snow.R	(revision 86373)
+++ src/library/parallel/R/snow.R	(working copy)
@@ -120,7 +120,8 @@
 rprog = file.path(R.home("bin"), "R"),
 snowlib = .libPaths()[1],
 useRscript = TRUE, # for use by snow clusters
-useXDR = TRUE)
+useXDR = TRUE,
+forward_conditions = TRUE)
 defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
 }
 
Index: src/library/parallel/R/snowSOCK.R
===
--- src/library/parallel/R/snowSOCK.R	(revision 86373)
+++ src/library/parallel/R/snowSOCK.R	(working copy)
@@ -32,6 +32,7 @@
 methods <- getClusterOption("methods", options)
 useXDR <- getClusterOption("useXDR", options)
 homogeneous <- getClusterOption("homogeneous", options)
+forward_conditions <- getClusterOption('forward_conditions', options)
 
 ## build the local command for starting the worker
 env <- paste0("MASTER=", master,
@@ -40,7 +41,8 @@
  " SETUPTIMEOUT=", setup_timeout,
  " TIMEOUT=", timeout,
  " XDR=", useXDR,
- " SETUPSTRATEGY=", setup_strategy)
+ " SETUPSTRATEGY=", setup_strategy,
+ " FORWARDCONDITIONS=", forward_conditions)
 ## Should cmd be run on a worker with R <= 4.0.2,
 ## .workRSOCK will not exist, so fallback to .slaveRSOCK
 arg <- "tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK)()"
@@ -130,17 +132,26 @@
 sendData.SOCKnode <- function(node, data) serialize(data, node$con)
 sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE)
 
-recvData.SOCKnode <- recvData.SOCK0node <- function(node) unserialize(node$con)
+recvData.SOCKnode <- recvData.SOCK0node <- function(node) repeat {
+val <- unserialize(node$con)
+if (val$type != 'CONDITION') return(val)
+signalCondition(val$value)
+}
 
 recvOneData.SOCKcluster <- function(cl)
 {
 socklist <- lapply(cl, function(x) x$con)
 repeat {
-ready <- socketSelect(socklist)
-if (length(ready) > 0) break;
+repeat {
+ready <- socketSelect(socklist)
+if (length(ready) > 0) break;
+}
+n <- which.max(ready) # may need rotation or some such for fairness
+value <- unserialize(socklist[[n]])
+if (value$type != 'CONDITION')
+return(list(node = n, value = value))
+signalCondition(value$value)
 }
-n <- which.max(ready) # may need rotation or some such for fairness
-list(node = n, value = unserialize(socklist[[n]]))
 }
 
 makePSOCKcluster <- function(names, ...)
@@ -349,6 +360,7 @@
 timeout <- 2592000L   # wait 30 days for new cmds before failing
 useXDR <- TRUE# binary serialization
 setup_strategy <- "sequential"
+forward_conditions <- FALSE
 
 for (a in commandArgs(TRUE)) {
 ## Or use strsplit?
@@ -365,6 +377,9 @@
SETUPSTRATEGY = {
setup_strategy <- match.arg(value,
c("sequential", "parallel")

Re: [Rd] Wish: a way to track progress of parallel operations

2024-03-26 Thread Ivan Krylov via R-devel
Henrik,

Thank you for taking the time to read and reply to my message!

On Mon, 25 Mar 2024 10:19:38 -0700
Henrik Bengtsson  wrote:

> * Target a solution that works the same regardless of whether we run in
> parallel or not, i.e. the code/API should look the same regardless of
> using, say, parallel::parLapply(), parallel::mclapply(), or
> base::lapply(). The solution should also work as-is in other parallel
> frameworks.

You are absolutely right about mclapply(): it suffers from the same
problem: the task running inside it has no reliable mechanism for
reporting progress. Just like on a 'parallel' cluster (which can be
running on top of an R connection, MPI, the 'mirai' package, a server
pretending to be multiple cluster nodes, or something completely
different), there is currently no documented interface for the task to
report any additional data except the result of the computation.

> I argue the end-user should be able to decide whether they want to
> "see" progress updates or not, and the developer should focus on
> where to report on progress, but not how and when.

Agreed. As a package developer, I don't even want to bother calling
setTxtProgressBar(...), but it gets most of the job done at zero
dependency cost, and the users don't complain. The situation could
definitely be improved.

> It is possible to use the existing PSOCK socket connections to send
> such 'immediateCondition':s.

Thanks for pointing me towards ClusterFuture, that's a great hack, and
conditions are a much better fit for progress tracking than callbacks.

It would be even better if 'parallel' clusters could "officially"
handle immediateConditions and re-signal them in the main R session.
Since R-4.4 exports (but does not yet document) the sendData, recvData
and recvOneData generics from 'parallel', we are still in a position to
codify and implement the change to the 'parallel' cluster back-end API.

It shouldn't be too hard to document the requirement that recvData() /
recvOneData() must signal immediateConditions arriving from the nodes
and patch the existing cluster types (socket and MPI). Not sure how
hard it will be to implement for 'mirai' clusters.
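Under the proposed rule, progress reporting needs nothing beyond the condition system. A minimal sketch, run here with plain lapply(); the 'progress' class, progress_condition() and slow_square() are made-up names for illustration and not an existing API. The assumption is that a conforming recvData()/recvOneData() would re-signal the same condition object in the main session, so the same user handler should also fire for parLapply().

```r
## Hypothetical constructor: a progress report that is also an
## immediateCondition, so a back-end following the proposed rule
## would re-signal it in the main session as soon as it arrives.
progress_condition <- function(message = "", amount = 1) {
    structure(list(message = message, call = NULL, amount = amount),
              class = c("progress", "immediateCondition", "condition"))
}

## Developer side: say where progress happens, never how to show it.
slow_square <- function(x) {
    signalCondition(progress_condition(paste("x =", x)))
    x^2
}

## End-user side: decide whether and how to show it.
done <- 0
res <- withCallingHandlers(
    lapply(1:4, slow_square),
    progress = function(cond) {
        done <<- done + cond$amount
        message(sprintf("%g/4 done (%s)", done, conditionMessage(cond)))
    }
)
```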

> I honestly think we could arrive at a solution where base-R proposes
> a very light, yet powerful, progress API that handles all of the
> above. The main task is to come up with a standard API/protocol -
> then the implementation does not matter.

Since you've already given it a lot of thought, which parts of
progressr would you suggest for inclusion in R, besides 'parallel'
clusters and mclapply() forwarding immediateConditions from the worker
processes?

-- 
Best regards,
Ivan

__
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel


Re: [Rd] Wish: a way to track progress of parallel operations

2024-03-25 Thread Stephen H. Dawson, DSL via R-devel
Thanks Ivan and Henrik for considering this work. It would be a valuable 
contribution.


Kindly,
*Stephen Dawson, DSL*
/Executive Strategy Consultant/
Business & Technology
+1 (865) 804-3454
http://www.shdawson.com


On 3/25/24 13:19, Henrik Bengtsson wrote:


Re: [Rd] Wish: a way to track progress of parallel operations

2024-03-25 Thread Henrik Bengtsson
Hello,

thanks for bringing this topic up; it would be excellent if we
could come up with a generic solution for this in base R. It is one
of the most frequently asked questions and requested features in
parallel processing, and in sequential processing too. We have also
seen lots of variants on how to attack the problem of reporting on
progress when running in parallel.

As the author of Futureverse (a parallel framework), I've been exposed to
these requests and have thought quite a bit about how we could solve this
problem. I'll outline my opinionated view and suggestions
below:

* Target a solution that works the same regardless of whether we run in
parallel or not, i.e. the code/API should look the same regardless of
using, say, parallel::parLapply(), parallel::mclapply(), or
base::lapply(). The solution should also work as-is in other parallel
frameworks.

* Consider who owns the control of whether progress updates should be
reported or not. I believe it's best to separate what the end-user and
the developer controls. I argue the end-user should be able to
decide whether they want to "see" progress updates or not, and the
developer should focus on where to report on progress, but not how and
when.

* In line with the previous comment, controlling progress reporting
via an argument (e.g. `.progress`) is not powerful enough. With such
an approach, one needs to make sure that the argument is exposed and
relayed through all nested function calls. If a package decides
to introduce such an argument, what should the default be? If they set
`.progress = TRUE`, then all of a sudden any code/packages that
depend on this function will see progress updates.
There are endless per-package versions of this on CRAN and
Bioconductor, and they rarely work in harmony.

* Consider accessibility as well as graphical user interfaces. This
means: don't assume progress is necessarily reported in the terminal.
I have found it good practice to never use the term "progress bar",
because that is too focused on how progress is reported.

* Let the end-user control how progress is reported, e.g. a progress
bar in the terminal, a progress bar in their favorite IDE/GUI,
OS-specific notifications, third-party notification services, auditory
output, etc.
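The point about `.progress` arguments can be illustrated with a small sketch; all function names here are hypothetical. When progress is signalled as a condition, the intermediate functions need no extra argument, and nothing is reported unless the end user installs a handler.

```r
## Hypothetical nested package functions: only pkg_inner() knows where
## progress happens; pkg_middle() and pkg_outer() carry no `.progress`
## argument and relay nothing.
step_done <- function()
    signalCondition(structure(list(message = "step done", call = NULL),
                              class = c("progress", "condition")))
pkg_inner  <- function(n) { for (i in seq_len(n)) step_done(); n }
pkg_middle <- function(n) pkg_inner(n) + 1
pkg_outer  <- function(n) pkg_middle(n) * 2

## No handler installed: nothing is reported, result unchanged.
quiet <- pkg_outer(3)

## The end user opts in once, at the top.
count <- 0
loud <- withCallingHandlers(
    pkg_outer(3),
    progress = function(cond) count <<- count + 1
)
```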

The above objectives challenge you to take a step back and think about
what progress reporting is about, beyond the most immediate needs.
Based on these, I came up with the 'progressr' package
(https://progressr.futureverse.org/). FWIW, it was originally
meant to be a proof-of-concept proposal for a universal, generic
solution to this problem, but as the demands grew and the prototype
proved useful, I made it official. Here is the gist:

* Motto: "The developer is responsible for providing progress updates,
but it’s only the end user who decides if, when, and how progress
should be presented. No exceptions will be allowed."

* It relies on R's condition system to signal progress. The developer
signals progress conditions. Condition handlers, which the end-user
controls, are used to report/render these progress updates. The
support for global condition handlers, introduced in R 4.0.0, makes
this much more convenient. It is useful to think of the condition
mechanism in R as a back channel for communication that operates
separately from the rest of the "communication" stream (calling
functions with arguments and returning values).

* For parallel processing, progress conditions can be relayed back to
the parent process via back channels in a "near-live" fashion, or at
the very end when the parallel task is completed. Technically,
progress conditions inherit from 'immediateCondition', which is a
special class indicating that such conditions are allowed to be
relayed immediately and out of order. It is possible to use the
existing PSOCK socket connections to send such 'immediateCondition':s.

* No assumption is made about progress updates arriving in a certain
order. They are just a stream of "progress of this and that amount
was made" updates.

* There is a progress handler API. Using this API, various types of
progress reporting can be implemented. This allows anyone to implement
progress handlers in contributed R packages.

See https://progressr.futureverse.org/ for more details.
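A minimal sketch of the last two points, under stated assumptions: the 'progress' class, progress_cond(), make_progress_handler() and the two renderers are hypothetical and are not the progressr API. The handler only sums amounts, so arrival order does not matter, and the end user picks the renderer. The same handler could also be registered with globalCallingHandlers() (R >= 4.0.0) instead of withCallingHandlers().

```r
## Hypothetical progress condition carrying only an amount.
progress_cond <- function(amount) {
    structure(list(message = "", call = NULL, amount = amount),
              class = c("progress", "immediateCondition", "condition"))
}

## Hypothetical handler factory: sums "this much progress was made"
## amounts in whatever order they arrive and hands the running fraction
## to a renderer chosen by the end user.
make_progress_handler <- function(total, render) {
    done <- 0
    function(cond) {
        done <<- done + cond$amount
        render(min(done / total, 1))
    }
}

## Two interchangeable renderers: a terminal one and a silent recorder.
text_renderer <- function(frac) cat(sprintf("\r%3.0f%%", 100 * frac))
fractions <- numeric()
record_renderer <- function(frac) fractions <<- c(fractions, frac)

## Developer code: reports amounts, unaware of the renderer in use.
report <- function(amounts)
    for (a in amounts) signalCondition(progress_cond(a))

invisible(withCallingHandlers(
    report(c(2, 1, 1)),
    progress = make_progress_handler(total = 4, render = record_renderer)
))
```

Swapping record_renderer for text_renderer changes only how progress is presented; report() stays untouched.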

> I would be happy to prepare code and documentation. If there is no time now, 
> we can return to it after R-4.4 is released.

I strongly recommend not rushing this. This is an important, big
problem that goes beyond the 'parallel' package. I think it would be a
disservice to introduce a '.progress' argument. As mentioned above, I
think a solution should work throughout the R ecosystem - all base-R
packages and beyond. I honestly think we could arrive at a solution
where base-R proposes a very light, yet powerful, progress API that
handles all of the above. The main task is to come up with a standard
API/protocol - then the implementation does not matter.

/Henrik

On Mon, Mar 25,