Repository: spark
Updated Branches:
  refs/heads/master c3712b77a -> 08e0d033b


[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak

## What changes were proposed in this pull request?

This is a retry of #18320, which was reverted due to unexpected test failures 
with a -10 error code.

I was unable to reproduce this on macOS, CentOS, or Ubuntu; it occurred only on 
Jenkins. So, tests were run to verify the failure, and the previous attempt was 
reverted in https://github.com/apache/spark/pull/18456.

This new approach was tested in https://github.com/apache/spark/pull/18463.

**Test results**:

- With the suspicious part of the change from the past try 
(https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189)

  Tests ran 4 times: 2 passed and 2 failed.

- Without the suspicious part of the change from the past try 
(https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189)

  Tests ran 5 times and they all passed.

- With this new approach 
(https://github.com/apache/spark/pull/18463/commits/0a7589c09f53dfc2094497d8d3e59d6407569417)

  Tests ran 5 times and they all passed.

It looks like the cause is as below (see 
https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189):

```diff
+ exitCode <- 1
...
+   data <- parallel:::readChild(child)
+   if (is.raw(data)) {
+     if (unserialize(data) == exitCode) {
      ...
+     }
+   }

...

- parallel:::mcexit(0L)
+ parallel:::mcexit(0L, send = exitCode)
```

I think there are two possibilities:

 - `parallel:::mcexit(..., send = exitCode)`

   https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

   > It sends send to the master (unless NULL) and then shuts down the child process.

   However, it looks possible that the parent attempts to terminate the child 
right after receiving our custom exit code, so the child gets terminated between 
"send" and "shuts down", failing to exit properly.

 - A bug between `parallel:::mcexit(..., send = ...)` and `parallel:::readChild`.

**Proposal**:

To resolve this, I decided to avoid both possibilities with the new approach 
here 
(https://github.com/apache/spark/pull/18465/commits/9ff89a7859cb9f427fc774f33c3521c7d962b723).
To support this idea, here is the relevant quotation from the documentation:

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

> `readChild` and `readChildren` return a raw vector with a "pid" attribute if 
> data were available, an integer vector of length one with the process ID if a 
> child terminated or `NULL` if the child no longer exists (no children at all 
> for `readChildren`).

`readChild` returns "an integer vector of length one with the process ID if a 
child terminated", so we can check whether the returned value is an `integer` 
and equals the selected child's process ID. I believe this makes sure that the 
children have exited.

In case children happen to send any data manually to the parent (which is why 
we introduced the suspicious part of the change, 
https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189),
the data should be raw bytes and will be discarded; the loop then reads the 
next child and checks whether the result is an `integer`.
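
The parent-side check can be condensed into a short R sketch. This is 
illustrative only, mirroring the approach in the patch rather than reproducing 
the exact committed code:

```r
# Illustrative sketch of the parent-side cleanup (not the exact committed code).
# selectChildren(timeout = 0) polls without blocking for children that have
# sent data or have exited; it returns their PIDs as an integer vector, or
# NULL if there are no children at all.
children <- parallel:::selectChildren(timeout = 0)
if (is.integer(children)) {
  for (child in children) {
    # readChild() returns an integer PID if the child terminated, or a raw
    # vector if the child merely sent data; raw data is simply discarded here.
    res <- parallel:::readChild(child)
    if (is.integer(res) && res == child) {
      # The child has exited but is not fully terminated; reap it in the
      # parent by sending SIGUSR1, as the existing worker code does.
      tools::pskill(child, tools::SIGUSR1)
    }
  }
}
```

Because `selectChildren` is called with `timeout = 0`, this poll never blocks 
the daemon's main loop waiting on the socket.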

## How was this patch tested?

Manual tests and Jenkins tests.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #18465 from HyukjinKwon/SPARK-21093-retry-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e0d033
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e0d033
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e0d033

Branch: refs/heads/master
Commit: 08e0d033b40946b4ef5741a7aa1e7ba0bd48c6fb
Parents: c3712b7
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Sat Jul 8 14:24:37 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sat Jul 8 14:24:37 2017 -0700

----------------------------------------------------------------------
 R/pkg/inst/worker/daemon.R | 51 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08e0d033/R/pkg/inst/worker/daemon.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3a318b7..2e31dc5 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,8 +30,50 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them
+  # properly in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if there are child workers
+  #     running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running or right before launching other worker children from the following
+  # new socket connection.
+
+  # The process IDs of exited children are returned below.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This should be the PIDs of exited children. Otherwise, this returns raw bytes if any data
+      # was sent from this child. In this case, we discard it.
+      pid <- parallel:::readChild(child)
+      if (is.integer(pid)) {
+        # This checks if the data from this child is the same pid of this selected child.
+        if (child == pid) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
    # There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +86,15 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
+      # Reach here because this is a child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
+      # Note that this mcexit does not fully terminate this child.
       parallel:::mcexit(0L)
+    } else {
+      # Forking succeeded and we need to check if they finished their jobs 
every second.
+      selectTimeout <- 1
     }
   }
 }

