Re: [Rd] Wish: a way to track progress of parallel operations
Dear Henrik (and everyone else):

Here's a patch implementing support for immediateConditions in 'parallel' socket clusters. What do you think?

I've tried to make the feature backwards-compatible in the sense that an older R starting a newer cluster worker will not pass the flag enabling condition passing, and so will avoid being confused by packets with type = 'CONDITION'.

In order to propagate the conditions in a timely manner, all 'parallel' functions that currently use recvData() on individual nodes will have to switch to calling recvOneData(). I've already adjusted staticClusterApply(), but e.g. clusterCall() would still postpone immediateConditions from nodes later in the list (should they appear).

If this is deemed a good way forward, I can prepare a similar patch for the MPI and socket clusters implemented in the 'snow' package.

--
Best regards,
Ivan

Index: src/library/parallel/R/clusterApply.R
===================================================================
--- src/library/parallel/R/clusterApply.R	(revision 86373)
+++ src/library/parallel/R/clusterApply.R	(working copy)
@@ -28,8 +28,12 @@
         end <- min(n, start + p - 1L)
         jobs <- end - start + 1L
         for (i in 1:jobs)
-            sendCall(cl[[i]], fun, argfun(start + i - 1L))
-        val[start:end] <- lapply(cl[1:jobs], recvResult)
+            sendCall(cl[[i]], fun, argfun(start + i - 1L),
+                     tag = start + i - 1L)
+        for (i in 1:jobs) {
+            d <- recvOneResult(cl)
+            val[d$tag] <- list(d$value)
+        }
         start <- start + jobs
     }
     checkForRemoteErrors(val)
Index: src/library/parallel/R/snow.R
===================================================================
--- src/library/parallel/R/snow.R	(revision 86373)
+++ src/library/parallel/R/snow.R	(working copy)
@@ -120,7 +120,8 @@
                     rprog = file.path(R.home("bin"), "R"),
                     snowlib = .libPaths()[1],
                     useRscript = TRUE, # for use by snow clusters
-                    useXDR = TRUE)
+                    useXDR = TRUE,
+                    forward_conditions = TRUE)
     defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
 }
Index: src/library/parallel/R/snowSOCK.R
===================================================================
--- src/library/parallel/R/snowSOCK.R	(revision 86373)
+++ src/library/parallel/R/snowSOCK.R	(working copy)
@@ -32,6 +32,7 @@
     methods <- getClusterOption("methods", options)
     useXDR <- getClusterOption("useXDR", options)
     homogeneous <- getClusterOption("homogeneous", options)
+    forward_conditions <- getClusterOption('forward_conditions', options)

     ## build the local command for starting the worker
     env <- paste0("MASTER=", master,
@@ -40,7 +41,8 @@
                   " SETUPTIMEOUT=", setup_timeout,
                   " TIMEOUT=", timeout,
                   " XDR=", useXDR,
-                  " SETUPSTRATEGY=", setup_strategy)
+                  " SETUPSTRATEGY=", setup_strategy,
+                  " FORWARDCONDITIONS=", forward_conditions)
     ## Should cmd be run on a worker with R <= 4.0.2,
     ## .workRSOCK will not exist, so fallback to .slaveRSOCK
     arg <- "tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK)()"
@@ -130,17 +132,26 @@
 sendData.SOCKnode <- function(node, data) serialize(data, node$con)
 sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE)

-recvData.SOCKnode <- recvData.SOCK0node <- function(node) unserialize(node$con)
+recvData.SOCKnode <- recvData.SOCK0node <- function(node) repeat {
+    val <- unserialize(node$con)
+    if (val$type != 'CONDITION') return(val)
+    signalCondition(val$value)
+}

 recvOneData.SOCKcluster <- function(cl)
 {
     socklist <- lapply(cl, function(x) x$con)
     repeat {
-        ready <- socketSelect(socklist)
-        if (length(ready) > 0) break;
+        repeat {
+            ready <- socketSelect(socklist)
+            if (length(ready) > 0) break;
+        }
+        n <- which.max(ready) # may need rotation or some such for fairness
+        value <- unserialize(socklist[[n]])
+        if (value$type != 'CONDITION')
+            return(list(node = n, value = value))
+        signalCondition(value$value)
     }
-    n <- which.max(ready) # may need rotation or some such for fairness
-    list(node = n, value = unserialize(socklist[[n]]))
 }

 makePSOCKcluster <- function(names, ...)
@@ -349,6 +360,7 @@
     timeout <- 2592000L # wait 30 days for new cmds before failing
     useXDR <- TRUE # binary serialization
     setup_strategy <- "sequential"
+    forward_conditions <- FALSE

     for (a in commandArgs(TRUE)) {
         ## Or use strsplit?
@@ -365,6 +377,9 @@
            SETUPSTRATEGY = {
                setup_strategy <- match.arg(value, c("sequential", "parallel")
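[Editor's note: the master-side mechanics of the patched recvData() can be illustrated without a cluster. In the sketch below, make_progress_condition(), packets and read_packet() are made-up stand-ins for the node protocol; only the list(type =, value =) packet layout mirrors the patch above.]

```r
## A condition packet is re-signalled on the master; the first
## non-condition packet is returned as the result.
make_progress_condition <- function(msg) {
    structure(
        class = c("progressCondition", "immediateCondition", "condition"),
        list(message = msg, call = NULL)
    )
}

## Stand-in for a stream of packets arriving from a worker node
packets <- list(
    list(type = "CONDITION", value = make_progress_condition("50% done")),
    list(type = "VALUE", value = 42)
)
i <- 0
read_packet <- function() { i <<- i + 1; packets[[i]] }

## The shape of the patched recvData(): loop, re-signal condition
## packets, return the first result packet.
recv <- function() repeat {
    val <- read_packet()
    if (val$type != "CONDITION") return(val)
    signalCondition(val$value)
}

seen <- character()
res <- withCallingHandlers(
    recv(),
    immediateCondition = function(cond)
        seen <<- c(seen, conditionMessage(cond))
)
stopifnot(identical(res$value, 42), identical(seen, "50% done"))
```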
Re: [Rd] Wish: a way to track progress of parallel operations
Henrik,

Thank you for taking the time to read and reply to my message!

On Mon, 25 Mar 2024 10:19:38 -0700 Henrik Bengtsson wrote:

> * Target a solution that works the same regardless of whether we run in
> parallel or not, i.e. the code/API should look the same regardless of
> using, say, parallel::parLapply(), parallel::mclapply(), or
> base::lapply(). The solution should also work as-is in other parallel
> frameworks.

You are absolutely right about mclapply(): it suffers from the same problem, in that a task running inside it has no reliable mechanism for reporting progress. Just like on a 'parallel' cluster (which can be running on top of an R connection, MPI, the 'mirai' package, a server pretending to be multiple cluster nodes, or something completely different), there is currently no documented interface for the task to report any additional data besides the result of the computation.

> I argue the end-user should be able to decide whether they want to
> "see" progress updates or not, and the developer should focus on
> where to report on progress, but not how and when.

Agreed. As a package developer, I don't even want to bother calling setTxtProgressBar(...), but it gets most of the job done at zero dependency cost, and the users don't complain. The situation could definitely be improved.

> It is possible to use the existing PSOCK socket connections to send
> such 'immediateCondition':s.

Thanks for pointing me towards ClusterFuture: that's a great hack, and conditions are a much better fit for progress tracking than callbacks. It would be even better if 'parallel' clusters could "officially" handle immediateConditions and re-signal them in the main R session. Since R 4.4 exports (but does not yet document) the sendData, recvData and recvOneData generics from 'parallel', we are still in a position to codify and implement the change to the 'parallel' cluster back-end API. It shouldn't be too hard to document the requirement that recvData() / recvOneData() must signal immediateConditions arriving from the nodes, and to patch the existing cluster types (socket and MPI). Not sure how hard it will be to implement for 'mirai' clusters.

> I honestly think we could arrive at a solution where base-R proposes
> a very light, yet powerful, progress API that handles all of the
> above. The main task is to come up with a standard API/protocol -
> then the implementation does not matter.

Since you've already given it a lot of thought, which parts of progressr would you suggest for inclusion into R, besides 'parallel' clusters and mclapply() forwarding immediateConditions from the worker processes?

--
Best regards,
Ivan

______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel
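[Editor's note: a hedged sketch of what end-user code could look like once clusters re-signal immediateConditions as proposed. This assumes the proposed patch is applied, the cluster is started with the patch's forward_conditions option enabled, and the worker run loop serializes immediateConditions back to the master; on an unpatched R the condition is simply discarded on the worker and no output is forwarded.]

```r
library(parallel)

cl <- makePSOCKcluster(2)

res <- withCallingHandlers(
    parLapply(cl, 1:4, function(i) {
        ## Signal a progress update; with the proposed patch this would be
        ## shipped through the node's data channel as a CONDITION packet.
        cond <- structure(
            class = c("immediateCondition", "condition"),
            list(message = sprintf("finished task %d\n", i), call = NULL)
        )
        signalCondition(cond)
        i^2
    }),
    ## Master side: the end-user decides how to render progress
    immediateCondition = function(cond) cat(conditionMessage(cond))
)

stopCluster(cl)
```

The computation itself is unaffected either way; only the progress reporting depends on the patch.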
Re: [Rd] Wish: a way to track progress of parallel operations
Thanks Ivan and Henrik for considering this work. It would be a valuable contribution.

Kindly,

*Stephen Dawson, DSL*
/Executive Strategy Consultant/
Business & Technology
+1 (865) 804-3454
http://www.shdawson.com

On 3/25/24 13:19, Henrik Bengtsson wrote:
> [...]
Re: [Rd] Wish: a way to track progress of parallel operations
Hello,

thanks for bringing this topic up; it would be excellent if we could come up with a generic solution for this in base R. It is one of the most frequently asked questions and requested features in parallel processing, but also in sequential processing. We have also seen lots of variants on how to attack the problem of reporting on progress when running in parallel.

As the author of Futureverse (a parallel framework), I've been exposed to these requests and have thought quite a bit about how we could solve this problem. I'll outline my opinionated view and suggestions on this below:

* Target a solution that works the same regardless of whether we run in parallel or not, i.e. the code/API should look the same regardless of using, say, parallel::parLapply(), parallel::mclapply(), or base::lapply(). The solution should also work as-is in other parallel frameworks.

* Consider who owns the control of whether progress updates should be reported or not. I believe it's best to separate what the end-user and the developer control. I argue the end-user should be able to decide whether they want to "see" progress updates or not, and the developer should focus on where to report on progress, but not how and when.

* In line with the previous comment, controlling progress reporting via an argument (e.g. `.progress`) is not powerful enough. With such an approach, one needs to make sure that that argument is exposed and relayed throughout all nested function calls. If a package decides to introduce such an argument, what should the default be? If they set `.progress = TRUE`, then all of a sudden any code/packages that depend on this function will see progress updates. There are endless per-package versions of this on CRAN and Bioconductor, and they rarely work in harmony.

* Consider accessibility as well as graphical user interfaces. That is, don't assume progress is necessarily reported in the terminal. I have found it good practice to never use the term "progress bar", because that is too focused on how progress is reported.

* Let the end-user control how progress is reported, e.g. a progress bar in the terminal, a progress bar in their favorite IDE/GUI, OS-specific notifications, third-party notification services, auditory output, etc.

The above objectives challenge you to take a step back and think about what progress reporting is about, beyond the most immediate needs. Based on these, I came up with the 'progressr' package (https://progressr.futureverse.org/). FWIW, it was originally meant to be a proof-of-concept proposal for a universal, generic solution to this problem, but as the demands grew and the prototype showed to be useful, I made it official. Here is the gist:

* Motto: "The developer is responsible for providing progress updates, but it's only the end user who decides if, when, and how progress should be presented. No exceptions will be allowed."

* It relies on R's condition system to signal progress. The developer signals progress conditions. Condition handlers, which the end-user controls, are used to report/render these progress updates. The support for global condition handlers, introduced in R 4.0.0, makes this much more convenient. It is useful to think of the condition mechanism in R as a back channel for communication that operates separately from the rest of the "communication" stream (calling functions with arguments and returning values).

* For parallel processing, progress conditions can be relayed back to the parent process via back channels in a "near-live" fashion, or at the very end when the parallel task is completed. Technically, progress conditions inherit from 'immediateCondition', which is a special class indicating that such conditions are allowed to be relayed immediately and out of order. It is possible to use the existing PSOCK socket connections to send such 'immediateCondition':s.

* No assumption is made about progress updates arriving in a certain order. They are just a stream of reports that "this and that amount" of progress was made.

* There is a progress handler API. Using this API, various types of progress reporting can be implemented. This allows anyone to implement progress handlers in contributed R packages. See https://progressr.futureverse.org/ for more details.

> I would be happy to prepare code and documentation. If there is no time now,
> we can return to it after R-4.4 is released.

I strongly recommend not rushing this. This is an important, big problem that goes beyond the 'parallel' package. I think it would be a disfavor to introduce a '.progress' argument. As mentioned above, I think a solution should work throughout the R ecosystem: all base-R packages and beyond. I honestly think we could arrive at a solution where base-R proposes a very light, yet powerful, progress API that handles all of the above. The main task is to come up with a standard API/protocol; then the implementation does not matter.

/Henrik

On Mon, Mar 25,
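[Editor's note: the developer/end-user separation Henrik describes can be demonstrated in pure base R, without the progressr package. The class name "progressUpdate" and the helpers below are illustrative stand-ins, not progressr's actual API.]

```r
## The developer only signals progress conditions; the end-user decides
## how (and whether) to render them, via a calling handler.
signal_progress <- function(amount, msg = "") {
    cond <- structure(
        class = c("progressUpdate", "immediateCondition", "condition"),
        list(message = msg, amount = amount, call = NULL)
    )
    signalCondition(cond)
}

## Developer code: reports progress, knows nothing about rendering
do_work <- function(n) {
    for (i in seq_len(n)) signal_progress(1 / n, sprintf("step %d", i))
    "done"
}

## End-user code: chooses a "renderer" by installing a handler
total <- 0
res <- withCallingHandlers(
    do_work(4),
    progressUpdate = function(c) total <<- total + c$amount
)
stopifnot(identical(res, "done"), isTRUE(all.equal(total, 1)))
```

With no handler installed, the signalled conditions are silently ignored and do_work() behaves as if no progress reporting existed, which is exactly the opt-in behaviour argued for above.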