This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fe8cddde8 Fix location requirement in DataflowTemplatedJobStartOperator (#37069)
1fe8cddde8 is described below

commit 1fe8cddde8159ecffc61f204b8383f5d1ff39047
Author: shardbread <34687386+shardbr...@users.noreply.github.com>
AuthorDate: Mon Feb 5 15:34:33 2024 +0300

    Fix location requirement in DataflowTemplatedJobStartOperator (#37069)
    
    Co-authored-by: tverdokhlib <tverdokh...@google.com>
---
 airflow/providers/google/cloud/operators/dataflow.py    | 5 ++++-
 tests/providers/google/cloud/operators/test_dataflow.py | 7 +++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 2b4b787262..7cfc2e6c06 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -676,6 +676,9 @@ class DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
         options = self.dataflow_default_options
         options.update(self.options)
 
+        if not self.location:
+            self.location = DEFAULT_DATAFLOW_LOCATION
+
         self.job = self.hook.start_template_dataflow(
             job_name=self.job_name,
             variables=options,
@@ -703,7 +706,7 @@ class DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
             trigger=TemplateJobStartTrigger(
                 project_id=self.project_id,
                 job_id=job_id,
-                location=self.location if self.location else DEFAULT_DATAFLOW_LOCATION,
+                location=self.location,
                 gcp_conn_id=self.gcp_conn_id,
                 poll_sleep=self.poll_sleep,
                 impersonation_chain=self.impersonation_chain,
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index eacd5fde5b..495287b9af 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -24,7 +24,10 @@ from unittest import mock
 import pytest
 
 import airflow
-from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
 from airflow.providers.google.cloud.operators.dataflow import (
     CheckJobRunning,
     DataflowCreateJavaJobOperator,
@@ -554,7 +557,7 @@ class TestDataflowTemplatedJobStartOperator:
         assert dataflow_mock.called
         _, kwargs = dataflow_mock.call_args_list[0]
         assert kwargs["variables"]["region"] == TEST_REGION
-        assert kwargs["location"] is None
+        assert kwargs["location"] == DEFAULT_DATAFLOW_LOCATION
 
     @pytest.mark.db_test
     
@mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook.start_template_dataflow")

Reply via email to