Repository: spark Updated Branches: refs/heads/master 303a4e4d2 -> 7fe08b43c
[SPARK-4415] [PySpark] JVM should exit after Python exit When JVM is started in a Python process, it should exit once the stdin is closed. test: add spark.driver.memory in conf/spark-defaults.conf ``` daviesdm:~/work/spark$ cat conf/spark-defaults.conf spark.driver.memory 8g daviesdm:~/work/spark$ bin/pyspark >>> quit daviesdm:~/work/spark$ jps 4931 Jps 286 daviesdm:~/work/spark$ python wc.py 943738 0.719928026199 daviesdm:~/work/spark$ jps 286 4990 Jps ``` Author: Davies Liu <dav...@databricks.com> Closes #3274 from davies/exit and squashes the following commits: df0e524 [Davies Liu] address comments ce8599c [Davies Liu] address comments 050651f [Davies Liu] JVM should exit after Python exit Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fe08b43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fe08b43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fe08b43 Branch: refs/heads/master Commit: 7fe08b43c78bf9e8515f671e72aa03a83ea782f8 Parents: 303a4e4 Author: Davies Liu <dav...@databricks.com> Authored: Fri Nov 14 20:13:46 2014 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Fri Nov 14 20:14:33 2014 -0800 ---------------------------------------------------------------------- bin/pyspark | 2 -- bin/pyspark2.cmd | 1 - .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++++++----- python/pyspark/java_gateway.py | 4 +++- 4 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark ---------------------------------------------------------------------- diff --git a/bin/pyspark b/bin/pyspark index 1d8c94d..0b4f695 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then gatherSparkSubmitOpts "$@" exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}" else - # PySpark shell requires special handling downstream - export PYSPARK_SHELL=1 exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS fi http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark2.cmd ---------------------------------------------------------------------- diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index 59415e9..a542ec8 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do ( ) if [%PYTHON_FILE%] == [] ( - set PYSPARK_SHELL=1 if [%IPYTHON%] == [1] ( ipython %IPYTHON_OPTS% ) else ( http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 7ffff29..aa3743c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper { // subprocess there already reads directly from our stdin, so we should avoid spawning a // thread that contends with the subprocess in reading from System.in. val isWindows = Utils.isWindows - val isPySparkShell = sys.env.contains("PYSPARK_SHELL") + val isSubprocess = sys.env.contains("IS_SUBPROCESS") if (!isWindows) { val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin") stdinThread.start() - // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM - // should terminate on broken pipe, which signals that the parent process has exited. In - // Windows, the termination logic for the PySpark shell is handled in java_gateway.py - if (isPySparkShell) { + // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on + // broken pipe, signaling that the parent process has exited. This is the case if the + // application is launched directly from python, as in the PySpark shell. In Windows, + // the termination logic is handled in java_gateway.py + if (isSubprocess) { stdinThread.join() process.destroy() } http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/python/pyspark/java_gateway.py ---------------------------------------------------------------------- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 9c70fa5..a975dc1 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -45,7 +45,9 @@ def launch_gateway(): # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + env = dict(os.environ) + env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits + proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env) else: # preexec_fn not supported on Windows proc = Popen(command, stdout=PIPE, stdin=PIPE) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org