This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c7e13061bd Send explicit task logs  when marking tasks stuck in queued 
as failed (#35857)
c7e13061bd is described below

commit c7e13061bd472962ed571cb0d33784900217660a
Author: Pankaj Koti <pankajkoti...@gmail.com>
AuthorDate: Sun Nov 26 14:49:51 2023 +0530

    Send explicit task logs  when marking tasks stuck in queued as failed 
(#35857)
    
    Using the feature built in #32646, when the scheduler marks tasks
    stuck in queued as failed, send such an explicit log indicating
    the action to the task logs so that it helps users identify why
    exactly the task was marked failed in such a case.
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <splendidzig...@gmail.com>
---
 airflow/jobs/scheduler_job_runner.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index b9d18098f6..fdf5f12edc 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1577,15 +1577,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
         ).all()
         try:
-            tis_for_warning_message = 
self.job.executor.cleanup_stuck_queued_tasks(tis=tasks_stuck_in_queued)
-            if tis_for_warning_message:
-                task_instance_str = "\n\t".join(tis_for_warning_message)
-                self.log.warning(
-                    "Marked the following %s task instances stuck in queued as 
failed. "
-                    "If the task instance has available retries, it will be 
retried.\n\t%s",
-                    len(tasks_stuck_in_queued),
-                    task_instance_str,
-                )
+            cleaned_up_task_instances = 
self.job.executor.cleanup_stuck_queued_tasks(
+                tis=tasks_stuck_in_queued
+            )
+            cleaned_up_task_instances = set(cleaned_up_task_instances)
+            for ti in tasks_stuck_in_queued:
+                if repr(ti) in cleaned_up_task_instances:
+                    self._task_context_logger.warning(
+                        "Marking task instance %s stuck in queued as failed. "
+                        "If the task instance has available retries, it will 
be retried.",
+                        ti,
+                        ti=ti,
+                    )
         except NotImplementedError:
             self.log.debug("Executor doesn't support cleanup of stuck queued 
tasks. Skipping.")
             ...

Reply via email to