Lee-W commented on code in PR #37693:
URL: https://github.com/apache/airflow/pull/37693#discussion_r1567161787


##########
tests/providers/google/cloud/triggers/test_dataflow.py:
##########
@@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger):
                 "message": f"Dataflow job with id {JOB_ID} has failed its 
execution",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_return_stopped_event(self, mock_job_status, trigger):
+    async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger):
         mock_job_status.return_value = JobState.JOB_STATE_STOPPED
         expected_event = TriggerEvent(
             {
                 "status": "stopped",
                 "message": f"Dataflow job with id {JOB_ID} was stopped",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog):
+    async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog):
         mock_job_status.return_value = JobState.JOB_STATE_RUNNING
         caplog.set_level(logging.INFO)
 
-        task = asyncio.create_task(trigger.run().__anext__())
+        task = asyncio.create_task(template_job_start_trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
         assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
         assert f"Sleeping for {POLL_SLEEP} seconds."
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+
+class TestDataflowJobAutoScalingEventTrigger:
+    def test_serialize(self, dataflow_job_autoscaling_event_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_autoscaling_event_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected):
+        hook = dataflow_job_autoscaling_event_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None
+        assert actual == expected
+
+    @pytest.mark.parametrize(
+        "job_status_value",
+        [
+            JobState.JOB_STATE_DONE,
+            JobState.JOB_STATE_FAILED,
+            JobState.JOB_STATE_CANCELLED,
+            JobState.JOB_STATE_UPDATED,
+            JobState.JOB_STATE_DRAINED,
+        ],
+    )
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_yields_terminal_state_event_if_fail_on_terminal_state(
+        self,
+        mock_list_job_autoscaling_events,
+        mock_job_status,
+        job_status_value,
+        dataflow_job_autoscaling_event_trigger,
+    ):
+        dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True
+        mock_list_job_autoscaling_events.return_value = []
+        mock_job_status.return_value = job_status_value
+        expected_event = TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Job with id '{JOB_ID}' is already in terminal 
state: {job_status_value.name}",
+                "result": None,
+            }
+        )
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_loop_is_still_running_if_fail_on_terminal_state(
+        self,
+        mock_list_job_autoscaling_events,
+        mock_job_status,
+        dataflow_job_autoscaling_event_trigger,
+        caplog,
+    ):
+        """Test that DataflowJobAutoScalingEventTrigger is still in loop if 
the job status is RUNNING."""
+        dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True
+        mock_job_status.return_value = JobState.JOB_STATE_RUNNING
+        mock_list_job_autoscaling_events.return_value = []
+        caplog.set_level(logging.INFO)
+        task = asyncio.create_task(dataflow_job_autoscaling_event_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert task.done() is False
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_yields_autoscaling_events(
+        self, mock_list_job_autoscaling_events, mock_job_status, dataflow_job_autoscaling_event_trigger
+    ):
+        mock_job_status.return_value = JobState.JOB_STATE_DONE
+        test_autoscaling_events = [
+            {
+                "event_type": 2,
+                "description": {},
+                "time": "2024-02-05T13:43:31.066611771Z",
+                "worker_pool": "Regular",
+                "current_num_workers": "0",
+                "target_num_workers": "0",
+            },
+            {
+                "target_num_workers": "1",
+                "event_type": 1,
+                "description": {},
+                "time": "2024-02-05T13:43:31.066611771Z",
+                "worker_pool": "Regular",
+                "current_num_workers": "0",
+            },
+        ]
+        mock_list_job_autoscaling_events.return_value = test_autoscaling_events
+        expected_event = TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Detected 2 autoscaling events for job '{JOB_ID}'",
+                "result": test_autoscaling_events,
+            }
+        )
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    async def test_run_raises_exception(self, mock_job_status, dataflow_job_autoscaling_event_trigger):
+        """
+        Tests the DataflowJobAutoScalingEventTrigger does fire if there is an exception.
+        """
+        mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception"))
+        expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None})
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert expected_event == actual_event
+
+
+class TestDataflowJobMessagesTrigger:
+    """Test case for DataflowJobMessagesTrigger"""
+
+    def test_serialize(self, dataflow_job_messages_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_messages_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_messages_trigger, attr, expected):
+        hook = dataflow_job_messages_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None

Review Comment:
   same here



##########
tests/providers/google/cloud/triggers/test_dataflow.py:
##########
@@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger):
                 "message": f"Dataflow job with id {JOB_ID} has failed its 
execution",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_return_stopped_event(self, mock_job_status, trigger):
+    async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger):
         mock_job_status.return_value = JobState.JOB_STATE_STOPPED
         expected_event = TriggerEvent(
             {
                 "status": "stopped",
                 "message": f"Dataflow job with id {JOB_ID} was stopped",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog):
+    async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog):
         mock_job_status.return_value = JobState.JOB_STATE_RUNNING
         caplog.set_level(logging.INFO)
 
-        task = asyncio.create_task(trigger.run().__anext__())
+        task = asyncio.create_task(template_job_start_trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
         assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
         assert f"Sleeping for {POLL_SLEEP} seconds."
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+
+class TestDataflowJobAutoScalingEventTrigger:
+    def test_serialize(self, dataflow_job_autoscaling_event_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_autoscaling_event_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected):
+        hook = dataflow_job_autoscaling_event_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None

Review Comment:
   Wondering if this line is required. Thanks!
   ```suggestion
   ```
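   For context, a minimal sketch of the reasoning (the constant value and test name below are hypothetical stand-ins, not the PR's actual fixtures): since the expected values here are never `None`, the equality assertion alone already fails for `actual = None`, so the extra `assert actual is not None` adds no coverage.
   ```python
   import pytest

   GCP_CONN_ID = "google_cloud_default"  # hypothetical stand-in for the test constant


   def test_equality_assert_already_covers_none():
       # If the hook ever returned None for the attribute, the equality
       # assertion below would fail on its own, because GCP_CONN_ID is not None.
       actual = None
       with pytest.raises(AssertionError):
           assert actual == GCP_CONN_ID
   ```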



##########
tests/providers/google/cloud/triggers/test_dataflow.py:
##########
@@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger):
                 "message": f"Dataflow job with id {JOB_ID} has failed its 
execution",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_return_stopped_event(self, mock_job_status, trigger):
+    async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger):
         mock_job_status.return_value = JobState.JOB_STATE_STOPPED
         expected_event = TriggerEvent(
             {
                 "status": "stopped",
                 "message": f"Dataflow job with id {JOB_ID} was stopped",
             }
         )
-        actual_event = await trigger.run().asend(None)
+        actual_event = await template_job_start_trigger.run().asend(None)
 
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
-    async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog):
+    async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog):
         mock_job_status.return_value = JobState.JOB_STATE_RUNNING
         caplog.set_level(logging.INFO)
 
-        task = asyncio.create_task(trigger.run().__anext__())
+        task = asyncio.create_task(template_job_start_trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
         assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
         assert f"Sleeping for {POLL_SLEEP} seconds."
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+
+class TestDataflowJobAutoScalingEventTrigger:
+    def test_serialize(self, dataflow_job_autoscaling_event_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_autoscaling_event_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected):
+        hook = dataflow_job_autoscaling_event_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None
+        assert actual == expected
+
+    @pytest.mark.parametrize(
+        "job_status_value",
+        [
+            JobState.JOB_STATE_DONE,
+            JobState.JOB_STATE_FAILED,
+            JobState.JOB_STATE_CANCELLED,
+            JobState.JOB_STATE_UPDATED,
+            JobState.JOB_STATE_DRAINED,
+        ],
+    )
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_yields_terminal_state_event_if_fail_on_terminal_state(
+        self,
+        mock_list_job_autoscaling_events,
+        mock_job_status,
+        job_status_value,
+        dataflow_job_autoscaling_event_trigger,
+    ):
+        dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True
+        mock_list_job_autoscaling_events.return_value = []
+        mock_job_status.return_value = job_status_value
+        expected_event = TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Job with id '{JOB_ID}' is already in terminal 
state: {job_status_value.name}",
+                "result": None,
+            }
+        )
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_loop_is_still_running_if_fail_on_terminal_state(
+        self,
+        mock_list_job_autoscaling_events,
+        mock_job_status,
+        dataflow_job_autoscaling_event_trigger,
+        caplog,
+    ):
+        """Test that DataflowJobAutoScalingEventTrigger is still in loop if 
the job status is RUNNING."""
+        dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True
+        mock_job_status.return_value = JobState.JOB_STATE_RUNNING
+        mock_list_job_autoscaling_events.return_value = []
+        caplog.set_level(logging.INFO)
+        task = asyncio.create_task(dataflow_job_autoscaling_event_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert task.done() is False
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events"
+    )
+    async def test_run_yields_autoscaling_events(
+        self, mock_list_job_autoscaling_events, mock_job_status, dataflow_job_autoscaling_event_trigger
+    ):
+        mock_job_status.return_value = JobState.JOB_STATE_DONE
+        test_autoscaling_events = [
+            {
+                "event_type": 2,
+                "description": {},
+                "time": "2024-02-05T13:43:31.066611771Z",
+                "worker_pool": "Regular",
+                "current_num_workers": "0",
+                "target_num_workers": "0",
+            },
+            {
+                "target_num_workers": "1",
+                "event_type": 1,
+                "description": {},
+                "time": "2024-02-05T13:43:31.066611771Z",
+                "worker_pool": "Regular",
+                "current_num_workers": "0",
+            },
+        ]
+        mock_list_job_autoscaling_events.return_value = test_autoscaling_events
+        expected_event = TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Detected 2 autoscaling events for job '{JOB_ID}'",
+                "result": test_autoscaling_events,
+            }
+        )
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    async def test_run_raises_exception(self, mock_job_status, dataflow_job_autoscaling_event_trigger):
+        """
+        Tests the DataflowJobAutoScalingEventTrigger does fire if there is an exception.
+        """
+        mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception"))
+        expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None})
+        actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None)
+        assert expected_event == actual_event
+
+
+class TestDataflowJobMessagesTrigger:
+    """Test case for DataflowJobMessagesTrigger"""
+
+    def test_serialize(self, dataflow_job_messages_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_messages_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_messages_trigger, attr, expected):
+        hook = dataflow_job_messages_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None
+        assert actual == expected
+
+    @pytest.mark.parametrize(
+        "job_status_value",
+        [
+            JobState.JOB_STATE_DONE,
+            JobState.JOB_STATE_FAILED,
+            JobState.JOB_STATE_CANCELLED,
+            JobState.JOB_STATE_UPDATED,
+            JobState.JOB_STATE_DRAINED,
+        ],
+    )
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages"
+    )
+    async def test_run_yields_terminal_state_event_if_fail_on_terminal_state(
+        self,
+        mock_list_job_messages,
+        mock_job_status,
+        job_status_value,
+        dataflow_job_messages_trigger,
+    ):
+        dataflow_job_messages_trigger.fail_on_terminal_state = True
+        mock_list_job_messages.return_value = []
+        mock_job_status.return_value = job_status_value
+        expected_event = TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Job with id '{JOB_ID}' is already in terminal 
state: {job_status_value.name}",
+                "result": None,
+            }
+        )
+        actual_event = await dataflow_job_messages_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages"
+    )
+    async def test_run_loop_is_still_running_if_fail_on_terminal_state(
+        self,
+        mock_list_job_messages,
+        mock_job_status,
+        dataflow_job_messages_trigger,
+        caplog,
+    ):
+        """Test that DataflowJobMessagesTrigger is still in loop if the job 
status is RUNNING."""
+        dataflow_job_messages_trigger.fail_on_terminal_state = True
+        mock_job_status.return_value = JobState.JOB_STATE_RUNNING
+        mock_list_job_messages.return_value = []
+        caplog.set_level(logging.INFO)
+        task = asyncio.create_task(dataflow_job_messages_trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert task.done() is False
+        # cancel the task to suppress test warnings
+        task.cancel()
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages"
+    )
+    async def test_run_yields_job_messages(
+        self, mock_list_job_messages, mock_job_status, dataflow_job_messages_trigger
+    ):
+        mock_job_status.return_value = JobState.JOB_STATE_DONE
+        test_job_messages = [
+            {
+                "id": "1707695235850",
+                "time": "2024-02-06T23:47:15.850Z",
+                "message_text": "msg.",
+                "message_importance": 5,
+            },
+            {
+                "id": "1707695635401",
+                "time": "2024-02-06T23:53:55.401Z",
+                "message_text": "msg.",
+                "message_importance": 5,
+            },
+        ]
+        mock_list_job_messages.return_value = test_job_messages
+        expected_event = TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Detected 2 job messages for job '{JOB_ID}'",
+                "result": test_job_messages,
+            }
+        )
+        actual_event = await dataflow_job_messages_trigger.run().asend(None)
+        assert actual_event == expected_event
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status")
+    async def test_run_raises_exception(self, mock_job_status, dataflow_job_messages_trigger):
+        """
+        Tests the DataflowJobMessagesTrigger does fire if there is an exception.
+        """
+        mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception"))
+        expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None})
+        actual_event = await dataflow_job_messages_trigger.run().asend(None)
+        assert expected_event == actual_event
+
+
+class TestDataflowJobMetricsTrigger:
+    """Test case for DataflowJobMetricsTrigger"""
+
+    def test_serialize(self, dataflow_job_metrics_trigger):
+        expected_data = (
+            "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMetricsTrigger",
+            {
+                "project_id": PROJECT_ID,
+                "job_id": JOB_ID,
+                "location": LOCATION,
+                "gcp_conn_id": GCP_CONN_ID,
+                "poll_sleep": POLL_SLEEP,
+                "impersonation_chain": IMPERSONATION_CHAIN,
+                "fail_on_terminal_state": False,
+            },
+        )
+        actual_data = dataflow_job_metrics_trigger.serialize()
+        assert actual_data == expected_data
+
+    @pytest.mark.parametrize(
+        "attr, expected",
+        [
+            ("gcp_conn_id", GCP_CONN_ID),
+            ("poll_sleep", POLL_SLEEP),
+            ("impersonation_chain", IMPERSONATION_CHAIN),
+        ],
+    )
+    def test_async_hook(self, dataflow_job_metrics_trigger, attr, expected):
+        hook = dataflow_job_metrics_trigger.async_hook
+        actual = hook._hook_kwargs.get(attr)
+        assert actual is not None

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
