This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d62f992b167 disable links for beam provider operators using runner
(#55248)
d62f992b167 is described below
commit d62f992b167d7f401085952d4687002a280e1b2f
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Sep 12 16:51:24 2025 +0000
disable links for beam provider operators using runner (#55248)
Co-authored-by: Oleg Kachur <[email protected]>
---
.../providers/apache/beam/operators/beam.py | 7 ++++++
.../tests/unit/apache/beam/operators/test_beam.py | 28 ++++++++++++++++++++++
2 files changed, 35 insertions(+)
diff --git
a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
index f04931711e8..8d4d893c919 100644
--- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
@@ -399,6 +399,10 @@ class
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
if not self.beam_hook:
raise AirflowException("Beam hook is not defined.")
+ if self.runner.lower() != BeamRunnerType.DataflowRunner.lower():
+ # Links are rendered only for dataflow runner
+ self.operator_extra_links = ()
+
if self.deferrable and not self.is_dataflow:
self.defer(
trigger=BeamPythonPipelineTrigger(
@@ -577,6 +581,9 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
) = self._init_pipeline_options()
if not self.beam_hook:
raise AirflowException("Beam hook is not defined.")
+ if self.runner.lower() != BeamRunnerType.DataflowRunner.lower():
+ # Links are rendered only for dataflow runner
+ self.operator_extra_links = ()
if self.deferrable and not self.is_dataflow:
self.defer(
diff --git
a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
index b09cbfef759..626800de7ca 100644
--- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
+++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
@@ -192,6 +192,19 @@ class TestBeamRunPythonPipelineOperator:
py_system_site_packages=False,
)
+ @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+ @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+ def test_direct_runner_no_op_extra_links(self, gcs_hook, beam_hook_mock,
py_options):
+ """Test there is no operator_extra_links when running pipeline with
direct runner type."""
+ start_python_hook = beam_hook_mock.return_value.start_python_pipeline
+ op = BeamRunPythonPipelineOperator(**self.default_op_kwargs)
+ op.execute({})
+
+ beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+ start_python_hook.assert_called_once()
+
+ assert not op.operator_extra_links
+
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
@@ -403,6 +416,21 @@ class TestBeamRunJavaPipelineOperator:
job_class=JOB_CLASS,
)
+ @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+ @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+ def test_direct_runner_no_op_extra_links(
+ self, gcs_hook, beam_hook_mock, default_options, pipeline_options
+ ):
+ """Test there is no operator_extra_links when running pipeline with
direct runner type."""
+ start_java_hook = beam_hook_mock.return_value.start_java_pipeline
+ op = BeamRunJavaPipelineOperator(**self.default_op_kwargs)
+
+ op.execute({})
+
+ beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+ start_java_hook.assert_called_once()
+ assert not op.operator_extra_links
+
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))