This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 1fe8cddde8 Fix location requirement in DataflowTemplatedJobStartOperator (#37069)
1fe8cddde8 is described below

commit 1fe8cddde8159ecffc61f204b8383f5d1ff39047
Author: shardbread <34687386+shardbr...@users.noreply.github.com>
AuthorDate: Mon Feb 5 15:34:33 2024 +0300

    Fix location requirement in DataflowTemplatedJobStartOperator (#37069)

    Co-authored-by: tverdokhlib <tverdokh...@google.com>
---
 airflow/providers/google/cloud/operators/dataflow.py    | 5 ++++-
 tests/providers/google/cloud/operators/test_dataflow.py | 7 +++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 2b4b787262..7cfc2e6c06 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -676,6 +676,9 @@ class DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
         options = self.dataflow_default_options
         options.update(self.options)
 
+        if not self.location:
+            self.location = DEFAULT_DATAFLOW_LOCATION
+
         self.job = self.hook.start_template_dataflow(
             job_name=self.job_name,
             variables=options,
@@ -703,7 +706,7 @@ class DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
             trigger=TemplateJobStartTrigger(
                 project_id=self.project_id,
                 job_id=job_id,
-                location=self.location if self.location else DEFAULT_DATAFLOW_LOCATION,
+                location=self.location,
                 gcp_conn_id=self.gcp_conn_id,
                 poll_sleep=self.poll_sleep,
                 impersonation_chain=self.impersonation_chain,
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index eacd5fde5b..495287b9af 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -24,7 +24,10 @@ from unittest import mock
 import pytest
 
 import airflow
-from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
 from airflow.providers.google.cloud.operators.dataflow import (
     CheckJobRunning,
     DataflowCreateJavaJobOperator,
@@ -554,7 +557,7 @@ class TestDataflowTemplatedJobStartOperator:
         assert dataflow_mock.called
         _, kwargs = dataflow_mock.call_args_list[0]
         assert kwargs["variables"]["region"] == TEST_REGION
-        assert kwargs["location"] is None
+        assert kwargs["location"] == DEFAULT_DATAFLOW_LOCATION
 
     @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook.start_template_dataflow")