This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch backport-6da77b1-v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit eb446bb1cbb58b0c3ebfedb8d7e03ef6f18d5320 Author: Karen Braganza <[email protected]> AuthorDate: Tue Aug 12 06:00:35 2025 -0400 [v3-0-test] Allow failure callbacks for stuck in queued TIs that fail (#53435) Issue #51301 reported that failure callbacks do not run in Airflow 2.10.5 for task instances that get stuck in queued and are then failed. This happens because of the changes introduced in PR #43520. That PR introduced logic to requeue tasks that get stuck in queued (up to two times by default) before failing them. Previously, the executor's fail method was called when the task needed to be failed after the maximum number of requeue attempts. That PR replaced this call with the task instance's set_state method: ti.set_state(TaskInstanceState.FAILED, session=session). Without the executor's fail method being called, failure callbacks will not be executed for such task instances. Therefore, I changed the code to call the executor's fail method instead in Airflow 3. (cherry picked from commit 6da77b1fdfc0b51762b47638489e752384911758) Co-authored-by: Karen Braganza <[email protected]> --- .../src/airflow/jobs/scheduler_job_runner.py | 38 ++++++++++++++++++++-- airflow-core/tests/unit/jobs/test_scheduler_job.py | 18 +++++++--- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c46f99d703d..4564b83912d 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1997,6 +1997,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): self._maybe_requeue_stuck_ti( ti=ti, session=session, + executor=executor, ) session.commit() except NotImplementedError: @@ -2012,7 +2013,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ) ) - def _maybe_requeue_stuck_ti(self, *, ti, session): + def _maybe_requeue_stuck_ti(self, *, ti, session, executor): """ Requeue task if it has not 
been attempted too many times. @@ -2037,14 +2038,45 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): "Task requeue attempts exceeded max; marking failed. task_instance=%s", ti, ) + msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed." session.add( Log( event="stuck in queued tries exceeded", task_instance=ti.key, - extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", + extra=msg, ) ) - ti.set_state(TaskInstanceState.FAILED, session=session) + + try: + dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session) + task = dag.get_task(ti.task_id) + except Exception: + self.log.warning( + "The DAG or task could not be found. If a failure callback exists, it will not be run.", + exc_info=True, + ) + else: + if task.on_failure_callback: + if inspect(ti).detached: + ti = session.merge(ti) + request = TaskCallbackRequest( + filepath=ti.dag_model.relative_fileloc, + bundle_name=ti.dag_version.bundle_name, + bundle_version=ti.dag_version.bundle_version, + ti=ti, + msg=msg, + context_from_server=TIRunContext( + dag_run=ti.dag_run, + max_tries=ti.max_tries, + variables=[], + connections=[], + xcom_keys_to_clear=[], + ), + ) + executor.send_callback(request) + finally: + ti.set_state(TaskInstanceState.FAILED, session=session) + executor.fail(ti.key) def _reschedule_stuck_task(self, ti: TaskInstance, session: Session): session.execute( diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index c451f9d0a91..058504880af 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -2003,11 +2003,15 @@ class TestSchedulerJob: # Second executor called for ti3 mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3]) + @staticmethod + def mock_failure_callback(context): + pass + @conf_vars({("scheduler", 
"num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors): """Verify that tasks stuck in queued will be rescheduled up to N times.""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2073,16 +2077,19 @@ class TestSchedulerJob: "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback states = [x.state for x in dr.get_task_instances(session=session)] assert states == ["failed", "failed"] + mock_executors[0].fail.assert_called() @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors): """Reschedule sensors go in and out of running repeatedly using the same try_number Make sure that they get three attempts per reschedule, not 3 attempts per try_number""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2172,9 +2179,12 @@ class TestSchedulerJob: "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback states = [x.state for x in dr.get_task_instances(session=session)] assert states == ["failed", "failed"] + mock_executors[0].fail.assert_called() def 
test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog): """Test that if executor no implement revoke_task then we don't blow up."""
