Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


guan404ming commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2876746363

   Thanks for all reviews and suggestions!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


Lee-W merged PR #50487:
URL: https://github.com/apache/airflow/pull/50487


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2086555433


##
providers/standard/tests/unit/standard/triggers/test_external_task.py:
##
@@ -487,14 +508,29 @@ async def 
test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
 result = trigger_task.result()
 assert isinstance(result, TriggerEvent)
 assert result.payload == {"status": "success"}
-mock_get_count.assert_called_once_with(
-dttm_filter=value,
-external_task_ids=["external_task_op"],
-external_task_group_id=None,
-external_dag_id="external_task",
-states=["success", "fail"],
+
+assert mock_get_count.call_count == 2
+mock_get_count.assert_has_calls(
+[
+mock.call(
+dttm_filter=value,
+external_task_ids=["external_task_op"],
+external_task_group_id=None,
+external_dag_id="external_task",
+states=["success", "fail"],
+),
+mock.call(
+dttm_filter=value,
+external_task_ids=["external_task_op"],
+external_task_group_id=None,
+external_dag_id="external_task",
+states=["success", "fail"],
+),
+]

Review Comment:
   This case is in AF2 thus some field seems not worked here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


guan404ming commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2876045319

   I've fixed it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


eladkal commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2875852199

   static checks fails


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2086245864


##
providers/standard/tests/unit/standard/test_exceptions.py:
##
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations

Review Comment:
   Yes, it would fail. Actually, I forget to add it at first thus I know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-13 Thread via GitHub


Lee-W commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2086218686


##
providers/standard/tests/unit/standard/test_exceptions.py:
##
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations

Review Comment:
   I don't mind having this module. I'm just curious if it's failing anything 
in the CI if we don't have it.



##
providers/standard/tests/unit/standard/triggers/test_external_task.py:
##
@@ -140,13 +141,28 @@ async def 
test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
 result = trigger_task.result()
 assert isinstance(result, TriggerEvent)
 assert result.payload == {"status": "success"}
-mock_get_count.assert_called_once_with(
-dag_id="external_task",
-task_ids=["external_task_op"],
-logical_dates=[self.LOGICAL_DATE],
-run_ids=[self.RUN_ID],
-states=["success", "fail"],
+
+# Verify both calls were made
+assert mock_get_count.call_count == 2
+mock_get_count.assert_has_calls(
+[
+mock.call(
+dag_id="external_task",
+task_ids=["external_task_op"],
+logical_dates=[self.LOGICAL_DATE],
+run_ids=[self.RUN_ID],
+states=["success", "fail"],
+),
+mock.call(
+dag_id="external_task",
+task_ids=["external_task_op"],
+logical_dates=[self.LOGICAL_DATE],
+run_ids=[self.RUN_ID],
+states=["success", "fail"],
+),
+]

Review Comment:
   nit
   
   ```suggestion
   [
   mock.call(
   dag_id="external_task",
   task_ids=["external_task_op"],
   logical_dates=[self.LOGICAL_DATE],
   run_ids=[self.RUN_ID],
   states=["success", "fail"],
   ),
   ] * 2
   ```



##
providers/standard/tests/unit/standard/triggers/test_external_task.py:
##
@@ -487,14 +508,29 @@ async def 
test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
 result = trigger_task.result()
 assert isinstance(result, TriggerEvent)
 assert result.payload == {"status": "success"}
-mock_get_count.assert_called_once_with(
-dttm_filter=value,
-external_task_ids=["external_task_op"],
-external_task_group_id=None,
-external_dag_id="external_task",
-states=["success", "fail"],
+
+assert mock_get_count.call_count == 2
+mock_get_count.assert_has_calls(
+[
+mock.call(
+dttm_filter=value,
+external_task_ids=["external_task_op"],
+external_task_group_id=None,
+external_dag_id="external_task",
+states=["success", "fail"],
+),
+mock.call(
+dttm_filter=value,
+external_task_ids=["external_task_op"],
+external_task_group_id=None,
+external_dag_id="external_task",
+states=["success", "fail"],
+),
+]

Review Comment:
   nit
   
   ```suggestion
   [
   mock.call(
   dag_id="external_task",
   task_ids=["external_task_op"],
   logical_dates=[self.LOGICAL_DATE],
   run_ids=[self.RUN_ID],
   states=["success", "fail"],
   ),
   ] * 2
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2875228169

   CI failure seems fixed in #50521, reopen it for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2873512212

   Mark it draft for fixing failing ci


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2085047153


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None:
 dag_to_wait = DagModel.get_current(self.external_dag_id, session)
 
 if not dag_to_wait:
-raise AirflowException(f"The external DAG {self.external_dag_id} 
does not exist.")
+raise AirflowExternalTaskSensorException(
+f"The external DAG {self.external_dag_id} does not exist."
+)
 
 if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
-raise AirflowException(f"The external DAG {self.external_dag_id} 
was deleted.")
+raise AirflowExternalTaskSensorException(f"The external DAG 
{self.external_dag_id} was deleted.")
 
 if self.external_task_ids:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 for external_task_id in self.external_task_ids:
 if not refreshed_dag_info.has_task(external_task_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(
 f"The external task {external_task_id} in DAG 
{self.external_dag_id} does not exist."
 )
 
 if self.external_task_group_id:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 if not 
refreshed_dag_info.has_task_group(self.external_task_group_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(

Review Comment:
   I've created multiple exceptions for different cases and also update the 
test cases. Thanks for the suggestions!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2085047153


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None:
 dag_to_wait = DagModel.get_current(self.external_dag_id, session)
 
 if not dag_to_wait:
-raise AirflowException(f"The external DAG {self.external_dag_id} 
does not exist.")
+raise AirflowExternalTaskSensorException(
+f"The external DAG {self.external_dag_id} does not exist."
+)
 
 if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
-raise AirflowException(f"The external DAG {self.external_dag_id} 
was deleted.")
+raise AirflowExternalTaskSensorException(f"The external DAG 
{self.external_dag_id} was deleted.")
 
 if self.external_task_ids:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 for external_task_id in self.external_task_ids:
 if not refreshed_dag_info.has_task(external_task_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(
 f"The external task {external_task_id} in DAG 
{self.external_dag_id} does not exist."
 )
 
 if self.external_task_group_id:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 if not 
refreshed_dag_info.has_task_group(self.external_task_group_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(

Review Comment:
   I've created multiple exceptions for different cases and also update the 
test cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


Lee-W commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084918140


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None:
 dag_to_wait = DagModel.get_current(self.external_dag_id, session)
 
 if not dag_to_wait:
-raise AirflowException(f"The external DAG {self.external_dag_id} 
does not exist.")
+raise AirflowExternalTaskSensorException(
+f"The external DAG {self.external_dag_id} does not exist."
+)
 
 if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
-raise AirflowException(f"The external DAG {self.external_dag_id} 
was deleted.")
+raise AirflowExternalTaskSensorException(f"The external DAG 
{self.external_dag_id} was deleted.")
 
 if self.external_task_ids:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 for external_task_id in self.external_task_ids:
 if not refreshed_dag_info.has_task(external_task_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(
 f"The external task {external_task_id} in DAG 
{self.external_dag_id} does not exist."
 )
 
 if self.external_task_group_id:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 if not 
refreshed_dag_info.has_task_group(self.external_task_group_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(

Review Comment:
   The exception created here is not a description and does not help much. 
Maybe something like `ExternalDagNotExistsError` would be better



##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -455,23 +461,25 @@ def _check_for_existence(self, session) -> None:
 dag_to_wait = DagModel.get_current(self.external_dag_id, session)
 
 if not dag_to_wait:
-raise AirflowException(f"The external DAG {self.external_dag_id} 
does not exist.")
+raise AirflowExternalTaskSensorException(
+f"The external DAG {self.external_dag_id} does not exist."
+)
 
 if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
-raise AirflowException(f"The external DAG {self.external_dag_id} 
was deleted.")
+raise AirflowExternalTaskSensorException(f"The external DAG 
{self.external_dag_id} was deleted.")
 
 if self.external_task_ids:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 for external_task_id in self.external_task_ids:
 if not refreshed_dag_info.has_task(external_task_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(
 f"The external task {external_task_id} in DAG 
{self.external_dag_id} does not exist."
 )
 
 if self.external_task_group_id:
 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
 if not 
refreshed_dag_info.has_task_group(self.external_task_group_id):
-raise AirflowException(
+raise AirflowExternalTaskSensorException(

Review Comment:
   and we can create multiple exceptions for different kinds of errors



##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -24,9 +23,12 @@
 from typing import TYPE_CHECKING, Any, Callable, ClassVar
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import (
+AirflowSkipException,
+)

Review Comment:
   ```suggestion
   from airflow.exceptions import AirflowSkipException
   ```
   
   nit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2872964410

   Pushed for adding test for exception.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084855929


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -443,6 +443,10 @@ def execute_complete(self, context, event=None):
 self.log.info("External tasks %s has executed successfully.", 
self.external_task_ids)
 elif event["status"] == "skipped":
 raise AirflowSkipException("External job has skipped skipping.")
+elif event["status"] == "failed":
+if self.soft_fail:
+raise AirflowSkipException("External job has failed skipping.")
+raise AirflowException("External job has failed.")

Review Comment:
   Updated. Hope everything good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084841870


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -443,6 +443,10 @@ def execute_complete(self, context, event=None):
 self.log.info("External tasks %s has executed successfully.", 
self.external_task_ids)
 elif event["status"] == "skipped":
 raise AirflowSkipException("External job has skipped skipping.")
+elif event["status"] == "failed":
+if self.soft_fail:
+raise AirflowSkipException("External job has failed skipping.")
+raise AirflowException("External job has failed.")

Review Comment:
   let me fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


eladkal commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084837866


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -443,6 +443,10 @@ def execute_complete(self, context, event=None):
 self.log.info("External tasks %s has executed successfully.", 
self.external_task_ids)
 elif event["status"] == "skipped":
 raise AirflowSkipException("External job has skipped skipping.")
+elif event["status"] == "failed":
+if self.soft_fail:
+raise AirflowSkipException("External job has failed skipping.")
+raise AirflowException("External job has failed.")

Review Comment:
   If we do please set it with exceptions.py in the provider itself (not in 
airflow-core)
   example: 
   
https://github.com/apache/airflow/blob/1cde11a447e60d0738b0c317c3d3e8265360014f/providers/databricks/src/airflow/providers/databricks/exceptions.py#L27-L28



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


guan404ming commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084835463


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -443,6 +443,10 @@ def execute_complete(self, context, event=None):
 self.log.info("External tasks %s has executed successfully.", 
self.external_task_ids)
 elif event["status"] == "skipped":
 raise AirflowSkipException("External job has skipped skipping.")
+elif event["status"] == "failed":
+if self.soft_fail:
+raise AirflowSkipException("External job has failed skipping.")
+raise AirflowException("External job has failed.")

Review Comment:
   Sure, I've updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


Lee-W commented on PR #50487:
URL: https://github.com/apache/airflow/pull/50487#issuecomment-2872672617

   Looks like we might need to fix some compat issue 🤔 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update `WorkflowTrigger` to forward failed_stat [airflow]

2025-05-12 Thread via GitHub


Lee-W commented on code in PR #50487:
URL: https://github.com/apache/airflow/pull/50487#discussion_r2084726523


##
providers/standard/src/airflow/providers/standard/sensors/external_task.py:
##
@@ -443,6 +443,10 @@ def execute_complete(self, context, event=None):
 self.log.info("External tasks %s has executed successfully.", 
self.external_task_ids)
 elif event["status"] == "skipped":
 raise AirflowSkipException("External job has skipped skipping.")
+elif event["status"] == "failed":
+if self.soft_fail:
+raise AirflowSkipException("External job has failed skipping.")
+raise AirflowException("External job has failed.")

Review Comment:
   It would be better if we could add a new exception instead of using 
`AirflowExceptiuon`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org