This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 87f8ff1  [BEAM-9290] Support runner_harness_container_image in released python sdks.
     new cec1094  Merge pull request #10827 from angoenka/runner_harness_image_dev
87f8ff1 is described below

commit 87f8ff1996a5d7a787968516234f32837d25135d
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Mon Feb 10 17:28:20 2020 -0800

    [BEAM-9290] Support runner_harness_container_image in released python sdks.
---
 .../runners/dataflow/internal/apiclient.py         | 11 +++++----
 .../runners/dataflow/internal/apiclient_test.py    | 26 +++++++++++++++++++++-
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 878dc55..b8e9caf 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -184,12 +184,15 @@ class Environment(object):
     ])
     # TODO: Use enumerated type instead of strings for job types.
     if job_type.startswith('FNAPI_'):
+      self.debug_options = self.debug_options or DebugOptions()
       self.debug_options.experiments = self.debug_options.experiments or []
+      if not self.debug_options.lookup_experiment(
+          'runner_harness_container_image'):
+        runner_harness_override = (get_runner_harness_container_image())
+        if runner_harness_override:
+          self.debug_options.add_experiment(
+              'runner_harness_container_image=' + runner_harness_override)
       debug_options_experiments = self.debug_options.experiments
-      runner_harness_override = (get_runner_harness_container_image())
-      if runner_harness_override:
-        debug_options_experiments.append(
-            'runner_harness_container_image=' + runner_harness_override)
       # Add use_multiple_sdk_containers flag if it's not already present. Do not
       # add the flag if 'no_use_multiple_sdk_containers' is present.
       # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index faaccc4..ee643a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -318,7 +318,7 @@ class UtilTest(unittest.TestCase):
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
       '2.2.0')
-  def test_harness_override_present_in_released_sdks(self):
+  def test_harness_override_default_in_released_sdks(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp', '--streaming'])
     override = ''.join([
@@ -335,6 +335,30 @@ class UtilTest(unittest.TestCase):
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
+      '2.2.0')
+  def test_harness_override_custom_in_released_sdks(self):
+    pipeline_options = PipelineOptions([
+        '--temp_location',
+        'gs://any-location/temp',
+        '--streaming',
+        '--experiments=runner_harness_container_image=fake_image'
+    ])
+    env = apiclient.Environment([], #packages
+                                pipeline_options,
+                                '2.0.0', #any environment version
+                                FAKE_PIPELINE_URL)
+    self.assertEqual(
+        1,
+        len([
+            x for x in env.proto.experiments
+            if x.startswith('runner_harness_container_image=')
+        ]))
+    self.assertIn(
+        'runner_harness_container_image=fake_image', env.proto.experiments)
+
+  @mock.patch(
+      'apache_beam.runners.dataflow.internal.apiclient.'
+      'beam_version.__version__',
       '2.2.0.rc1')
   def test_harness_override_uses_base_version_in_rc_releases(self):
     pipeline_options = PipelineOptions(

Reply via email to