This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch revert-15140-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8fba88e4f1bde48a58c8052d121d2e737fc635b1
Author: Ankur <angoe...@users.noreply.github.com>
AuthorDate: Fri Jul 23 10:40:07 2021 -0700

    Revert "Default to Runner v2 for Python Streaming jobs. (#15140)"
    
    This reverts commit 09d4fab4cba974edaecd6e9c603f1e2a2855e56b.
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  8 +-------
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 ++---------
 2 files changed, 3 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 0ac8c5f..c112bdd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,15 +594,9 @@ class DataflowRunner(PipelineRunner):
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    debug_options = options.view_as(DebugOptions)
-    # Streaming is always portable, default to runner v2.
-    if options.view_as(StandardOptions).streaming:
-      if not debug_options.lookup_experiment('disable_runner_v2'):
-        debug_options.add_experiment('beam_fn_api')
-        debug_options.add_experiment('use_runner_v2')
-        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
+    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 9c7546d..ec4edc9 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
-    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -839,8 +838,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+          True, ['--enable_streaming_engine'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -848,12 +846,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          [
-              '--enable_streaming_engine',
-              '--experiments=beam_fn_api',
-              '--experiments=disable_runner_v2'
-          ])
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):

Reply via email to