This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3d6066e  [SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway
3d6066e is described below

commit 3d6066e9b6bcd24d1ece46f80689b1ff0fcddea3
Author: Peter Parente <pare...@cs.unc.edu>
AuthorDate: Fri Feb 15 18:08:06 2019 -0800

    [SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway

    ## What changes were proposed in this pull request?

    Allow the caller to customize the py4j JVM subprocess pipes and buffers for programmatic capturing of its output.

    https://issues.apache.org/jira/browse/SPARK-21094 has more detail about the use case.

    ## How was this patch tested?

    Tested by running the pyspark unit tests locally.

    Closes #18339 from parente/feature/SPARK-21094-popen-args.

    Lead-authored-by: Peter Parente <pare...@cs.unc.edu>
    Co-authored-by: Peter Parente <peter.pare...@maxpoint.com>
    Signed-off-by: Holden Karau <hol...@pigscanfly.ca>
---
 python/pyspark/java_gateway.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index d8315c6..5a55401 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -36,15 +36,21 @@ from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
 from pyspark.util import _exception_message
 
 
-def launch_gateway(conf=None):
+def launch_gateway(conf=None, popen_kwargs=None):
     """
     launch jvm gateway
     :param conf: spark configuration passed to spark-submit
+    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
+        the py4j JVM. This is a developer feature intended for use in
+        customizing how pyspark interacts with the py4j JVM (e.g., capturing
+        stdout/stderr).
     :return:
     """
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
         gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
+        # Process already exists
+        proc = None
     else:
         SPARK_HOME = _find_spark_home()
         # Launch the Py4j gateway using Spark's run command so that we pick up the
@@ -75,15 +81,20 @@ def launch_gateway(conf=None):
             env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
 
             # Launch the Java gateway.
+            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
             # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
+            popen_kwargs['stdin'] = PIPE
+            # We always set the necessary environment variables.
+            popen_kwargs['env'] = env
             if not on_windows:
                 # Don't send ctrl-c / SIGINT to the Java gateway:
                 def preexec_func():
                     signal.signal(signal.SIGINT, signal.SIG_IGN)
-                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
+                popen_kwargs['preexec_fn'] = preexec_func
+                proc = Popen(command, **popen_kwargs)
             else:
                 # preexec_fn not supported on Windows
-                proc = Popen(command, stdin=PIPE, env=env)
+                proc = Popen(command, **popen_kwargs)
 
             # Wait for the file to appear, or for the process to exit, whichever happens first.
             while not proc.poll() and not os.path.isfile(conn_info_file):
@@ -118,6 +129,8 @@ def launch_gateway(conf=None):
     gateway = JavaGateway(
         gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret,
                                              auto_convert=True))
+    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
+    gateway.proc = proc
 
     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.SparkConf")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
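
For readers following this change, here is a minimal usage sketch of the new parameter. It is not part of the commit and assumes only what the diff above shows: launch_gateway forwards popen_kwargs to Popen, always overriding stdin, env, and (outside Windows) preexec_fn, and exposes the spawned subprocess as gateway.proc.

    from subprocess import PIPE, STDOUT

    from pyspark.java_gateway import launch_gateway

    # Ask Popen to pipe the JVM's stdout back to Python, with stderr merged into it.
    # Any stdin, env, or preexec_fn keys supplied here would be overwritten by
    # launch_gateway itself, as the diff shows.
    gateway = launch_gateway(popen_kwargs={"stdout": PIPE, "stderr": STDOUT})

    # The commit also stores the Popen object on the gateway, so the captured
    # output can be read from gateway.proc.stdout (ideally in a background thread
    # so the JVM does not block once the pipe buffer fills up).
    print(gateway.proc.stdout.readline())

Other Popen arguments, such as bufsize, can be passed the same way, which is the "buffers" part of the description above.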