1fanwang commented on code in PR #66781:
URL: https://github.com/apache/airflow/pull/66781#discussion_r3232164383


##########
airflow-core/newsfragments/66781.bugfix.rst:
##########
@@ -0,0 +1 @@
+Honor ``AirflowFailException`` raised inside ``on_retry_callback`` and mark the task FAILED instead of retrying. Previously the exception was swallowed and the task continued retrying, ignoring the explicit signal to stop.

Review Comment:
   Done in 193d2d6 — renamed to `66781.bugfix.rst`.
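
The behavior described in the newsfragment can be sketched roughly as follows. This is a simplified stand-in, not the code from the PR: `State`, `resolve_retry_state`, and `give_up_on_auth_errors` are hypothetical names, a local `AirflowFailException` class replaces the real import so the snippet runs without Airflow, and the handling of other callback errors (log and keep retrying) is an assumption.

```python
import logging
from enum import Enum

log = logging.getLogger(__name__)


class AirflowFailException(Exception):
    """Local stand-in for the Airflow exception of the same name."""


class State(Enum):
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


def resolve_retry_state(on_retry_callback, context):
    """Run the retry callback and decide the final state (hypothetical helper)."""
    try:
        on_retry_callback(context)
    except AirflowFailException:
        # Explicit signal from the user: stop retrying and fail the task.
        return State.FAILED
    except Exception:
        # Assumption: any other callback error is logged and the retry proceeds.
        log.exception("error calling on_retry_callback")
    return State.UP_FOR_RETRY


def give_up_on_auth_errors(context):
    # Hypothetical user callback: a rejected credential will not fix itself.
    if isinstance(context.get("exception"), PermissionError):
        raise AirflowFailException("credentials rejected; retrying will not help")
```

With this sketch, a `PermissionError` in the context resolves to `State.FAILED`, while any other error leaves the task in `State.UP_FOR_RETRY`.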



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1397,7 +1397,11 @@ def _on_term(signum, frame):
     finally:
         stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
 
-        if msg:
+        # For UP_FOR_RETRY, defer sending the message until after on_retry_callback has run
+        # (finalize() sends it). This lets an AirflowFailException raised inside the callback
+        # promote the state to FAILED instead of letting the supervisor record a retry that
+        # the user explicitly asked to skip. See #60172.

Review Comment:
   Done in 193d2d6 — applied the shorter wording and dropped the issue 
reference.



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1920,15 +1937,43 @@ def finalize(
         except Exception:
             log.exception("error calling listener")
     elif state == TaskInstanceState.UP_FOR_RETRY:
-        _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+        from airflow.sdk.exceptions import AirflowFailException
+
         try:
-            get_listener_manager().hook.on_task_instance_failed(
-                previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
+            _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+        except AirflowFailException as fail_exc:
+            # User explicitly asked to fail without retrying from inside on_retry_callback.
+            # Promote the state to FAILED, replace any pending RetryTask with TaskState(FAILED),
+            # and run the failure-path finalizers. See #60172.
+            log.info("AirflowFailException raised in on_retry_callback; failing task without retry")
+            state = TaskInstanceState.FAILED
+            ti.state = state
+            error = fail_exc
+            context["exception"] = fail_exc
+            ti.end_date = datetime.now(tz=timezone.utc)
+            msg = TaskState(
+                state=TaskInstanceState.FAILED,
+                end_date=ti.end_date,
+                rendered_map_index=ti.rendered_map_index,
             )
-        except Exception:
-            log.exception("error calling listener")
-        if error and task.email_on_retry and task.email:
-            _send_error_email_notification(task, ti, context, error, log)
+            _run_task_state_change_callbacks(task, "on_failure_callback", context, log)
+            try:
+                get_listener_manager().hook.on_task_instance_failed(
+                    previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
+                )
+            except Exception:
+                log.exception("error calling listener")
+            if task.email_on_failure and task.email:
+                _send_error_email_notification(task, ti, context, error, log)
+        else:
+            try:
+                get_listener_manager().hook.on_task_instance_failed(
+                    previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
+                )
+            except Exception:
+                log.exception("error calling listener")
+            if error and task.email_on_retry and task.email:
+                _send_error_email_notification(task, ti, context, error, log)

Review Comment:
   Done in 193d2d6 — added `_handle_failure_notifications` and applied it 
across all three failure-path branches in `finalize` (AirflowFailException, 
retry, and FAILED) so the listener+email pattern only lives in one place.
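
For readers without the commit at hand, the consolidation could look something like the sketch below. The real `_handle_failure_notifications` signature is not shown in this thread, so everything here is an assumption: the function name without the leading underscore, the callable parameters `notify_listener` and `send_email`, and the boolean return value are all hypothetical and chosen only to make the listener-then-email pattern visible in one place.

```python
import logging

log = logging.getLogger(__name__)


def handle_failure_notifications(
    notify_listener, send_email, *, error, email_enabled, recipients
):
    """Notify the listener, then email if configured (hypothetical signature).

    Returns True when an email was sent, False otherwise.
    """
    try:
        notify_listener(error)
    except Exception:
        # A failing listener should not block the email notification.
        log.exception("error calling listener")
    if error and email_enabled and recipients:
        send_email(error)
        return True
    return False
```

Each branch would then pass its own flag (for example the retry branch passing the `email_on_retry` setting and the failure branches passing `email_on_failure`), which is what lets the three call sites share one implementation.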



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
