Lee-W commented on code in PR #37693: URL: https://github.com/apache/airflow/pull/37693#discussion_r1567161787
########## tests/providers/google/cloud/triggers/test_dataflow.py: ########## @@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger): "message": f"Dataflow job with id {JOB_ID} has failed its execution", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_return_stopped_event(self, mock_job_status, trigger): + async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger): mock_job_status.return_value = JobState.JOB_STATE_STOPPED expected_event = TriggerEvent( { "status": "stopped", "message": f"Dataflow job with id {JOB_ID} was stopped", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog): + async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog): mock_job_status.return_value = JobState.JOB_STATE_RUNNING caplog.set_level(logging.INFO) - task = asyncio.create_task(trigger.run().__anext__()) + task = asyncio.create_task(template_job_start_trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() assert f"Current job status is: {JobState.JOB_STATE_RUNNING}" assert f"Sleeping for {POLL_SLEEP} seconds." 
+ # cancel the task to suppress test warnings + task.cancel() + + +class TestDataflowJobAutoScalingEventTrigger: + def test_serialize(self, dataflow_job_autoscaling_event_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_autoscaling_event_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected): + hook = dataflow_job_autoscaling_event_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None + assert actual == expected + + @pytest.mark.parametrize( + "job_status_value", + [ + JobState.JOB_STATE_DONE, + JobState.JOB_STATE_FAILED, + JobState.JOB_STATE_CANCELLED, + JobState.JOB_STATE_UPDATED, + JobState.JOB_STATE_DRAINED, + ], + ) + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_yields_terminal_state_event_if_fail_on_terminal_state( + self, + mock_list_job_autoscaling_events, + mock_job_status, + job_status_value, + dataflow_job_autoscaling_event_trigger, + ): + dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True + mock_list_job_autoscaling_events.return_value = [] + mock_job_status.return_value = job_status_value + expected_event = TriggerEvent( + { + "status": "error", + "message": f"Job with id '{JOB_ID}' is already in terminal 
state: {job_status_value.name}", + "result": None, + } + ) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_loop_is_still_running_if_fail_on_terminal_state( + self, + mock_list_job_autoscaling_events, + mock_job_status, + dataflow_job_autoscaling_event_trigger, + caplog, + ): + """Test that DataflowJobAutoScalingEventTrigger is still in loop if the job status is RUNNING.""" + dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True + mock_job_status.return_value = JobState.JOB_STATE_RUNNING + mock_list_job_autoscaling_events.return_value = [] + caplog.set_level(logging.INFO) + task = asyncio.create_task(dataflow_job_autoscaling_event_trigger.run().__anext__()) + await asyncio.sleep(0.5) + assert task.done() is False + # cancel the task to suppress test warnings + task.cancel() + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_yields_autoscaling_events( + self, mock_list_job_autoscaling_events, mock_job_status, dataflow_job_autoscaling_event_trigger + ): + mock_job_status.return_value = JobState.JOB_STATE_DONE + test_autoscaling_events = [ + { + "event_type": 2, + "description": {}, + "time": "2024-02-05T13:43:31.066611771Z", + "worker_pool": "Regular", + "current_num_workers": "0", + "target_num_workers": "0", + }, + { + "target_num_workers": "1", + "event_type": 1, + "description": {}, + "time": "2024-02-05T13:43:31.066611771Z", + "worker_pool": "Regular", + "current_num_workers": "0", + 
}, + ] + mock_list_job_autoscaling_events.return_value = test_autoscaling_events + expected_event = TriggerEvent( + { + "status": "success", + "message": f"Detected 2 autoscaling events for job '{JOB_ID}'", + "result": test_autoscaling_events, + } + ) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + async def test_run_raises_exception(self, mock_job_status, dataflow_job_autoscaling_event_trigger): + """ + Tests the DataflowJobAutoScalingEventTrigger does fire if there is an exception. + """ + mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception")) + expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None}) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert expected_event == actual_event + + +class TestDataflowJobMessagesTrigger: + """Test case for DataflowJobMessagesTrigger""" + + def test_serialize(self, dataflow_job_messages_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_messages_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_messages_trigger, attr, expected): + hook = dataflow_job_messages_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None Review Comment: Same question as on `TestDataflowJobAutoScalingEventTrigger.test_async_hook`: wondering if this `assert actual is not None` line is required. ##########
tests/providers/google/cloud/triggers/test_dataflow.py: ########## @@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger): "message": f"Dataflow job with id {JOB_ID} has failed its execution", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_return_stopped_event(self, mock_job_status, trigger): + async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger): mock_job_status.return_value = JobState.JOB_STATE_STOPPED expected_event = TriggerEvent( { "status": "stopped", "message": f"Dataflow job with id {JOB_ID} was stopped", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog): + async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog): mock_job_status.return_value = JobState.JOB_STATE_RUNNING caplog.set_level(logging.INFO) - task = asyncio.create_task(trigger.run().__anext__()) + task = asyncio.create_task(template_job_start_trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() assert f"Current job status is: {JobState.JOB_STATE_RUNNING}" assert f"Sleeping for {POLL_SLEEP} seconds." 
+ # cancel the task to suppress test warnings + task.cancel() + + +class TestDataflowJobAutoScalingEventTrigger: + def test_serialize(self, dataflow_job_autoscaling_event_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_autoscaling_event_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected): + hook = dataflow_job_autoscaling_event_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None Review Comment: Wondering if this line is required. Thanks! 
```suggestion ``` ########## tests/providers/google/cloud/triggers/test_dataflow.py: ########## @@ -108,33 +167,569 @@ async def test_run_loop_return_failed_event(self, mock_job_status, trigger): "message": f"Dataflow job with id {JOB_ID} has failed its execution", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_return_stopped_event(self, mock_job_status, trigger): + async def test_run_loop_return_stopped_event(self, mock_job_status, template_job_start_trigger): mock_job_status.return_value = JobState.JOB_STATE_STOPPED expected_event = TriggerEvent( { "status": "stopped", "message": f"Dataflow job with id {JOB_ID} was stopped", } ) - actual_event = await trigger.run().asend(None) + actual_event = await template_job_start_trigger.run().asend(None) assert actual_event == expected_event @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") - async def test_run_loop_is_still_running(self, mock_job_status, trigger, caplog): + async def test_run_loop_is_still_running(self, mock_job_status, template_job_start_trigger, caplog): mock_job_status.return_value = JobState.JOB_STATE_RUNNING caplog.set_level(logging.INFO) - task = asyncio.create_task(trigger.run().__anext__()) + task = asyncio.create_task(template_job_start_trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() assert f"Current job status is: {JobState.JOB_STATE_RUNNING}" assert f"Sleeping for {POLL_SLEEP} seconds." 
+ # cancel the task to suppress test warnings + task.cancel() + + +class TestDataflowJobAutoScalingEventTrigger: + def test_serialize(self, dataflow_job_autoscaling_event_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_autoscaling_event_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_autoscaling_event_trigger, attr, expected): + hook = dataflow_job_autoscaling_event_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None + assert actual == expected + + @pytest.mark.parametrize( + "job_status_value", + [ + JobState.JOB_STATE_DONE, + JobState.JOB_STATE_FAILED, + JobState.JOB_STATE_CANCELLED, + JobState.JOB_STATE_UPDATED, + JobState.JOB_STATE_DRAINED, + ], + ) + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_yields_terminal_state_event_if_fail_on_terminal_state( + self, + mock_list_job_autoscaling_events, + mock_job_status, + job_status_value, + dataflow_job_autoscaling_event_trigger, + ): + dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True + mock_list_job_autoscaling_events.return_value = [] + mock_job_status.return_value = job_status_value + expected_event = TriggerEvent( + { + "status": "error", + "message": f"Job with id '{JOB_ID}' is already in terminal 
state: {job_status_value.name}", + "result": None, + } + ) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_loop_is_still_running_if_fail_on_terminal_state( + self, + mock_list_job_autoscaling_events, + mock_job_status, + dataflow_job_autoscaling_event_trigger, + caplog, + ): + """Test that DataflowJobAutoScalingEventTrigger is still in loop if the job status is RUNNING.""" + dataflow_job_autoscaling_event_trigger.fail_on_terminal_state = True + mock_job_status.return_value = JobState.JOB_STATE_RUNNING + mock_list_job_autoscaling_events.return_value = [] + caplog.set_level(logging.INFO) + task = asyncio.create_task(dataflow_job_autoscaling_event_trigger.run().__anext__()) + await asyncio.sleep(0.5) + assert task.done() is False + # cancel the task to suppress test warnings + task.cancel() + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger.list_job_autoscaling_events" + ) + async def test_run_yields_autoscaling_events( + self, mock_list_job_autoscaling_events, mock_job_status, dataflow_job_autoscaling_event_trigger + ): + mock_job_status.return_value = JobState.JOB_STATE_DONE + test_autoscaling_events = [ + { + "event_type": 2, + "description": {}, + "time": "2024-02-05T13:43:31.066611771Z", + "worker_pool": "Regular", + "current_num_workers": "0", + "target_num_workers": "0", + }, + { + "target_num_workers": "1", + "event_type": 1, + "description": {}, + "time": "2024-02-05T13:43:31.066611771Z", + "worker_pool": "Regular", + "current_num_workers": "0", + 
}, + ] + mock_list_job_autoscaling_events.return_value = test_autoscaling_events + expected_event = TriggerEvent( + { + "status": "success", + "message": f"Detected 2 autoscaling events for job '{JOB_ID}'", + "result": test_autoscaling_events, + } + ) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + async def test_run_raises_exception(self, mock_job_status, dataflow_job_autoscaling_event_trigger): + """ + Tests the DataflowJobAutoScalingEventTrigger does fire if there is an exception. + """ + mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception")) + expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None}) + actual_event = await dataflow_job_autoscaling_event_trigger.run().asend(None) + assert expected_event == actual_event + + +class TestDataflowJobMessagesTrigger: + """Test case for DataflowJobMessagesTrigger""" + + def test_serialize(self, dataflow_job_messages_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_messages_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_messages_trigger, attr, expected): + hook = dataflow_job_messages_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None + assert actual == expected + + @pytest.mark.parametrize( + 
"job_status_value", + [ + JobState.JOB_STATE_DONE, + JobState.JOB_STATE_FAILED, + JobState.JOB_STATE_CANCELLED, + JobState.JOB_STATE_UPDATED, + JobState.JOB_STATE_DRAINED, + ], + ) + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages" + ) + async def test_run_yields_terminal_state_event_if_fail_on_terminal_state( + self, + mock_list_job_messages, + mock_job_status, + job_status_value, + dataflow_job_messages_trigger, + ): + dataflow_job_messages_trigger.fail_on_terminal_state = True + mock_list_job_messages.return_value = [] + mock_job_status.return_value = job_status_value + expected_event = TriggerEvent( + { + "status": "error", + "message": f"Job with id '{JOB_ID}' is already in terminal state: {job_status_value.name}", + "result": None, + } + ) + actual_event = await dataflow_job_messages_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages" + ) + async def test_run_loop_is_still_running_if_fail_on_terminal_state( + self, + mock_list_job_messages, + mock_job_status, + dataflow_job_messages_trigger, + caplog, + ): + """Test that DataflowJobMessagesTrigger is still in loop if the job status is RUNNING.""" + dataflow_job_messages_trigger.fail_on_terminal_state = True + mock_job_status.return_value = JobState.JOB_STATE_RUNNING + mock_list_job_messages.return_value = [] + caplog.set_level(logging.INFO) + task = asyncio.create_task(dataflow_job_messages_trigger.run().__anext__()) + await asyncio.sleep(0.5) + assert task.done() is False + # cancel the task to suppress test warnings + task.cancel() + + @pytest.mark.asyncio + 
@mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + @mock.patch( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger.list_job_messages" + ) + async def test_run_yields_job_messages( + self, mock_list_job_messages, mock_job_status, dataflow_job_messages_trigger + ): + mock_job_status.return_value = JobState.JOB_STATE_DONE + test_job_messages = [ + { + "id": "1707695235850", + "time": "2024-02-06T23:47:15.850Z", + "message_text": "msg.", + "message_importance": 5, + }, + { + "id": "1707695635401", + "time": "2024-02-06T23:53:55.401Z", + "message_text": "msg.", + "message_importance": 5, + }, + ] + mock_list_job_messages.return_value = test_job_messages + expected_event = TriggerEvent( + { + "status": "success", + "message": f"Detected 2 job messages for job '{JOB_ID}'", + "result": test_job_messages, + } + ) + actual_event = await dataflow_job_messages_trigger.run().asend(None) + assert actual_event == expected_event + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook.get_job_status") + async def test_run_raises_exception(self, mock_job_status, dataflow_job_messages_trigger): + """ + Tests the DataflowJobMessagesTrigger does fire if there is an exception. 
+ """ + mock_job_status.side_effect = mock.AsyncMock(side_effect=Exception("Test exception")) + expected_event = TriggerEvent({"status": "error", "message": "Test exception", "result": None}) + actual_event = await dataflow_job_messages_trigger.run().asend(None) + assert expected_event == actual_event + + +class TestDataflowJobMetricsTrigger: + """Test case for DataflowJobMetricsTrigger""" + + def test_serialize(self, dataflow_job_metrics_trigger): + expected_data = ( + "airflow.providers.google.cloud.triggers.dataflow.DataflowJobMetricsTrigger", + { + "project_id": PROJECT_ID, + "job_id": JOB_ID, + "location": LOCATION, + "gcp_conn_id": GCP_CONN_ID, + "poll_sleep": POLL_SLEEP, + "impersonation_chain": IMPERSONATION_CHAIN, + "fail_on_terminal_state": False, + }, + ) + actual_data = dataflow_job_metrics_trigger.serialize() + assert actual_data == expected_data + + @pytest.mark.parametrize( + "attr, expected", + [ + ("gcp_conn_id", GCP_CONN_ID), + ("poll_sleep", POLL_SLEEP), + ("impersonation_chain", IMPERSONATION_CHAIN), + ], + ) + def test_async_hook(self, dataflow_job_metrics_trigger, attr, expected): + hook = dataflow_job_metrics_trigger.async_hook + actual = hook._hook_kwargs.get(attr) + assert actual is not None Review Comment: Same question as on `TestDataflowJobAutoScalingEventTrigger.test_async_hook`: wondering if this `assert actual is not None` line is required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org