This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch revert-30706-rollback-30533
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 900bf354dcff58b76e9eba36f3c5f3df3ec61e35
Author: Danny McCormick <[email protected]>
AuthorDate: Mon Apr 8 16:57:14 2024 -0400

    Revert "Revert "[BEAM-30531] Automatically execute unbounded pipelines in 
str…"
    
    This reverts commit e207a147c55ae7251f116183270f92e53b9dc350.
---
 CHANGES.md                                         |  1 +
 .../runners/dataflow/dataflow_runner.py            | 20 +++++++
 .../runners/dataflow/dataflow_runner_test.py       | 61 ++++++++++++++++++++++
 3 files changed, 82 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 5824c71a98d..1769a51b8a8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -118,6 +118,7 @@
 * Merged sdks/java/fn-execution and runners/core-construction-java into the main SDK. These artifacts were never meant for users, but noting
   that they no longer exist. These are steps to bring portability into the core SDK alongside all other core functionality.
 * Added Vertex AI Feature Store handler for Enrichment transform (Python) ([#30388](https://github.com/apache/beam/pull/30388))
+* Python Dataflow users no longer need to manually specify --streaming for pipelines using unbounded sources such as ReadFromPubSub.
 
 ## Breaking Changes
 
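
To illustrate the entry above: after this change, a pipeline like the
following sketch runs in streaming mode without --streaming being passed
(project, region, bucket, and topic names here are hypothetical):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Hypothetical resource names; note that no --streaming flag is set.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=us-central1',
        '--temp_location=gs://my-bucket/tmp',
    ])
    with beam.Pipeline(options=options) as p:
      # ReadFromPubSub yields an unbounded PCollection, so the runner
      # now flips StandardOptions.streaming to True automatically.
      _ = p | beam.io.ReadFromPubSub(
          topic='projects/my-project/topics/my-topic')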
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e428551ef02..b3d64a49b5f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -42,6 +42,7 @@ from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.common import merge_common_environments
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
@@ -415,6 +416,12 @@ class DataflowRunner(PipelineRunner):
       self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
           return_context=True, default_environment=self._default_environment)
 
+    if any(pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
+           for pcoll in self.proto_pipeline.components.pcollections.values()):
+      options.view_as(StandardOptions).streaming = True
+    if options.view_as(StandardOptions).streaming:
+      _check_and_add_missing_streaming_options(options)
+
     # Dataflow can only handle Docker environments.
     for env_id, env in self.proto_pipeline.components.environments.items():
       self.proto_pipeline.components.environments[env_id].CopyFrom(
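
The detection added above can be exercised on its own; a minimal sketch,
assuming an already-constructed beam.Pipeline (the helper name is
illustrative, not part of the patch):

    from apache_beam.portability.api import beam_runner_api_pb2

    def has_unbounded_pcollection(pipeline):
      # Convert the pipeline to its portable proto and scan every
      # PCollection's is_bounded field, mirroring the runner change above.
      proto = pipeline.to_runner_api()
      return any(
          pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
          for pcoll in proto.components.pcollections.values())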
@@ -473,6 +480,7 @@ class DataflowRunner(PipelineRunner):
     if test_options.dry_run:
       result = PipelineResult(PipelineState.DONE)
       result.wait_until_finish = lambda duration=None: None
+      result.job = self.job
       return result
 
     # Get a Dataflow API client and set its options
@@ -598,9 +606,21 @@ def _check_and_add_missing_options(options):
         "an SDK preinstalled in the default Dataflow dev runtime environment "
         "or in a custom container image, use --sdk_location=container.")
 
+
+def _check_and_add_missing_streaming_options(options):
+  # Type: (PipelineOptions) -> None
+
+  """Validates and adds missing pipeline options depending on options set.
+
+  Must be called after it has been determined whether we're running in
+  streaming mode.
+
+  :param options: PipelineOptions for this pipeline.
+  """
   # Streaming only supports using runner v2 (aka unified worker).
   # Runner v2 only supports using streaming engine (aka windmill service)
   if options.view_as(StandardOptions).streaming:
+    debug_options = options.view_as(DebugOptions)
     google_cloud_options = options.view_as(GoogleCloudOptions)
     if (not google_cloud_options.enable_streaming_engine and
         (debug_options.lookup_experiment("enable_windmill_service") or
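
Splitting the helper out implies an ordering contract: decide whether the
pipeline is streaming first, then validate the streaming-specific options.
A simplified sketch of that sequence (not the exact runner code):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions(['--sdk_location=container'])
    _check_and_add_missing_options(options)
    # Suppose the proto scan above found an unbounded PCollection:
    options.view_as(StandardOptions).streaming = True
    # Only now can the streaming-only experiments be validated and added.
    _check_and_add_missing_streaming_options(options)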
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index bef184c45c4..b5568305ce6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -39,6 +39,7 @@ from apache_beam.runners import create_runner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
+from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
@@ -523,6 +524,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_is_runner_v2(self):
     options = PipelineOptions(['--sdk_location=container', '--streaming'])
     _check_and_add_missing_options(options)
+    _check_and_add_missing_streaming_options(options)
     for expected in ['beam_fn_api',
                      'use_unified_worker',
                      'use_runner_v2',
@@ -554,6 +556,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         '--dataflow_service_options=enable_prime'
     ])
     _check_and_add_missing_options(options)
+    _check_and_add_missing_streaming_options(options)
     for expected in ['beam_fn_api',
                      'use_unified_worker',
                      'use_runner_v2',
@@ -564,6 +567,64 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
           options.view_as(DebugOptions).lookup_experiment(expected, False),
           expected)
 
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
+  @mock.patch(
+      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
+      lambda *args: [])
+  def test_auto_streaming_with_unbounded(self):
+    options = PipelineOptions([
+        '--sdk_location=container',
+        '--runner=DataflowRunner',
+        '--dry_run=True',
+        '--temp_location=gs://bucket',
+        '--project=project',
+        '--region=region'
+    ])
+    with beam.Pipeline(options=options) as p:
+      _ = p | beam.io.ReadFromPubSub('projects/some-project/topics/some-topic')
+    self.assertEqual(
+        p.result.job.proto.type,
+        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)
+
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
+  @mock.patch(
+      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
+      lambda *args: [])
+  def test_auto_streaming_no_unbounded(self):
+    options = PipelineOptions([
+        '--sdk_location=container',
+        '--runner=DataflowRunner',
+        '--dry_run=True',
+        '--temp_location=gs://bucket',
+        '--project=project',
+        '--region=region'
+    ])
+    with beam.Pipeline(options=options) as p:
+      _ = p | beam.Create([1, 2, 3])
+    self.assertEqual(
+        p.result.job.proto.type,
+        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH)
+
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
+  @mock.patch(
+      'apache_beam.options.pipeline_options.GoogleCloudOptions.validate',
+      lambda *args: [])
+  def test_explicit_streaming_no_unbounded(self):
+    options = PipelineOptions([
+        '--streaming',
+        '--sdk_location=container',
+        '--runner=DataflowRunner',
+        '--dry_run=True',
+        '--temp_location=gs://bucket',
+        '--project=project',
+        '--region=region'
+    ])
+    with beam.Pipeline(options=options) as p:
+      _ = p | beam.Create([1, 2, 3])
+    self.assertEqual(
+        p.result.job.proto.type,
+        apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)
+
 
 if __name__ == '__main__':
   unittest.main()
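
The new tests can be run directly with pytest (they are skipped unless the
GCP extras are installed, per the apiclient guard), e.g.:

    pytest sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py \
        -k "auto_streaming or explicit_streaming"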
