This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd08a1  [BEAM-12439] Reuse Java job servers in spark_runner.py.
     new f07d2a2  Merge pull request #14941 from ibzib/BEAM-12439
dcd08a1 is described below

commit dcd08a1d0f5606ad492e3e1f5425a81d706c9570
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Thu Jun 3 15:36:18 2021 -0700

    [BEAM-12439] Reuse Java job servers in spark_runner.py.
---
 .../portability/spark_java_job_server_test.py      | 65 ++++++++++++++++++++++
 .../runners/portability/spark_runner.py            | 14 ++++-
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py b/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py
new file mode 100644
index 0000000..50490d9
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import logging
+import unittest
+
+from apache_beam.options import pipeline_options
+from apache_beam.runners.portability.spark_runner import SparkRunner
+
+
+class SparkTestPipelineOptions(pipeline_options.PipelineOptions):
+  def view_as(self, cls):
+    # Ensure only SparkRunnerOptions and JobServerOptions are used when calling
+    # default_job_server. If other options classes are needed, the cache key
+    # must include them to prevent incorrect hits.
+    assert (
+        cls is pipeline_options.SparkRunnerOptions or
+        cls is pipeline_options.JobServerOptions)
+    return super().view_as(cls)
+
+
+class SparkJavaJobServerTest(unittest.TestCase):
+  def test_job_server_cache(self):
+    # Multiple SparkRunner instances may be created, so we need to make sure we
+    # cache job servers across runner instances.
+
+    # Most pipeline-specific options, such as sdk_worker_parallelism, don't
+    # affect job server configuration, so it is ok to ignore them for caching.
+    job_server1 = SparkRunner().default_job_server(
+        SparkTestPipelineOptions(['--sdk_worker_parallelism=1']))
+    job_server2 = SparkRunner().default_job_server(
+        SparkTestPipelineOptions(['--sdk_worker_parallelism=2']))
+    self.assertIs(job_server2, job_server1)
+
+    # JobServerOptions and SparkRunnerOptions do affect job server
+    # configuration, so using different pipeline options gives us a different
+    # job server.
+    job_server3 = SparkRunner().default_job_server(
+        SparkTestPipelineOptions(['--job_port=1234']))
+    self.assertIsNot(job_server3, job_server1)
+
+    job_server4 = SparkRunner().default_job_server(
+        SparkTestPipelineOptions(['--spark_master_url=spark://localhost:5678']))
+    self.assertIsNot(job_server4, job_server1)
+    self.assertIsNot(job_server4, job_server3)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
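
For context on the test above: PipelineOptions.view_as returns a typed
view over the same underlying flags, so overriding it (as
SparkTestPipelineOptions does) reveals exactly which options classes
default_job_server consults. A minimal standalone sketch of that
view_as behavior, not part of the commit, using the same --job_port
flag the test uses:

    from apache_beam.options import pipeline_options

    opts = pipeline_options.PipelineOptions(['--job_port=1234'])
    # view_as exposes only the flags declared by the requested options
    # class; flags it does not declare (e.g. --sdk_worker_parallelism)
    # are invisible to the view, which is why they can safely be
    # ignored when building a cache key.
    job_opts = opts.view_as(pipeline_options.JobServerOptions)
    print(job_opts.job_port)  # prints 1234
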
diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py
index fb5608c..1145507 100644
--- a/sdks/python/apache_beam/runners/portability/spark_runner.py
+++ b/sdks/python/apache_beam/runners/portability/spark_runner.py
@@ -31,6 +31,10 @@ from apache_beam.runners.portability import spark_uber_jar_job_server
 # https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
 LOCAL_MASTER_PATTERN = r'^local(\[.+\])?$'
 
+# Since Java job servers are heavyweight external processes, cache them.
+# This applies only to SparkJarJobServer, not SparkUberJarJobServer.
+JOB_SERVER_CACHE = {}
+
 
 class SparkRunner(portable_runner.PortableRunner):
   def run_pipeline(self, pipeline, options):
@@ -49,7 +53,15 @@ class SparkRunner(portable_runner.PortableRunner):
         raise ValueError('Option spark_rest_url must be set.')
       return spark_uber_jar_job_server.SparkUberJarJobServer(
           spark_options.spark_rest_url, options)
-    return job_server.StopOnExitJobServer(SparkJarJobServer(options))
+    # Use the Java job server by default.
+    # Only SparkRunnerOptions and JobServerOptions affect job server
+    # configuration, so concatenate their string forms as the cache key.
+    job_server_options = options.view_as(pipeline_options.JobServerOptions)
+    options_str = str(spark_options) + str(job_server_options)
+    if options_str not in JOB_SERVER_CACHE:
+      JOB_SERVER_CACHE[options_str] = job_server.StopOnExitJobServer(
+          SparkJarJobServer(options))
+    return JOB_SERVER_CACHE[options_str]
 
   def create_job_service_handle(self, job_service, options):
     return portable_runner.JobServiceHandle(
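
Taken together, the default_job_server change memoizes the Java job
server on the string form of the two options views that matter. A
minimal standalone sketch of the same pattern; the make_server factory
here is hypothetical, standing in for
job_server.StopOnExitJobServer(SparkJarJobServer(options)):

    from apache_beam.options import pipeline_options

    JOB_SERVER_CACHE = {}

    def cached_job_server(options, make_server):
      # str() of an options view reflects only that view's flag values,
      # so two pipelines that agree on SparkRunnerOptions and
      # JobServerOptions share one job server even if other flags differ.
      spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
      job_server_options = options.view_as(pipeline_options.JobServerOptions)
      key = str(spark_options) + str(job_server_options)
      if key not in JOB_SERVER_CACHE:
        JOB_SERVER_CACHE[key] = make_server(options)
      return JOB_SERVER_CACHE[key]

If default_job_server ever starts consulting other options classes, the
key must grow to include them, which is exactly the regression the
view_as assertion in the new test guards against.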
