This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e144b  Google Dataflow Hook to handle no Job Type (#14914)
a7e144b is described below

commit a7e144bec855f6ccf0fa5ae8447894195ffe170f
Author: Tobiasz Kędzierski <tobiaszkedzier...@gmail.com>
AuthorDate: Tue Mar 23 19:48:42 2021 +0100

    Google Dataflow Hook to handle no Job Type (#14914)
    
    
    Co-authored-by: Tomek Urbaszek <turbas...@gmail.com>
---
 airflow/providers/google/cloud/hooks/dataflow.py   |  2 +-
 .../providers/google/cloud/hooks/test_dataflow.py  | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index f0986e6..7c53507 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -404,7 +404,7 @@ class _DataflowJobsController(LoggingMixin):
         :raise: Exception
         """
         if self._wait_until_finished is None:
-            wait_for_running = job['type'] == 
DataflowJobType.JOB_TYPE_STREAMING
+            wait_for_running = job.get('type') == 
DataflowJobType.JOB_TYPE_STREAMING
         else:
             wait_for_running = not self._wait_until_finished
 
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 7ceef1f..03d5ce3 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -1416,6 +1416,34 @@ class TestDataflowJob(unittest.TestCase):
 
     # fmt: off
     @parameterized.expand([
+        # RUNNING
+        (DataflowJobStatus.JOB_STATE_RUNNING, None, False),
+        (DataflowJobStatus.JOB_STATE_RUNNING, True, False),
+        (DataflowJobStatus.JOB_STATE_RUNNING, False, True),
+        # AWAITING STATE
+        (DataflowJobStatus.JOB_STATE_PENDING, None, False),
+        (DataflowJobStatus.JOB_STATE_PENDING, True, False),
+        (DataflowJobStatus.JOB_STATE_PENDING, False, True),
+    ])
+    # fmt: on
+    def test_check_dataflow_job_state_without_job_type(self, job_state, 
wait_until_finished, expected_result):
+        job = {"id": "id-2", "name": "name-2", "currentState": job_state}
+        dataflow_job = _DataflowJobsController(
+            dataflow=self.mock_dataflow,
+            project_number=TEST_PROJECT,
+            name="name-",
+            location=TEST_LOCATION,
+            poll_sleep=0,
+            job_id=None,
+            num_retries=20,
+            multiple_jobs=True,
+            wait_until_finished=wait_until_finished,
+        )
+        result = dataflow_job._check_dataflow_job_state(job)
+        assert result == expected_result
+
+    # fmt: off
+    @parameterized.expand([
         (DataflowJobType.JOB_TYPE_BATCH, DataflowJobStatus.JOB_STATE_FAILED,
             "Google Cloud Dataflow job name-2 has failed\\."),
         (DataflowJobType.JOB_TYPE_STREAMING, 
DataflowJobStatus.JOB_STATE_FAILED,

Reply via email to