Dear Henrik (and everyone else): Here's a patch implementing support for immediateConditions in 'parallel' socket clusters. What do you think?
I've tried to make the feature backwards-compatible: an older R that starts a newer cluster worker will not pass the FORWARDCONDITIONS flag that enables condition forwarding. The worker will therefore never send it packets with type = 'CONDITION', which it would not understand. To propagate the conditions in a timely manner, all 'parallel' functions that currently call recvData() on individual nodes will have to switch to recvOneData() (e.g., via recvOneResult()). The reason is that recvData() relays conditions only from the node it is reading, so conditions from the other nodes wait until those nodes are read in turn. I've already adjusted staticClusterApply(), but, e.g., clusterCall() would still postpone immediateConditions from nodes later in the list (should they appear). If this is deemed a good way forward, I can prepare a similar patch for the MPI and socket clusters implemented in the 'snow' package. -- Best regards, Ivan
Index: src/library/parallel/R/clusterApply.R
===================================================================
--- src/library/parallel/R/clusterApply.R	(revision 86373)
+++ src/library/parallel/R/clusterApply.R	(working copy)
@@ -28,8 +28,14 @@
             end <- min(n, start + p - 1L)
             jobs <- end - start + 1L
             for (i in 1:jobs)
-                sendCall(cl[[i]], fun, argfun(start + i - 1L))
-            val[start:end] <- lapply(cl[1:jobs], recvResult)
+                sendCall(cl[[i]], fun, argfun(start + i - 1L),
+                         tag = start + i - 1L)
+            ## Collect results in completion order (placed by tag) so that,
+            ## on socket clusters, conditions relayed by any node show up promptly.
+            for (i in 1:jobs) {
+                d <- recvOneResult(cl)
+                val[d$tag] <- list(d$value)
+            }
             start <- start + jobs
         }
         checkForRemoteErrors(val)
Index: src/library/parallel/R/snow.R
===================================================================
--- src/library/parallel/R/snow.R	(revision 86373)
+++ src/library/parallel/R/snow.R	(working copy)
@@ -120,7 +120,8 @@
                     rprog = file.path(R.home("bin"), "R"),
                     snowlib = .libPaths()[1],
                     useRscript = TRUE, # for use by snow clusters
-                    useXDR = TRUE)
+                    useXDR = TRUE,
+                    forward_conditions = TRUE) # relay immediateConditions
     defaultClusterOptions <<- addClusterOptions(emptyenv(), options)
 }
 
Index: src/library/parallel/R/snowSOCK.R
===================================================================
--- src/library/parallel/R/snowSOCK.R	(revision 86373)
+++ src/library/parallel/R/snowSOCK.R	(working copy)
@@ -32,6 +32,7 @@
     methods <- getClusterOption("methods", options)
     useXDR <- getClusterOption("useXDR", options)
     homogeneous <- getClusterOption("homogeneous", options)
+    forward_conditions <- getClusterOption("forward_conditions", options)
 
     ## build the local command for starting the worker
     env <- paste0("MASTER=", master,
@@ -40,7 +41,8 @@
                   " SETUPTIMEOUT=", setup_timeout,
                   " TIMEOUT=", timeout,
                   " XDR=", useXDR,
-                  " SETUPSTRATEGY=", setup_strategy)
+                  " SETUPSTRATEGY=", setup_strategy,
+                  " FORWARDCONDITIONS=", forward_conditions)
     ## Should cmd be run on a worker with R <= 4.0.2,
     ## .workRSOCK will not exist, so fallback to .slaveRSOCK
     arg <- "tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK)()"
@@ -130,17 +132,30 @@
 sendData.SOCKnode <- function(node, data) serialize(data, node$con)
 sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE)
 
-recvData.SOCKnode <- recvData.SOCK0node <- function(node) unserialize(node$con)
+## Workers started with FORWARDCONDITIONS=TRUE may send
+## list(type = "CONDITION", value = cond) messages before a result:
+## signal each one here and keep reading until another message arrives.
+recvData.SOCKnode <- recvData.SOCK0node <- function(node) repeat {
+    val <- unserialize(node$con)
+    if (!is.list(val) || !identical(val$type, "CONDITION")) return(val)
+    signalCondition(val$value)
+}
 
 recvOneData.SOCKcluster <- function(cl)
 {
     socklist <- lapply(cl, function(x) x$con)
     repeat {
-        ready <- socketSelect(socklist)
-        if (length(ready) > 0) break;
+        repeat {
+            ready <- socketSelect(socklist)
+            if (length(ready) > 0) break;
+        }
+        n <- which.max(ready) # may need rotation or some such for fairness
+        value <- unserialize(socklist[[n]])
+        if (!is.list(value) || !identical(value$type, "CONDITION"))
+            return(list(node = n, value = value))
+        ## a relayed condition, not a result: signal it and wait again
+        signalCondition(value$value)
     }
-    n <- which.max(ready) # may need rotation or some such for fairness
-    list(node = n, value = unserialize(socklist[[n]]))
 }
 
 makePSOCKcluster <- function(names, ...)
@@ -349,6 +364,7 @@
     timeout <- 2592000L # wait 30 days for new cmds before failing
     useXDR <- TRUE # binary serialization
     setup_strategy <- "sequential"
+    forward_conditions <- FALSE # overridden by FORWARDCONDITIONS=
 
     for (a in commandArgs(TRUE)) {
         ## Or use strsplit?
@@ -365,6 +381,9 @@
                SETUPSTRATEGY = {
                    setup_strategy <- match.arg(value,
                                                c("sequential", "parallel"))
+               },
+               FORWARDCONDITIONS = {
+                   forward_conditions <- as.logical(value)
                })
     }
     if (is.na(port)) stop("PORT must be specified")
@@ -377,5 +396,5 @@
                   format(Sys.time(), "%H:%M:%OS3"))
     cat(msg)
     workLoop(makeSOCKmaster(master, port, setup_timeout, timeout, useXDR,
-                            setup_strategy))
+                            setup_strategy), forward_conditions)
 }
Index: src/library/parallel/R/worker.R
===================================================================
--- src/library/parallel/R/worker.R	(revision 86373)
+++ src/library/parallel/R/worker.R	(working copy)
@@ -19,7 +19,7 @@
 ## Derived from snow 0.3-6 by Luke Tierney
 ## NB: there is also workerCommand in snowSOCK.R
 
-workCommand <- function(master)
+workCommand <- function(master, forward_conditions = FALSE)
 {
     tryCatch({
         msg <- recvData(master)
@@ -39,8 +39,16 @@
                           class = c("snow-try-error","try-error"))
             }
             t1 <- proc.time()
-            value <- tryCatch(do.call(msg$data$fun, msg$data$args, quote = TRUE),
-                              error = handler)
+            ## When forwarding is enabled, send each immediateCondition to the
+            ## master as soon as it is signalled, while the task is still running.
+            withCallingHandlers({
+                value <- tryCatch(do.call(msg$data$fun, msg$data$args, quote = TRUE),
+                                  error = handler)
+            }, immediateCondition = function(cond) {
+                if (isTRUE(forward_conditions)) sendData(master, list(
+                    type = "CONDITION", value = cond
+                ))
+            })
             t2 <- proc.time()
             value <- list(type = "VALUE", value = value, success = success,
                           time = t2 - t1, tag = msg$data$tag)
@@ -55,10 +63,10 @@
     }, interrupt = function(e) TRUE)
 }
 
-workLoop <- function(master)
+workLoop <- function(master, forward_conditions = FALSE)
 {
     if (!is.null(master))
-        while(workCommand(master)) {}
+        while(workCommand(master, forward_conditions)) {}
 }
 
 ## NB: this only sinks the connections, not C-level stdout/err.
Index: src/library/parallel/tests/snow1.RR
===================================================================
--- src/library/parallel/tests/snow1.RR	(revision 86373)
+++ src/library/parallel/tests/snow1.RR	(working copy)
@@ -36,6 +36,26 @@
     print(boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
                   conf = 0.9, h = atanh, hinv = tanh))
 }
+
+## Each immediateCondition signalled on a worker must reach the master
+## exactly once; the order across workers is not fixed.
+caught <- numeric()
+x <- 1:10
+withCallingHandlers(
+    parLapply(cl, x, function(i) {
+        signalCondition(
+            structure(list(
+                message = "",
+                payload = i
+            ), class = c("immediateCondition", "condition"))
+        )
+        i
+    }),
+    immediateCondition = function(cond)
+        caught <<- c(caught, cond$payload)
+)
+stopifnot(length(caught) == length(x), setequal(x, caught))
+
 stopCluster(cl)
 
 
______________________________________________ R-devel@r-project.org mailing list https://stat.ethz.ch/mailman/listinfo/r-devel