This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d62f992b167 disable links for beam provider operators using  runner 
(#55248)
d62f992b167 is described below

commit d62f992b167d7f401085952d4687002a280e1b2f
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Sep 12 16:51:24 2025 +0000

    disable links for beam provider operators using  runner (#55248)
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../providers/apache/beam/operators/beam.py        |  7 ++++++
 .../tests/unit/apache/beam/operators/test_beam.py  | 28 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git 
a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py 
b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
index f04931711e8..8d4d893c919 100644
--- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
@@ -399,6 +399,10 @@ class 
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
         if not self.beam_hook:
             raise AirflowException("Beam hook is not defined.")
 
+        if self.runner.lower() != BeamRunnerType.DataflowRunner.lower():
+            # Links are rendered only for dataflow runner
+            self.operator_extra_links = ()
+
         if self.deferrable and not self.is_dataflow:
             self.defer(
                 trigger=BeamPythonPipelineTrigger(
@@ -577,6 +581,9 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
         ) = self._init_pipeline_options()
         if not self.beam_hook:
             raise AirflowException("Beam hook is not defined.")
+        if self.runner.lower() != BeamRunnerType.DataflowRunner.lower():
+            # Links are rendered only for dataflow runner
+            self.operator_extra_links = ()
 
         if self.deferrable and not self.is_dataflow:
             self.defer(
diff --git 
a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py 
b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
index b09cbfef759..626800de7ca 100644
--- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
+++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py
@@ -192,6 +192,19 @@ class TestBeamRunPythonPipelineOperator:
             py_system_site_packages=False,
         )
 
+    @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+    @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+    def test_direct_runner_no_op_extra_links(self, gcs_hook, beam_hook_mock, 
py_options):
+        """Test there is no operator_extra_links when running pipeline with 
direct runner type."""
+        start_python_hook = beam_hook_mock.return_value.start_python_pipeline
+        op = BeamRunPythonPipelineOperator(**self.default_op_kwargs)
+        op.execute({})
+
+        beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+        start_python_hook.assert_called_once()
+
+        assert not op.operator_extra_links
+
     @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
     @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
     @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
@@ -403,6 +416,21 @@ class TestBeamRunJavaPipelineOperator:
             job_class=JOB_CLASS,
         )
 
+    @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+    @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+    def test_direct_runner_no_op_extra_links(
+        self, gcs_hook, beam_hook_mock, default_options, pipeline_options
+    ):
+        """Test there is no operator_extra_links when running pipeline with 
direct runner type."""
+        start_java_hook = beam_hook_mock.return_value.start_java_pipeline
+        op = BeamRunJavaPipelineOperator(**self.default_op_kwargs)
+
+        op.execute({})
+
+        beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+        start_java_hook.assert_called_once()
+        assert not op.operator_extra_links
+
     @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
     @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
     @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))

Reply via email to