This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 87f8ff1 [BEAM-9290] Support runner_harness_container_image in released python sdks. new cec1094 Merge pull request #10827 from angoenka/runner_harness_image_dev 87f8ff1 is described below commit 87f8ff1996a5d7a787968516234f32837d25135d Author: Ankur Goenka <ankurgoe...@gmail.com> AuthorDate: Mon Feb 10 17:28:20 2020 -0800 [BEAM-9290] Support runner_harness_container_image in released python sdks. --- .../runners/dataflow/internal/apiclient.py | 11 +++++---- .../runners/dataflow/internal/apiclient_test.py | 26 +++++++++++++++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 878dc55..b8e9caf 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -184,12 +184,15 @@ class Environment(object): ]) # TODO: Use enumerated type instead of strings for job types. if job_type.startswith('FNAPI_'): + self.debug_options = self.debug_options or DebugOptions() self.debug_options.experiments = self.debug_options.experiments or [] + if not self.debug_options.lookup_experiment( + 'runner_harness_container_image'): + runner_harness_override = (get_runner_harness_container_image()) + if runner_harness_override: + self.debug_options.add_experiment( + 'runner_harness_container_image=' + runner_harness_override) debug_options_experiments = self.debug_options.experiments - runner_harness_override = (get_runner_harness_container_image()) - if runner_harness_override: - debug_options_experiments.append( - 'runner_harness_container_image=' + runner_harness_override) # Add use_multiple_sdk_containers flag if it's not already present. Do not # add the flag if 'no_use_multiple_sdk_containers' is present. # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index faaccc4..ee643a2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -318,7 +318,7 @@ class UtilTest(unittest.TestCase): 'apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', '2.2.0') - def test_harness_override_present_in_released_sdks(self): + def test_harness_override_default_in_released_sdks(self): pipeline_options = PipelineOptions( ['--temp_location', 'gs://any-location/temp', '--streaming']) override = ''.join([ @@ -335,6 +335,30 @@ class UtilTest(unittest.TestCase): @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', + '2.2.0') + def test_harness_override_custom_in_released_sdks(self): + pipeline_options = PipelineOptions([ + '--temp_location', + 'gs://any-location/temp', + '--streaming', + '--experiments=runner_harness_container_image=fake_image' + ]) + env = apiclient.Environment([], #packages + pipeline_options, + '2.0.0', #any environment version + FAKE_PIPELINE_URL) + self.assertEqual( + 1, + len([ + x for x in env.proto.experiments + if x.startswith('runner_harness_container_image=') + ])) + self.assertIn( + 'runner_harness_container_image=fake_image', env.proto.experiments) + + @mock.patch( + 'apache_beam.runners.dataflow.internal.apiclient.' + 'beam_version.__version__', '2.2.0.rc1') def test_harness_override_uses_base_version_in_rc_releases(self): pipeline_options = PipelineOptions(