Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2876746363 Thanks for all reviews and suggestions! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
Lee-W merged PR #50487: URL: https://github.com/apache/airflow/pull/50487 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2086555433 ## providers/standard/tests/unit/standard/triggers/test_external_task.py: ## @@ -487,14 +508,29 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count): result = trigger_task.result() assert isinstance(result, TriggerEvent) assert result.payload == {"status": "success"} -mock_get_count.assert_called_once_with( -dttm_filter=value, -external_task_ids=["external_task_op"], -external_task_group_id=None, -external_dag_id="external_task", -states=["success", "fail"], + +assert mock_get_count.call_count == 2 +mock_get_count.assert_has_calls( +[ +mock.call( +dttm_filter=value, +external_task_ids=["external_task_op"], +external_task_group_id=None, +external_dag_id="external_task", +states=["success", "fail"], +), +mock.call( +dttm_filter=value, +external_task_ids=["external_task_op"], +external_task_group_id=None, +external_dag_id="external_task", +states=["success", "fail"], +), +] Review Comment: This case is in AF2 thus some field seems not worked here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2876045319 I've fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
eladkal commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2875852199 static checks fails -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2086245864 ## providers/standard/tests/unit/standard/test_exceptions.py: ## @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations Review Comment: Yes, it would fail. Actually, I forget to add it at first thus I know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
Lee-W commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2086218686 ## providers/standard/tests/unit/standard/test_exceptions.py: ## @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations Review Comment: I don't mind having this module. I'm just curious if it's failing anything in the CI if we don't have it. ## providers/standard/tests/unit/standard/triggers/test_external_task.py: ## @@ -140,13 +141,28 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count): result = trigger_task.result() assert isinstance(result, TriggerEvent) assert result.payload == {"status": "success"} -mock_get_count.assert_called_once_with( -dag_id="external_task", -task_ids=["external_task_op"], -logical_dates=[self.LOGICAL_DATE], -run_ids=[self.RUN_ID], -states=["success", "fail"], + +# Verify both calls were made +assert mock_get_count.call_count == 2 +mock_get_count.assert_has_calls( +[ +mock.call( +dag_id="external_task", +task_ids=["external_task_op"], +logical_dates=[self.LOGICAL_DATE], +run_ids=[self.RUN_ID], +states=["success", "fail"], +), +mock.call( +dag_id="external_task", +task_ids=["external_task_op"], +logical_dates=[self.LOGICAL_DATE], +run_ids=[self.RUN_ID], +states=["success", "fail"], +), +] Review Comment: nit ```suggestion [ mock.call( dag_id="external_task", task_ids=["external_task_op"], logical_dates=[self.LOGICAL_DATE], run_ids=[self.RUN_ID], states=["success", "fail"], ), ] * 2 ``` ## providers/standard/tests/unit/standard/triggers/test_external_task.py: ## @@ -487,14 +508,29 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count): result = trigger_task.result() assert isinstance(result, TriggerEvent) assert result.payload == {"status": "success"} -mock_get_count.assert_called_once_with( -dttm_filter=value, -external_task_ids=["external_task_op"], -external_task_group_id=None, -external_dag_id="external_task", -states=["success", "fail"], + +assert mock_get_count.call_count == 2 +mock_get_count.assert_has_calls( +[ +mock.call( +dttm_filter=value, +external_task_ids=["external_task_op"], +external_task_group_id=None, +external_dag_id="external_task", +states=["success", "fail"], +), +mock.call( +dttm_filter=value, +external_task_ids=["external_task_op"], +external_task_group_id=None, +external_dag_id="external_task", +states=["success", "fail"], +), +] Review Comment: nit ```suggestion [ mock.call( dag_id="external_task", task_ids=["external_task_op"], logical_dates=[self.LOGICAL_DATE], run_ids=[self.RUN_ID], states=["success", "fail"], ), ] * 2 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2875228169 CI failure seems fixed in #50521, reopen it for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2873512212 Mark it draft for fixing failing ci -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2085047153 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: -raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") +raise AirflowExternalTaskSensorException( +f"The external DAG {self.external_dag_id} does not exist." +) if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): -raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") +raise AirflowExternalTaskSensorException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( f"The external task {external_task_id} in DAG {self.external_dag_id} does not exist." ) if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( Review Comment: I've created multiple exceptions for different cases and also update the test cases. Thanks for the suggestions! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2085047153 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: -raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") +raise AirflowExternalTaskSensorException( +f"The external DAG {self.external_dag_id} does not exist." +) if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): -raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") +raise AirflowExternalTaskSensorException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( f"The external task {external_task_id} in DAG {self.external_dag_id} does not exist." ) if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( Review Comment: I've created multiple exceptions for different cases and also update the test cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
Lee-W commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084918140 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: -raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") +raise AirflowExternalTaskSensorException( +f"The external DAG {self.external_dag_id} does not exist." +) if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): -raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") +raise AirflowExternalTaskSensorException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( f"The external task {external_task_id} in DAG {self.external_dag_id} does not exist." ) if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( Review Comment: The exception created here is not a description and does not help much. Maybe something like `ExternalDagNotExistsError` would be better ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: -raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") +raise AirflowExternalTaskSensorException( +f"The external DAG {self.external_dag_id} does not exist." +) if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): -raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") +raise AirflowExternalTaskSensorException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( f"The external task {external_task_id} in DAG {self.external_dag_id} does not exist." ) if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): -raise AirflowException( +raise AirflowExternalTaskSensorException( Review Comment: and we can create multiple exceptions for different kinds of errors ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -24,9 +23,12 @@ from typing import TYPE_CHECKING, Any, Callable, ClassVar from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import ( +AirflowSkipException, +) Review Comment: ```suggestion from airflow.exceptions import AirflowSkipException ``` nit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2872964410 Pushed for adding test for exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084855929 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -443,6 +443,10 @@ def execute_complete(self, context, event=None): self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": raise AirflowSkipException("External job has skipped skipping.") +elif event["status"] == "failed": +if self.soft_fail: +raise AirflowSkipException("External job has failed skipping.") +raise AirflowException("External job has failed.") Review Comment: Updated. Hope everything good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084841870 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -443,6 +443,10 @@ def execute_complete(self, context, event=None): self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": raise AirflowSkipException("External job has skipped skipping.") +elif event["status"] == "failed": +if self.soft_fail: +raise AirflowSkipException("External job has failed skipping.") +raise AirflowException("External job has failed.") Review Comment: let me fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
eladkal commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084837866 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -443,6 +443,10 @@ def execute_complete(self, context, event=None): self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": raise AirflowSkipException("External job has skipped skipping.") +elif event["status"] == "failed": +if self.soft_fail: +raise AirflowSkipException("External job has failed skipping.") +raise AirflowException("External job has failed.") Review Comment: If we do please set it with exceptions.py in the provider itself (not in airflow-core) example: https://github.com/apache/airflow/blob/1cde11a447e60d0738b0c317c3d3e8265360014f/providers/databricks/src/airflow/providers/databricks/exceptions.py#L27-L28 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
guan404ming commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084835463 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -443,6 +443,10 @@ def execute_complete(self, context, event=None): self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": raise AirflowSkipException("External job has skipped skipping.") +elif event["status"] == "failed": +if self.soft_fail: +raise AirflowSkipException("External job has failed skipping.") +raise AirflowException("External job has failed.") Review Comment: Sure, I've updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
Lee-W commented on PR #50487: URL: https://github.com/apache/airflow/pull/50487#issuecomment-2872672617 Looks like we might need to fix some compat issue 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]
Lee-W commented on code in PR #50487: URL: https://github.com/apache/airflow/pull/50487#discussion_r2084726523 ## providers/standard/src/airflow/providers/standard/sensors/external_task.py: ## @@ -443,6 +443,10 @@ def execute_complete(self, context, event=None): self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": raise AirflowSkipException("External job has skipped skipping.") +elif event["status"] == "failed": +if self.soft_fail: +raise AirflowSkipException("External job has failed skipping.") +raise AirflowException("External job has failed.") Review Comment: It would be better if we could add a new exception instead of using `AirflowExceptiuon`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org