Repository: spark
Updated Branches:
  refs/heads/branch-1.1 2381e90dc -> e7672f196


[SPARK-3167] Handle special driver configs in Windows (Branch 1.1)

This is an effort to bring the Windows scripts up to speed after recent 
splashing changes in #1845.

Author: Andrew Or <andrewo...@gmail.com>

Closes #2156 from andrewor14/windows-config-branch-1.1 and squashes the 
following commits:

00b9dfe [Andrew Or] [SPARK-3167] Handle special driver configs in Windows


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7672f19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7672f19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7672f19

Branch: refs/heads/branch-1.1
Commit: e7672f19674c37fbd1a43fb3793b69097349bca1
Parents: 2381e90
Author: Andrew Or <andrewo...@gmail.com>
Authored: Tue Aug 26 23:06:11 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Aug 26 23:06:21 2014 -0700

----------------------------------------------------------------------
 bin/compute-classpath.cmd                       |  3 +-
 bin/spark-class2.cmd                            | 46 +++++++++++++++++---
 bin/spark-submit                                |  2 +-
 bin/spark-submit.cmd                            | 34 ++++++++++-----
 .../deploy/SparkSubmitDriverBootstrapper.scala  | 19 +++++---
 python/pyspark/java_gateway.py                  | 17 ++++++++
 6 files changed, 95 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/bin/compute-classpath.cmd
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 58710cd..5ad5245 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it 
exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
     set ASSEMBLY_JAR=%%d

http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/bin/spark-class2.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
old mode 100755
new mode 100644
index e420eb4..b606255
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,6 +17,8 @@ rem See the License for the specific language governing 
permissions and
 rem limitations under the License.
 rem
 
+rem Any changes to this file must be reflected in 
SparkSubmitDriverBootstrapper.scala!
+
 setlocal enabledelayedexpansion
 
 set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given
 
 if not "x%SPARK_MEM%"=="x" (
   echo Warning: SPARK_MEM is deprecated, please use a more specific config 
option
-  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+  echo e.g., spark.executor.memory or spark.driver.memory.
 )
 
 rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific 
options
@@ -67,18 +69,26 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
   if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set 
OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
 
-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses 
SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
-  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! 
-Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+  ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
+  )
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+  if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set 
OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
 ) else (
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
 )
 
-rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% 
-Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change 
must be reflected in CommandUtils.scala!
+rem Set JAVA_OPTS to be able to load native libraries and to set heap size
+set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% 
-Xmx%OUR_JAVA_MEM%
 
 rem Test whether the user has built Spark
 if exist "%FWDIR%RELEASE" goto skip_build_test
@@ -109,5 +119,27 @@ rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
-"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem In Spark submit client mode, the driver is launched in the same JVM as 
Spark submit itself.
+rem Here we must parse the properties file for relevant "spark.driver.*" 
configs before launching
+rem the driver JVM itself. Instead of handling this complexity here, we launch 
a separate JVM
+rem to prepare the launch environment of this driver JVM.
+
+rem In this case, leave out the main class 
(org.apache.spark.deploy.SparkSubmit) and use our own.
+rem Leaving out the first argument is surprisingly difficult to do in Windows. 
Note that this must
+rem be done here because the Windows "shift" command does not work in a 
conditional block.
+set BOOTSTRAP_ARGS=
+shift
+:start_parse
+if "%~1" == "" goto end_parse
+set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
+shift
+goto start_parse
+:end_parse
+
+if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
+  set SPARK_CLASS=1
+  "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper 
%BOOTSTRAP_ARGS%
+) else (
+  "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
 :exit

http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/bin/spark-submit
----------------------------------------------------------------------
diff --git a/bin/spark-submit b/bin/spark-submit
index 32c911c..277c4ce 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in 
SparkSubmitDriverBootstrapper.scala!
 
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")

http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/bin/spark-submit.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index 6eb702e..cf6046d 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -17,23 +17,28 @@ rem See the License for the specific language governing 
permissions and
 rem limitations under the License.
 rem
 
+rem NOTE: Any changes in this file must be reflected in 
SparkSubmitDriverBootstrapper.scala!
+
 set SPARK_HOME=%~dp0..
 set ORIG_ARGS=%*
 
-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
 set SPARK_SUBMIT_LIBRARY_PATH=
 set SPARK_SUBMIT_CLASSPATH=
 set SPARK_SUBMIT_OPTS=
-set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
 
 :loop
 if [%1] == [] goto continue
   if [%1] == [--deploy-mode] (
-    set DEPLOY_MODE=%2
+    set SPARK_SUBMIT_DEPLOY_MODE=%2
+  ) else if [%1] == [--properties-file] (
+    set SPARK_SUBMIT_PROPERTIES_FILE=%2
   ) else if [%1] == [--driver-memory] (
-    set DRIVER_MEMORY=%2
+    set SPARK_SUBMIT_DRIVER_MEMORY=%2
   ) else if [%1] == [--driver-library-path] (
     set SPARK_SUBMIT_LIBRARY_PATH=%2
   ) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
 goto loop
 :continue
 
-if [%DEPLOY_MODE%] == [] (
-  set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.
 
-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
-  set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+    rem Parse the properties file only if the special configs exist
+    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t 
]*spark.driver.extra" ^
+      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
+  )
 )
 
 cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd 
org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%

http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index af607e6..7ca96ed 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val process = builder.start()
 
     // Redirect stdin, stdout, and stderr to/from the child JVM
-    val stdinThread = new RedirectThread(System.in, process.getOutputStream, 
"redirect stdin")
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, 
"redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, 
"redirect stderr")
-    stdinThread.start()
     stdoutThread.start()
     stderrThread.start()
 
-    // Terminate on broken pipe, which signals that the parent process has 
exited. This is
-    // important for the PySpark shell, where Spark submit itself is a python 
subprocess.
-    stdinThread.join()
-    process.destroy()
+    // In Windows, the subprocess reads directly from our stdin, so we should 
avoid spawning
+    // a thread that contends with the subprocess in reading from System.in.
+    if (Utils.isWindows) {
+      // For the PySpark shell, the termination of this process is handled in 
java_gateway.py
+      process.waitFor()
+    } else {
+      // Terminate on broken pipe, which signals that the parent process has 
exited. This is
+      // important for the PySpark shell, where Spark submit itself is a 
python subprocess.
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, 
"redirect stdin")
+      stdinThread.start()
+      stdinThread.join()
+      process.destroy()
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7672f19/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6f4f62f..9c70fa5 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import atexit
 import os
 import sys
 import signal
@@ -69,6 +70,22 @@ def launch_gateway():
                 error_msg += 
"--------------------------------------------------------------\n"
             raise Exception(error_msg)
 
+        # In Windows, ensure the Java child processes do not linger after 
Python has exited.
+        # In UNIX-based systems, the child process can kill itself on broken 
pipe (i.e. when
+        # the parent process' stdin sends an EOF). In Windows, however, this 
is not possible
+        # because java.lang.Process reads directly from the parent process' 
stdin, contending
+        # with any opportunity to read an EOF from the parent. Note that this 
is only best
+        # effort and will not take effect if the python process is violently 
terminated.
+        if on_windows:
+            # In Windows, the child process here is "spark-submit.cmd", not 
the JVM itself
+            # (because the UNIX "exec" command is not available). This means 
we cannot simply
+            # call proc.kill(), which kills only the "spark-submit.cmd" 
process but not the
+            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" 
to terminate all
+            # child processes in the tree 
(http://technet.microsoft.com/en-us/library/bb491009.aspx)
+            def killChild():
+                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", 
str(proc.pid)])
+            atexit.register(killChild)
+
         # Create a thread to echo output from the GatewayServer, which is 
required
         # for Java log output to show up:
         class EchoOutputThread(Thread):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to