1fanwang commented on code in PR #66781:
URL: https://github.com/apache/airflow/pull/66781#discussion_r3232164383
##########
airflow-core/newsfragments/66781.bugfix.rst:
##########
@@ -0,0 +1 @@
+Honor ``AirflowFailException`` raised inside ``on_retry_callback`` and mark the task FAILED instead of retrying. Previously the exception was swallowed and the task continued retrying, ignoring the explicit signal to stop.
Review Comment:
Done in 193d2d6 — renamed to `66781.bugfix.rst`.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1397,7 +1397,11 @@ def _on_term(signum, frame):
finally:
stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
- if msg:
+ # For UP_FOR_RETRY, defer sending the message until after
on_retry_callback has run
+ # (finalize() sends it). This lets an AirflowFailException raised
inside the callback
+ # promote the state to FAILED instead of letting the supervisor record
a retry that
+ # the user explicitly asked to skip. See #60172.
Review Comment:
Done in 193d2d6 — applied the shorter wording and dropped the issue
reference.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1920,15 +1937,43 @@ def finalize(
except Exception:
log.exception("error calling listener")
elif state == TaskInstanceState.UP_FOR_RETRY:
- _run_task_state_change_callbacks(task, "on_retry_callback", context,
log)
+ from airflow.sdk.exceptions import AirflowFailException
+
try:
- get_listener_manager().hook.on_task_instance_failed(
- previous_state=TaskInstanceState.RUNNING, task_instance=ti,
error=error
+ _run_task_state_change_callbacks(task, "on_retry_callback",
context, log)
+ except AirflowFailException as fail_exc:
+ # User explicitly asked to fail without retrying from inside
on_retry_callback.
+ # Promote the state to FAILED, replace any pending RetryTask with
TaskState(FAILED),
+ # and run the failure-path finalizers. See #60172.
+ log.info("AirflowFailException raised in on_retry_callback;
failing task without retry")
+ state = TaskInstanceState.FAILED
+ ti.state = state
+ error = fail_exc
+ context["exception"] = fail_exc
+ ti.end_date = datetime.now(tz=timezone.utc)
+ msg = TaskState(
+ state=TaskInstanceState.FAILED,
+ end_date=ti.end_date,
+ rendered_map_index=ti.rendered_map_index,
)
- except Exception:
- log.exception("error calling listener")
- if error and task.email_on_retry and task.email:
- _send_error_email_notification(task, ti, context, error, log)
+ _run_task_state_change_callbacks(task, "on_failure_callback",
context, log)
+ try:
+ get_listener_manager().hook.on_task_instance_failed(
+ previous_state=TaskInstanceState.RUNNING,
task_instance=ti, error=error
+ )
+ except Exception:
+ log.exception("error calling listener")
+ if task.email_on_failure and task.email:
+ _send_error_email_notification(task, ti, context, error, log)
+ else:
+ try:
+ get_listener_manager().hook.on_task_instance_failed(
+ previous_state=TaskInstanceState.RUNNING,
task_instance=ti, error=error
+ )
+ except Exception:
+ log.exception("error calling listener")
+ if error and task.email_on_retry and task.email:
+ _send_error_email_notification(task, ti, context, error, log)
Review Comment:
Done in 193d2d6 — added `_handle_failure_notifications` and applied it
across all three failure-path branches in `finalize` (AirflowFailException,
retry, and FAILED), so the listener-and-email notification logic now lives
in only one place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]