This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b14088  [SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon
3b14088 is described below

commit 3b140885410362fced9d98fca61d6a357de604af
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Wed Jul 31 09:10:24 2019 +0900

    [SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon
    
    ## What changes were proposed in this pull request?
    
    The PySpark worker daemon reads from stdin the PIDs of the workers to kill:
    https://github.com/apache/spark/blob/1bb60ab8392adf8b896cc04fb1d060620cf09d8a/python/pyspark/daemon.py#L127
    
    However, the worker process is forked from the worker daemon process, and
    we did not close stdin in the child after the fork. This means the child
    and the user program can read stdin as well, which blocks the daemon from
    receiving the PID to kill. This can cause issues because the task reaper
    might detect that the task was not terminated and eventually kill the JVM.
    
    This PR fixes this by redirecting the standard input of the forked child
    to devnull.
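
    For illustration, a minimal sketch of the same pattern using plain
    `os.fork` and `os.dup2` (a hypothetical standalone example, not the
    daemon code itself; the actual change is in the diff below):

    ```
    import os
    import sys

    pid = os.fork()  # POSIX-only, like the PySpark daemon itself
    if pid == 0:
        # In the child: point file descriptor 0 at /dev/null instead of just
        # closing it, so no later open() can be handed descriptor 0.
        devnull = open(os.devnull, 'r')
        os.dup2(devnull.fileno(), 0)
        devnull.close()
        # Any read from stdin now returns EOF immediately.
        assert sys.stdin.read() == ''
        os._exit(0)
    else:
        # In the parent: stdin is untouched and still usable, for example to
        # receive control messages such as worker PIDs to kill.
        os.waitpid(pid, 0)
    ```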
    
    ## How was this patch tested?
    
    Manually tested.
    
    In `pyspark`, run:
    ```
    import subprocess
    def task(iterator):
        subprocess.check_output(["cat"])
        return iterator

    sc.parallelize(range(1), 1).mapPartitions(task).count()
    ```
    
    Before:
    The job gets stuck. Pressing Ctrl+C exits the job, but the Python worker
    process does not exit.
    After:
    The job finishes correctly. The "cat" prints nothing (because the dummy
    stdin is "/dev/null"). The Python worker process exits normally.
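
    Why this behaves as described (an aside, not part of the original patch
    description): `check_output` does not set `stdin`, so `cat` inherits the
    worker's file descriptor 0. Before the patch that descriptor was the
    daemon's own stdin, which the JVM uses to send the PIDs of workers to
    kill, so `cat` blocked on it; after the patch it is '/dev/null', so `cat`
    sees EOF at once. A hypothetical standalone check:

    ```
    import os
    import subprocess

    # Simulate the patched worker: file descriptor 0 points at /dev/null.
    devnull = open(os.devnull, 'r')
    os.dup2(devnull.fileno(), 0)
    devnull.close()

    # "cat" inherits descriptor 0, reads EOF immediately and prints nothing.
    print(subprocess.check_output(["cat"]))  # b''
    ```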
    
    Please review https://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #25138 from WeichenXu123/SPARK-26175.
    
    Authored-by: WeichenXu <weichen...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/daemon.py             | 15 +++++++++++++++
 python/pyspark/sql/tests/test_udf.py | 12 ++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 6f42ad3..97b6b25 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -160,6 +160,21 @@ def manager():
                 if pid == 0:
                     # in child process
                     listen_sock.close()
+
+                    # It should close the standard input in the child process so that
+                    # Python native function executions stay intact.
+                    #
+                    # Note that if we just close the standard input (file descriptor 0),
+                    # the lowest file descriptor (file descriptor 0) will be allocated
+                    # later when other file descriptors happen to be opened.
+                    #
+                    # Therefore, here we redirect it to '/dev/null' by duplicating
+                    # another file descriptor for '/dev/null' to the standard input (0).
+                    # See SPARK-26175.
+                    devnull = open(os.devnull, 'r')
+                    os.dup2(devnull.fileno(), 0)
+                    devnull.close()
+
                     try:
                         # Acknowledge that the fork was successful
                         outfile = sock.makefile(mode="wb")
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 803d471..1999311 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -616,6 +616,18 @@ class UDFTests(ReusedSQLTestCase):
 
         self.spark.range(1).select(f()).collect()
 
+    def test_worker_original_stdin_closed(self):
+        # Test if it closes the original standard input of the worker inherited from the daemon,
+        # and replaces it with '/dev/null'.  See SPARK-26175.
+        def task(iterator):
+            import sys
+            res = sys.stdin.read()
+            # Because the standard input is '/dev/null', it reaches EOF.
+            assert res == '', "Expect read EOF from stdin."
+            return iterator
+
+        self.sc.parallelize(range(1), 1).mapPartitions(task).count()
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
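
As an aside on the close-versus-dup2 comment added to daemon.py above: the
hazard it describes can be shown with a small, hypothetical snippet (not part
of the patch). On POSIX, open() always returns the lowest free file
descriptor, so merely closing descriptor 0 lets the next opened file silently
become the process's standard input:

```
import os

# Close the real stdin (descriptor 0); the descriptor number is now free.
os.close(0)

# The next open() is handed the lowest free descriptor, i.e. 0, so this
# unrelated file silently becomes the process's "stdin".
fd = os.open(os.devnull, os.O_RDONLY)
print(fd)  # prints 0

# Duplicating /dev/null onto descriptor 0 with os.dup2 keeps descriptor 0
# occupied, which is why the daemon redirects stdin instead of closing it.
```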


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
