Dear Henrik (and everyone else):

Here's a patch implementing support for immediateConditions in
'parallel' socket clusters. What do you think?

I've tried to make the feature backwards-compatible: an older R master
starting a newer cluster worker will not pass the FORWARDCONDITIONS
flag, so the worker will not forward conditions and the master will not
be confused by packets with type = 'CONDITION'.

In order to propagate the conditions in a timely manner, all 'parallel'
functions that currently call recvData() on individual nodes will have
to switch to calling recvOneData(). I've already adjusted
staticClusterApply(), but e.g. clusterCall() would still delay any
immediateConditions sent by nodes later in the list until the earlier
nodes have been read.

If this is deemed a good way forward, I can prepare a similar patch for
the MPI and socket clusters implemented in the 'snow' package.

-- 
Best regards,
Ivan
Index: src/library/parallel/R/clusterApply.R
===================================================================
--- src/library/parallel/R/clusterApply.R	(revision 86373)
+++ src/library/parallel/R/clusterApply.R	(working copy)
@@ -28,8 +28,12 @@
             end <- min(n, start + p - 1L)
 	    jobs <- end - start + 1L
             for (i in 1:jobs)
-                sendCall(cl[[i]], fun, argfun(start + i - 1L))
-            val[start:end] <- lapply(cl[1:jobs], recvResult)
+                sendCall(cl[[i]], fun, argfun(start + i - 1L),
+                         tag = start + i - 1L)
+            for (i in 1:jobs) {
+                d <- recvOneResult(cl)
+                val[d$tag] <- list(d$value)
+            }
             start <- start + jobs
         }
         checkForRemoteErrors(val)
Index: src/library/parallel/R/snow.R
===================================================================
--- src/library/parallel/R/snow.R	(revision 86373)
+++ src/library/parallel/R/snow.R	(working copy)
@@ -120,7 +120,8 @@
                     rprog = file.path(R.home("bin"), "R"),
                     snowlib = .libPaths()[1],
                     useRscript = TRUE, # for use by snow clusters
-                    useXDR = TRUE)
+                    useXDR = TRUE,
+                    forward_conditions = TRUE)
     defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
 }
 
Index: src/library/parallel/R/snowSOCK.R
===================================================================
--- src/library/parallel/R/snowSOCK.R	(revision 86373)
+++ src/library/parallel/R/snowSOCK.R	(working copy)
@@ -32,6 +32,7 @@
     methods <- getClusterOption("methods", options)
     useXDR <- getClusterOption("useXDR", options)
     homogeneous <- getClusterOption("homogeneous", options)
+    forward_conditions <- getClusterOption('forward_conditions', options)
 
     ## build the local command for starting the worker
     env <- paste0("MASTER=", master,
@@ -40,7 +41,8 @@
                  " SETUPTIMEOUT=", setup_timeout,
                  " TIMEOUT=", timeout,
                  " XDR=", useXDR,
-                 " SETUPSTRATEGY=", setup_strategy)
+                 " SETUPSTRATEGY=", setup_strategy,
+                 " FORWARDCONDITIONS=", forward_conditions)
     ## Should cmd be run on a worker with R <= 4.0.2,
     ## .workRSOCK will not exist, so fallback to .slaveRSOCK
     arg <- "tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK)()"
@@ -130,17 +132,26 @@
 sendData.SOCKnode <- function(node, data) serialize(data, node$con)
 sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE)
 
-recvData.SOCKnode <- recvData.SOCK0node <- function(node) unserialize(node$con)
+recvData.SOCKnode <- recvData.SOCK0node <- function(node) repeat {
+    val <- unserialize(node$con)
+    if (val$type != 'CONDITION') return(val)
+    signalCondition(val$value)
+}
 
 recvOneData.SOCKcluster <- function(cl)
 {
     socklist <- lapply(cl, function(x) x$con)
     repeat {
-        ready <- socketSelect(socklist)
-        if (length(ready) > 0) break;
+        repeat {
+            ready <- socketSelect(socklist)
+            if (length(ready) > 0) break;
+        }
+        n <- which.max(ready) # may need rotation or some such for fairness
+        value <- unserialize(socklist[[n]])
+        if (value$type != 'CONDITION')
+            return(list(node = n, value = value))
+        signalCondition(value$value)
     }
-    n <- which.max(ready) # may need rotation or some such for fairness
-    list(node = n, value = unserialize(socklist[[n]]))
 }
 
 makePSOCKcluster <- function(names, ...)
@@ -349,6 +360,7 @@
     timeout <- 2592000L   # wait 30 days for new cmds before failing
     useXDR <- TRUE        # binary serialization
     setup_strategy <- "sequential"
+    forward_conditions <- FALSE
 
     for (a in commandArgs(TRUE)) {
         ## Or use strsplit?
@@ -365,6 +377,9 @@
                SETUPSTRATEGY = {
                    setup_strategy <- match.arg(value,
                                                c("sequential", "parallel"))
+               },
+               FORWARDCONDITIONS = {
+                   forward_conditions <- as.logical(value)
                })
     }
     if (is.na(port)) stop("PORT must be specified")
@@ -377,5 +392,5 @@
                    format(Sys.time(), "%H:%M:%OS3"))
     cat(msg)
     workLoop(makeSOCKmaster(master, port, setup_timeout, timeout, useXDR,
-                            setup_strategy))
+                            setup_strategy), forward_conditions)
 }
Index: src/library/parallel/R/worker.R
===================================================================
--- src/library/parallel/R/worker.R	(revision 86373)
+++ src/library/parallel/R/worker.R	(working copy)
@@ -19,7 +19,7 @@
 ## Derived from snow 0.3-6 by Luke Tierney
 
 ## NB: there is also workerCommand in snowSOCK.R
-workCommand <- function(master)
+workCommand <- function(master, forward_conditions)
 {
     tryCatch({
         msg <- recvData(master)
@@ -39,8 +39,14 @@
                           class = c("snow-try-error","try-error"))
             }
             t1 <- proc.time()
-            value <- tryCatch(do.call(msg$data$fun, msg$data$args, quote = TRUE),
-                              error = handler)
+            withCallingHandlers({
+                value <- tryCatch(do.call(msg$data$fun, msg$data$args, quote = TRUE),
+                                  error = handler)
+            }, immediateCondition = function(cond) {
+                if (forward_conditions) sendData(master, list(
+                    type = "CONDITION", value = cond
+                ))
+            })
             t2 <- proc.time()
             value <- list(type = "VALUE", value = value, success = success,
                           time = t2 - t1, tag = msg$data$tag)
@@ -55,10 +61,10 @@
     }, interrupt = function(e) TRUE)
 }
 
-workLoop <- function(master)
+workLoop <- function(master, forward_conditions = FALSE)
 {
     if (!is.null(master))
-        while(workCommand(master)) {}
+        while(workCommand(master, forward_conditions)) {}
 }
 
 ## NB: this only sinks the connections, not C-level stdout/err.
Index: src/library/parallel/tests/snow1.RR
===================================================================
--- src/library/parallel/tests/snow1.RR	(revision 86373)
+++ src/library/parallel/tests/snow1.RR	(working copy)
@@ -36,6 +36,24 @@
     print(boot.ci(cd4.boot,  type = c("norm", "basic", "perc"),
                   conf = 0.9, h = atanh, hinv = tanh))
 }
+
+caught <- numeric()
+x <- 1:10
+withCallingHandlers(
+    parLapply(cl, x, function(i) {
+        signalCondition(
+            structure(list(
+                message = '',
+                payload = i
+            ), class = c('immediateCondition', 'condition'))
+        )
+        i
+    }),
+    immediateCondition = function(cond)
+        caught <<- c(caught, cond$payload)
+)
+stopifnot(setequal(x, caught))
+
 stopCluster(cl)
 
 
______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel

Reply via email to