SameerMesiah97 commented on code in PR #66781:
URL: https://github.com/apache/airflow/pull/66781#discussion_r3229986049
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1397,7 +1397,11 @@ def _on_term(signum, frame):
finally:
stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
- if msg:
+ # For UP_FOR_RETRY, defer sending the message until after
on_retry_callback has run
+ # (finalize() sends it). This lets an AirflowFailException raised
inside the callback
+ # promote the state to FAILED instead of letting the supervisor record
a retry that
+ # the user explicitly asked to skip. See #60172.
Review Comment:
nit: this could be clearer:
```
# Delay reporting UP_FOR_RETRY to the supervisor until after
# on_retry_callback runs so AirflowFailException can promote
# the task to FAILED and suppress the retry.
```
I don't think the issue reference is needed, but that is a more subjective
point.
##########
airflow-core/newsfragments/60172.bugfix.rst:
##########
@@ -0,0 +1 @@
+Honor ``AirflowFailException`` raised inside ``on_retry_callback`` and mark
the task FAILED instead of retrying. Previously the exception was swallowed and
the task continued retrying, ignoring the explicit signal to stop.
Review Comment:
The newsfragment filename should be the PR number followed by `.bugfix.rst`,
i.e. `66781.bugfix.rst`.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4506,6 +4507,61 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+ def test_airflow_fail_exception_in_on_retry_callback_fails_task(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """
+ AirflowFailException raised in on_retry_callback should fail the task
without retrying.
+
+ Regression test for #60172.
+ """
+
+ def _execute_failure(context):
+ raise RuntimeError("transient")
+
+ retry_callback_calls = []
+ failure_callback_calls = []
+
+ def retry_callback(context):
+ retry_callback_calls.append(context["ti"].state)
+ raise AirflowFailException("give up")
+
+ def failure_callback(context):
+ failure_callback_calls.append(context["ti"].state)
+
+ class CustomOperator(BaseOperator):
+ execute = staticmethod(_execute_failure)
+
+ task = CustomOperator(
+ task_id="task",
+ on_retry_callback=retry_callback,
+ on_failure_callback=failure_callback,
+ )
+ # ``should_retry=True`` puts the task on the retry path;
AirflowFailException raised in
+ # on_retry_callback must override that decision.
+ runtime_ti = create_runtime_ti(dag_id="dag", task=task,
should_retry=True)
+ log = mock.MagicMock()
+ context = runtime_ti.get_template_context()
+ state, msg, error = run(runtime_ti, context, log)
+ finalize(runtime_ti, state, context, log, error, msg=msg)
+
+ assert runtime_ti.state == TaskInstanceState.FAILED
+ # Both callbacks should have run (retry callback first, then failure
callback after
+ # AirflowFailException promoted the state to FAILED).
+ assert len(retry_callback_calls) == 1
+ assert len(failure_callback_calls) == 1
Review Comment:
Maybe it would be better to assert the state transition here instead of the
number of calls:
```
assert retry_callback_calls == [TaskInstanceState.UP_FOR_RETRY]
assert failure_callback_calls == [TaskInstanceState.FAILED]
```
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4506,6 +4507,61 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+ def test_airflow_fail_exception_in_on_retry_callback_fails_task(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """
+ AirflowFailException raised in on_retry_callback should fail the task
without retrying.
+
+ Regression test for #60172.
Review Comment:
Remove this issue reference (the `Regression test for #60172.` line).
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1920,15 +1937,43 @@ def finalize(
except Exception:
log.exception("error calling listener")
elif state == TaskInstanceState.UP_FOR_RETRY:
- _run_task_state_change_callbacks(task, "on_retry_callback", context,
log)
+ from airflow.sdk.exceptions import AirflowFailException
+
try:
- get_listener_manager().hook.on_task_instance_failed(
- previous_state=TaskInstanceState.RUNNING, task_instance=ti,
error=error
+ _run_task_state_change_callbacks(task, "on_retry_callback",
context, log)
+ except AirflowFailException as fail_exc:
+ # User explicitly asked to fail without retrying from inside
on_retry_callback.
+ # Promote the state to FAILED, replace any pending RetryTask with
TaskState(FAILED),
+ # and run the failure-path finalizers. See #60172.
+ log.info("AirflowFailException raised in on_retry_callback;
failing task without retry")
+ state = TaskInstanceState.FAILED
+ ti.state = state
+ error = fail_exc
+ context["exception"] = fail_exc
+ ti.end_date = datetime.now(tz=timezone.utc)
+ msg = TaskState(
+ state=TaskInstanceState.FAILED,
+ end_date=ti.end_date,
+ rendered_map_index=ti.rendered_map_index,
)
- except Exception:
- log.exception("error calling listener")
- if error and task.email_on_retry and task.email:
- _send_error_email_notification(task, ti, context, error, log)
+ _run_task_state_change_callbacks(task, "on_failure_callback",
context, log)
+ try:
+ get_listener_manager().hook.on_task_instance_failed(
+ previous_state=TaskInstanceState.RUNNING,
task_instance=ti, error=error
+ )
+ except Exception:
+ log.exception("error calling listener")
+ if task.email_on_failure and task.email:
+ _send_error_email_notification(task, ti, context, error, log)
+ else:
+ try:
+ get_listener_manager().hook.on_task_instance_failed(
+ previous_state=TaskInstanceState.RUNNING,
task_instance=ti, error=error
+ )
+ except Exception:
+ log.exception("error calling listener")
+ if error and task.email_on_retry and task.email:
+ _send_error_email_notification(task, ti, context, error, log)
Review Comment:
Right now, this is a bit hard to follow. I would recommend extracting the
listener and email notification into a helper like this:
```
def _handle_failure_notifications(
    *,
    task,
    ti,
    context,
    error,
    log,
    send_email: bool,
) -> None:
    try:
        get_listener_manager().hook.on_task_instance_failed(
            previous_state=TaskInstanceState.RUNNING,
            task_instance=ti,
            error=error,
        )
    except Exception:
        log.exception("error calling listener")
    if send_email and task.email:
        _send_error_email_notification(task, ti, context, error, log)
Then plug `_handle_failure_notifications` in so that lines 1960-1976 are
replaced by this:
```
_handle_failure_notifications(
task=task,
ti=ti,
context=context,
error=error,
log=log,
send_email=task.email_on_failure,
)
else:
_handle_failure_notifications(
task=task,
ti=ti,
context=context,
error=error,
log=log,
send_email=bool(error and task.email_on_retry),
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]