This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch revert-15140-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8fba88e4f1bde48a58c8052d121d2e737fc635b1
Author: Ankur <angoe...@users.noreply.github.com>
AuthorDate: Fri Jul 23 10:40:07 2021 -0700

    Revert "Default to Runner v2 for Python Streaming jobs. (#15140)"

    This reverts commit 09d4fab4cba974edaecd6e9c603f1e2a2855e56b.
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  8 +-------
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 ++---------
 2 files changed, 3 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 0ac8c5f..c112bdd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,15 +594,9 @@ class DataflowRunner(PipelineRunner):
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    debug_options = options.view_as(DebugOptions)
-    # Streaming is always portable, default to runner v2.
-    if options.view_as(StandardOptions).streaming:
-      if not debug_options.lookup_experiment('disable_runner_v2'):
-        debug_options.add_experiment('beam_fn_api')
-        debug_options.add_experiment('use_runner_v2')
-        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
+    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 9c7546d..ec4edc9 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
-    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -839,8 +838,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+          True, ['--enable_streaming_engine'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -848,12 +846,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          [
-              '--enable_streaming_engine',
-              '--experiments=beam_fn_api',
-              '--experiments=disable_runner_v2'
-          ])
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):