Repository: spark
Updated Branches:
  refs/heads/branch-1.1 bb23b118e -> 92daffed4


[SPARK-2898] [PySpark] fix bugs in daemon.py

1. do not use a signal handler for SIGCHLD, since it can easily cause deadlock (sketched below)
2. handle EINTR during accept()
3. pass errno to the JVM
4. handle EAGAIN during fork()

Now it can pass a 50k-task test in 180 seconds.
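
A note on point 1: a SIGCHLD handler can fire in the middle of
non-reentrant code (for example while stdio or allocator locks are
held) and deadlock the daemon, so the patch reaps workers from the
main select() loop instead. A minimal standalone sketch of that
pattern, not the patch itself (reap_dead_children and serve are
illustrative names):

    import os
    import select
    from errno import ECHILD, EINTR

    def reap_dead_children():
        # Reap every finished worker without blocking.
        try:
            while True:
                pid, _ = os.waitpid(0, os.WNOHANG)  # (0, 0) if none exited
                if not pid:
                    break
        except OSError as e:
            if e.errno not in (ECHILD, EINTR):  # "no children" is expected
                raise

    def serve(listen_sock):
        while True:
            # The 1-second timeout turns select() into a periodic tick, so
            # zombies are reaped even when no new connections arrive.
            ready = select.select([listen_sock], [], [], 1)[0]
            reap_dead_children()  # ordinary code path, not a signal handler
            if listen_sock in ready:
                pass  # accept() and fork a worker here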

Author: Davies Liu <davies....@gmail.com>

Closes #1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] clean up dead children every second
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in daemon.py

(cherry picked from commit 28dcbb531ae57dc50f15ad9df6c31022731669c9)
Signed-off-by: Josh Rosen <joshro...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92daffed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92daffed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92daffed

Branch: refs/heads/branch-1.1
Commit: 92daffed4c17e373a06333c85124075d0fd18f0c
Parents: bb23b11
Author: Davies Liu <davies....@gmail.com>
Authored: Sun Aug 10 13:00:38 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Sun Aug 10 13:00:57 2014 -0700

----------------------------------------------------------------------
 .../spark/api/python/PythonWorkerFactory.scala  |  2 +-
 python/pyspark/daemon.py                        | 78 ++++++++++++--------
 2 files changed, 48 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92daffed/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 7af260d..bf716a8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket
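
Side note on the readInt() handshake above: the value the JVM reads is
the worker's pid on success and, with the daemon.py change below, the
errno of a failed fork(). PySpark frames these control ints as 4-byte
big-endian values; a sketch of that framing, assuming pyspark's
write_int helper behaves as shown (illustration, not the module
source):

    import struct

    def write_int(value, stream):
        # 4-byte big-endian signed int, the same framing that
        # DataInputStream.readInt() decodes on the JVM side.
        stream.write(struct.pack("!i", value))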

http://git-wip-us.apache.org/repos/asf/spark/blob/92daffed/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538b..22ab8d3 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
         os._exit(compute_real_exit_code(exit_code))
 
 
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def manager():
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +141,41 @@ def manager():
                     pass  # process already died
 
             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                     else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                         sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                     sock.close()
+
     finally:
         shutdown(1)
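
The EAGAIN path in the hunk above retries fork() once after a short
sleep rather than failing outright, since EAGAIN usually signals a
transient resource limit. The same idiom as a standalone helper, for
illustration only (fork_with_retry is not a name from the patch):

    import os
    import time
    from errno import EAGAIN, EINTR

    def fork_with_retry():
        try:
            return os.fork()
        except OSError as e:
            if e.errno not in (EAGAIN, EINTR):
                raise
            time.sleep(1)
            # A failure on the retry propagates and shuts the daemon
            # down, mirroring the patch's behavior.
            return os.fork()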
 

