This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
     new a7e144b  Google Dataflow Hook to handle no Job Type (#14914)
a7e144b is described below

commit a7e144bec855f6ccf0fa5ae8447894195ffe170f
Author: Tobiasz Kędzierski <tobiaszkedzier...@gmail.com>
AuthorDate: Tue Mar 23 19:48:42 2021 +0100

    Google Dataflow Hook to handle no Job Type (#14914)

    Co-authored-by: Tomek Urbaszek <turbas...@gmail.com>
---
 airflow/providers/google/cloud/hooks/dataflow.py   |  2 +-
 .../providers/google/cloud/hooks/test_dataflow.py  | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index f0986e6..7c53507 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -404,7 +404,7 @@ class _DataflowJobsController(LoggingMixin):
         :raise: Exception
         """
         if self._wait_until_finished is None:
-            wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
+            wait_for_running = job.get('type') == DataflowJobType.JOB_TYPE_STREAMING
         else:
             wait_for_running = not self._wait_until_finished
 
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 7ceef1f..03d5ce3 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -1416,6 +1416,34 @@ class TestDataflowJob(unittest.TestCase):
 
     # fmt: off
     @parameterized.expand([
+        # RUNNING
+        (DataflowJobStatus.JOB_STATE_RUNNING, None, False),
+        (DataflowJobStatus.JOB_STATE_RUNNING, True, False),
+        (DataflowJobStatus.JOB_STATE_RUNNING, False, True),
+        # AWAITING STATE
+        (DataflowJobStatus.JOB_STATE_PENDING, None, False),
+        (DataflowJobStatus.JOB_STATE_PENDING, True, False),
+        (DataflowJobStatus.JOB_STATE_PENDING, False, True),
+    ])
+    # fmt: on
+    def test_check_dataflow_job_state_without_job_type(self, job_state, wait_until_finished, expected_result):
+        job = {"id": "id-2", "name": "name-2", "currentState": job_state}
+        dataflow_job = _DataflowJobsController(
+            dataflow=self.mock_dataflow,
+            project_number=TEST_PROJECT,
+            name="name-",
+            location=TEST_LOCATION,
+            poll_sleep=0,
+            job_id=None,
+            num_retries=20,
+            multiple_jobs=True,
+            wait_until_finished=wait_until_finished,
+        )
+        result = dataflow_job._check_dataflow_job_state(job)
+        assert result == expected_result
+
+    # fmt: off
+    @parameterized.expand([
         (DataflowJobType.JOB_TYPE_BATCH, DataflowJobStatus.JOB_STATE_FAILED,
             "Google Cloud Dataflow job name-2 has failed\\."),
         (DataflowJobType.JOB_TYPE_STREAMING, DataflowJobStatus.JOB_STATE_FAILED,