This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6066e  [SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway
3d6066e is described below

commit 3d6066e9b6bcd24d1ece46f80689b1ff0fcddea3
Author: Peter Parente <pare...@cs.unc.edu>
AuthorDate: Fri Feb 15 18:08:06 2019 -0800

    [SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway
    
    ## What changes were proposed in this pull request?
    
    Allow the caller to customize the py4j JVM subprocess pipes and buffers for
    programmatic capturing of its output.
    
    https://issues.apache.org/jira/browse/SPARK-21094 has more detail about the
    use case.
    
    ## How was this patch tested?
    
    Tested by running the pyspark unit tests locally.
    
    Closes #18339 from parente/feature/SPARK-21094-popen-args.
    
    Lead-authored-by: Peter Parente <pare...@cs.unc.edu>
    Co-authored-by: Peter Parente <peter.pare...@maxpoint.com>
    Signed-off-by: Holden Karau <hol...@pigscanfly.ca>
---
 python/pyspark/java_gateway.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index d8315c6..5a55401 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -36,15 +36,21 @@ from pyspark.serializers import read_int, 
write_with_length, UTF8Deserializer
 from pyspark.util import _exception_message
 
 
-def launch_gateway(conf=None):
+def launch_gateway(conf=None, popen_kwargs=None):
     """
     launch jvm gateway
     :param conf: spark configuration passed to spark-submit
+    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
+        the py4j JVM. This is a developer feature intended for use in
+        customizing how pyspark interacts with the py4j JVM (e.g., capturing
+        stdout/stderr).
     :return:
     """
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
         gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
+        # Process already exists
+        proc = None
     else:
         SPARK_HOME = _find_spark_home()
         # Launch the Py4j gateway using Spark's run command so that we pick up 
the
@@ -75,15 +81,20 @@ def launch_gateway(conf=None):
             env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
 
             # Launch the Java gateway.
+            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
             # We open a pipe to stdin so that the Java gateway can die when 
the pipe is broken
+            popen_kwargs['stdin'] = PIPE
+            # We always set the necessary environment variables.
+            popen_kwargs['env'] = env
             if not on_windows:
                 # Don't send ctrl-c / SIGINT to the Java gateway:
                 def preexec_func():
                     signal.signal(signal.SIGINT, signal.SIG_IGN)
-                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, 
env=env)
+                popen_kwargs['preexec_fn'] = preexec_func
+                proc = Popen(command, **popen_kwargs)
             else:
                 # preexec_fn not supported on Windows
-                proc = Popen(command, stdin=PIPE, env=env)
+                proc = Popen(command, **popen_kwargs)
 
             # Wait for the file to appear, or for the process to exit, 
whichever happens first.
             while not proc.poll() and not os.path.isfile(conn_info_file):
@@ -118,6 +129,8 @@ def launch_gateway(conf=None):
     gateway = JavaGateway(
         gateway_parameters=GatewayParameters(port=gateway_port, 
auth_token=gateway_secret,
                                              auto_convert=True))
+    # Store a reference to the Popen object for use by the caller (e.g., in 
reading stdout/stderr)
+    gateway.proc = proc
 
     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.SparkConf")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to