This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new 9f32479  [BEAM-8835] Disable Flink Uber Jar by default. (#10270)
     new 7148e2e  Merge pull request #10274 from robertwb/release-2.17.0
9f32479 is described below

commit 9f32479944d36a41e73f40f72fce2c9ef536439b
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Tue Dec 3 13:50:58 2019 -0800

    [BEAM-8835] Disable Flink Uber Jar by default. (#10270)
---
 .../apache_beam/runners/portability/flink_runner.py    | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py
index 296c7bc..c438dfc 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -46,14 +46,19 @@ class FlinkRunner(portable_runner.PortableRunner):
     flink_master = self.add_http_scheme(
         options.view_as(FlinkRunnerOptions).flink_master)
     options.view_as(FlinkRunnerOptions).flink_master = flink_master
-    if flink_master in MAGIC_HOST_NAMES or sys.version_info < (3, 6):
-      return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
-    else:
+    if (options.view_as(FlinkRunnerOptions).flink_submit_uber_jar
+        and flink_master not in MAGIC_HOST_NAMES):
+      if sys.version_info < (3, 6):
+        raise ValueError(
+            'flink_submit_uber_jar requires Python 3.6+, current version %s'
+            % sys.version)
       # This has to be changed [auto], otherwise we will attempt to submit a
       # the pipeline remotely on the Flink JobMaster which will _fail_.
       # DO NOT CHANGE the following line, unless you have tested this.
       options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
       return flink_uber_jar_job_server.FlinkUberJarJobServer(flink_master)
+    else:
+      return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
 
   @staticmethod
   def add_http_scheme(flink_master):
@@ -84,6 +89,13 @@ class FlinkRunnerOptions(pipeline_options.PipelineOptions):
     parser.add_argument('--flink_job_server_jar',
                         help='Path or URL to a flink jobserver jar.')
     parser.add_argument('--artifacts_dir', default=None)
+    parser.add_argument('--flink_submit_uber_jar',
+                        default=False,
+                        action='store_true',
+                        help='Create and upload an uberjar to the flink master'
+                             ' directly, rather than starting up a job server.'
+                             ' Only applies when flink_master is set to a'
+                             ' cluster address.  Requires Python 3.6+.')
 
 
 class FlinkJarJobServer(job_server.JavaJarJobServer):

Reply via email to