This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 17810a69a73 Fix dataflow java streaming infinite run (#55209)
17810a69a73 is described below
commit 17810a69a7368564a97517b2277f17130eb48802
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Sep 12 16:44:04 2025 +0000
Fix dataflow java streaming infinite run (#55209)
- With previous implementation operator might end up with infinite
execution: if the streaming job with the same prefix is already
running, and the new one submitted with 'check_if_running=True' (by
default).
Co-authored-by: Oleg Kachur <[email protected]>
---
.../providers/apache/beam/operators/beam.py | 10 ++++++++++
.../tests/unit/apache/beam/operators/test_beam.py | 23 ++++++++++++++++++++++
.../dataflow/example_dataflow_java_streaming.py | 8 ++++----
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
index bba1af0d7e5..f04931711e8 100644
--- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
@@ -620,6 +620,16 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
variables=self.pipeline_options,
location=self.dataflow_config.location,
)
+ if is_running and self.pipeline_options.get("streaming"):
+ self.log.warning(
+ "Stop execution, as dataflow streaming job name: %s is found in a state: RUNNING. "
+ "If you want to submit a new job, please pass the dataflow config option"
+ " check_if_running=False or another unique job_name.",
+ self.dataflow_job_name,
+ )
+ # Since there is no way to get job_id, skip link construction.
+ self.operator_extra_links = ()
+ return {"dataflow_job_id": None}
if not is_running:
self.pipeline_options["jobName"] = self.dataflow_job_name
diff --git a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
index c517517c299..b09cbfef759 100644
--- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
+++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
@@ -494,6 +494,29 @@ class TestBeamRunJavaPipelineOperator:
dataflow_cancel_job.assert_not_called()
+ @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
+ @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+ @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
+ @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+ def test_dataflow_streaming_not_stuck(
+ self, gcs_hook, dataflow_hook_mock, beam_hook_mock, persist_link_mock
+ ):
+ """Check that start java streaming pipeline does not enter infinite loop,
+ when streaming pipeline with the same prefix is already running and check_is_running=True"""
+ dataflow_config = DataflowConfiguration()
+ op_kwargs = copy.deepcopy(self.default_op_kwargs)
+ op_kwargs["pipeline_options"]["streaming"] = True
+ dataflow_hook_mock.return_value.is_job_dataflow_running.return_value = True
+ start_java_mock = beam_hook_mock.return_value.start_java_pipeline
+
+ op = BeamRunJavaPipelineOperator(
+ **op_kwargs, dataflow_config=dataflow_config, runner="DataflowRunner"
+ )
+ res = op.execute({})
+
+ start_java_mock.assert_not_called()
+ assert res == {"dataflow_job_id": None}
+
class TestBeamRunGoPipelineOperator:
@pytest.fixture(autouse=True)
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
index ca054a1ed07..5a278f4ff8e 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
@@ -129,7 +129,7 @@ with DAG(
"streaming": True,
},
dataflow_config={
- "job_name": f"java-streaming-job-{ENV_ID}",
+ "job_name": f"java-streaming-job-def-{ENV_ID}",
"location": LOCATION,
},
deferrable=True,
@@ -164,13 +164,13 @@ with DAG(
# TEST SETUP
create_bucket
>> download_file
- >> create_output_pub_sub_topic
- >> create_output_pub_sub_topic_2
+ >> [create_output_pub_sub_topic, create_output_pub_sub_topic_2]
# TEST BODY
>> start_java_streaming_job_dataflow
+ >> stop_dataflow_job
>> start_java_streaming_job_dataflow_def
+ >> stop_dataflow_job_deferrable
# TEST TEARDOWN
- >> [stop_dataflow_job, stop_dataflow_job_deferrable]
>> delete_topic
>> delete_topic_2
>> delete_bucket