1fanwang commented on code in PR #66781:
URL: https://github.com/apache/airflow/pull/66781#discussion_r3232341972


##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4506,6 +4507,61 @@ class CustomOperator(BaseOperator):
             expected_exception_logs.insert(index, calls)
         assert log.exception.mock_calls == expected_exception_logs
 
+    def test_airflow_fail_exception_in_on_retry_callback_fails_task(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """
+        AirflowFailException raised in on_retry_callback should fail the task 
without retrying.
+
+        Regression test for #60172.

Review Comment:
   Done in f59182e — dropped the docstring; the test name already describes the 
behavior.



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4506,6 +4507,61 @@ class CustomOperator(BaseOperator):
             expected_exception_logs.insert(index, calls)
         assert log.exception.mock_calls == expected_exception_logs
 
+    def test_airflow_fail_exception_in_on_retry_callback_fails_task(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """
+        AirflowFailException raised in on_retry_callback should fail the task 
without retrying.
+
+        Regression test for #60172.
+        """
+
+        def _execute_failure(context):
+            raise RuntimeError("transient")
+
+        retry_callback_calls = []
+        failure_callback_calls = []
+
+        def retry_callback(context):
+            retry_callback_calls.append(context["ti"].state)
+            raise AirflowFailException("give up")
+
+        def failure_callback(context):
+            failure_callback_calls.append(context["ti"].state)
+
+        class CustomOperator(BaseOperator):
+            execute = staticmethod(_execute_failure)
+
+        task = CustomOperator(
+            task_id="task",
+            on_retry_callback=retry_callback,
+            on_failure_callback=failure_callback,
+        )
+        # ``should_retry=True`` puts the task on the retry path; 
AirflowFailException raised in
+        # on_retry_callback must override that decision.
+        runtime_ti = create_runtime_ti(dag_id="dag", task=task, 
should_retry=True)
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+        state, msg, error = run(runtime_ti, context, log)
+        finalize(runtime_ti, state, context, log, error, msg=msg)
+
+        assert runtime_ti.state == TaskInstanceState.FAILED
+        # Both callbacks should have run (retry callback first, then failure 
callback after
+        # AirflowFailException promoted the state to FAILED).
+        assert len(retry_callback_calls) == 1
+        assert len(failure_callback_calls) == 1

Review Comment:
   Done in f59182e — `retry_callback_calls == [UP_FOR_RETRY]` and 
`failure_callback_calls == [FAILED]` pin both the count and the state at 
callback time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to