Repository: spark
Updated Branches:
  refs/heads/master 884347e1f -> 6b3d02285


[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak

## What changes were proposed in this pull request?

`mcfork` in R appears to open a pipe ahead of forking, but the existing logic does not 
properly close it when forks happen in quick succession. Further forking then fails 
once the limit on the number of open files is reached.

This hot path is hit in particular by `gapply`/`gapplyCollect`. For unknown reasons, 
the problem shows up more easily on CentOS, but it could be reproduced on Mac too.

All the details are described in 
https://issues.apache.org/jira/browse/SPARK-21093

This PR proposes simply to terminate R's worker processes in the parent of R's 
daemon to prevent a leak.
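The underlying issue can be illustrated outside of Spark. The sketch below (plain Python, all names illustrative; not Spark or R code) shows why the parent must collect children that exit on their own: each fork here opens a pipe, as `mcfork` does internally, and until the parent reaps the child and closes the pipe ends it still holds, the file descriptors stay allocated, so hot forking eventually exhausts the open-file limit.

```python
# Hypothetical sketch: a child that exits by itself becomes a zombie until the
# parent collects it, and the per-fork pipe fds stay open in the parent.
import os

pids = []
for _ in range(5):
    r, w = os.pipe()          # one pipe per fork, as mcfork does internally
    pid = os.fork()
    if pid == 0:
        os._exit(0)           # child exits on its own -> zombie until reaped
    pids.append((pid, r, w))

# Without this loop, the zombies and their pipe fds would accumulate.
for pid, r, w in pids:
    os.waitpid(pid, 0)        # parent reaps the child...
    os.close(r)
    os.close(w)               # ...and releases the pipe ends it still holds

print("reaped", len(pids), "children")
```

Run hot (thousands of iterations) without the reaping loop, this pattern is what runs the process out of file descriptors.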

## How was this patch tested?

I ran the code below on both CentOS and Mac with that configuration 
disabled and enabled.

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
...  # 30 times
```

Also, the R tests now pass on CentOS, as shown below:

```
SparkSQL functions: Spark package found in SPARK_HOME: .../spark
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
....................................................................................................................................
```

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #18320 from HyukjinKwon/SPARK-21093.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b3d0228
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b3d0228
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b3d0228

Branch: refs/heads/master
Commit: 6b3d02285ee0debc73cbcab01b10398a498fbeb8
Parents: 884347e
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Sun Jun 25 11:05:57 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Jun 25 11:05:57 2017 -0700

----------------------------------------------------------------------
 R/pkg/inst/worker/daemon.R | 59 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b3d0228/R/pkg/inst/worker/daemon.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3a318b7..6e385b2 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate
+  # them properly in the parent.
+  #
+  # There are two paths by which the parent attempts to send a signal to terminate the children.
+  #
+  #   1. Every second, if any socket connection is not available and if there are child workers
+  #     running.
+  #   2. Right after a socket connection becomes available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the following
+  # new socket connection.
+
+  # Only the process IDs of children that sent data to the parent are returned below. The children
+  # send a custom exit code to the parent after exiting, and the parent tries
+  # to terminate them only if they sent the exit code.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
    # There is a small chance that it could be interrupted by a signal; retry one time
@@ -44,12 +91,16 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
+      # Reach here because this is a child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
-      parallel:::mcexit(0L)
+      # Note that this mcexit does not fully terminate this child. So, this writes back
+      # a custom exit code so that the parent can read and terminate this child.
+      parallel:::mcexit(0L, send = exitCode)
+    } else {
+      # Forking succeeded and we need to check if the children finished their jobs every second.
+      selectTimeout <- 1
     }
   }
 }
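For illustration, here is a rough analogue of the patched daemon loop in plain Python (a sketch of the pattern only; all names are illustrative and none of this is Spark code). Each child writes a custom exit code on a status pipe when its work is done, and the parent polls those pipes with a zero timeout and terminates and reaps any child that reported completion, mirroring what the daemon now does with `parallel:::selectChildren`, `parallel:::readChild`, and `tools::pskill`.

```python
import os, select, signal

EXIT_CODE = b"\x01"              # stands in for the serialized custom exit code

def spawn_worker():
    # One status pipe per child, standing in for the pipe mcfork opens.
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:                 # child: do the "work", then report and wait
        os.close(r)
        os.write(w, EXIT_CODE)   # like parallel:::mcexit(0L, send = exitCode)
        signal.pause()           # "exited but not terminated" until signaled
        os._exit(0)
    os.close(w)
    return pid, r

def poll_and_kill(children):
    # Parent-side reaping, analogous to selectChildren()/readChild()/pskill().
    still_running = {}
    for pid, read_fd in children.items():
        ready, _, _ = select.select([read_fd], [], [], 0)  # timeout = 0
        if ready and os.read(read_fd, 1) == EXIT_CODE:
            os.kill(pid, signal.SIGUSR1)  # terminate the child in the parent
            os.waitpid(pid, 0)            # reap it so nothing leaks
            os.close(read_fd)
        else:
            still_running[pid] = read_fd
    return still_running

children = dict(spawn_worker() for _ in range(3))
while children:                  # the daemon polls like this every second
    children = poll_and_kill(children)
print("all workers terminated")
```

The key point mirrored here is that the kill and the reap both happen in the parent; the daemon additionally blocks indefinitely on the socket whenever no workers are running, switching to the one-second poll only while children exist.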


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
