This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 17810a69a73 Fix dataflow java streaming infinite run (#55209)
17810a69a73 is described below

commit 17810a69a7368564a97517b2277f17130eb48802
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Sep 12 16:44:04 2025 +0000

    Fix dataflow java streaming infinite run (#55209)
    
    - With the previous implementation, the operator might end up in infinite
      execution: if a streaming job with the same prefix is already
    running, and the new one is submitted with 'check_if_running=True' (the
default).
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../providers/apache/beam/operators/beam.py        | 10 ++++++++++
 .../tests/unit/apache/beam/operators/test_beam.py  | 23 ++++++++++++++++++++++
 .../dataflow/example_dataflow_java_streaming.py    |  8 ++++----
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py 
b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
index bba1af0d7e5..f04931711e8 100644
--- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
@@ -620,6 +620,16 @@ class 
BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
                 variables=self.pipeline_options,
                 location=self.dataflow_config.location,
             )
+            if is_running and self.pipeline_options.get("streaming"):
+                self.log.warning(
+                    "Stop execution, as dataflow streaming job name: %s is 
found in a state: RUNNING. "
+                    "If you want to submit a new job, please pass the dataflow 
config option"
+                    " check_if_running=False or another unique job_name.",
+                    self.dataflow_job_name,
+                )
+                # Since there is no way to get job_id, skip link construction.
+                self.operator_extra_links = ()
+                return {"dataflow_job_id": None}
 
         if not is_running:
             self.pipeline_options["jobName"] = self.dataflow_job_name
diff --git 
a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py 
b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
index c517517c299..b09cbfef759 100644
--- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
+++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
@@ -494,6 +494,29 @@ class TestBeamRunJavaPipelineOperator:
 
         dataflow_cancel_job.assert_not_called()
 
+    @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
+    @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+    @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
+    @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+    def test_dataflow_streaming_not_stuck(
+        self, gcs_hook, dataflow_hook_mock, beam_hook_mock, persist_link_mock
+    ):
+        """Check that start java streaming pipeline does not enter infinite 
loop,
+        when streaming pipeline with the same prefix is already running and 
check_is_running=True"""
+        dataflow_config = DataflowConfiguration()
+        op_kwargs = copy.deepcopy(self.default_op_kwargs)
+        op_kwargs["pipeline_options"]["streaming"] = True
+        dataflow_hook_mock.return_value.is_job_dataflow_running.return_value = 
True
+        start_java_mock = beam_hook_mock.return_value.start_java_pipeline
+
+        op = BeamRunJavaPipelineOperator(
+            **op_kwargs, dataflow_config=dataflow_config, 
runner="DataflowRunner"
+        )
+        res = op.execute({})
+
+        start_java_mock.assert_not_called()
+        assert res == {"dataflow_job_id": None}
+
 
 class TestBeamRunGoPipelineOperator:
     @pytest.fixture(autouse=True)
diff --git 
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
 
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
index ca054a1ed07..5a278f4ff8e 100644
--- 
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
+++ 
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
@@ -129,7 +129,7 @@ with DAG(
             "streaming": True,
         },
         dataflow_config={
-            "job_name": f"java-streaming-job-{ENV_ID}",
+            "job_name": f"java-streaming-job-def-{ENV_ID}",
             "location": LOCATION,
         },
         deferrable=True,
@@ -164,13 +164,13 @@ with DAG(
         # TEST SETUP
         create_bucket
         >> download_file
-        >> create_output_pub_sub_topic
-        >> create_output_pub_sub_topic_2
+        >> [create_output_pub_sub_topic, create_output_pub_sub_topic_2]
         # TEST BODY
         >> start_java_streaming_job_dataflow
+        >> stop_dataflow_job
         >> start_java_streaming_job_dataflow_def
+        >> stop_dataflow_job_deferrable
         # TEST TEARDOWN
-        >> [stop_dataflow_job, stop_dataflow_job_deferrable]
         >> delete_topic
         >> delete_topic_2
         >> delete_bucket

Reply via email to