ashb commented on code in PR #23690:
URL: https://github.com/apache/airflow/pull/23690#discussion_r874923950


##########
airflow/executors/celery_executor.py:
##########
@@ -343,20 +346,53 @@ def _check_for_stalled_adopted_tasks(self):
             # already finished, then it will be removed from this list -- so
             # the only time it's still in this list is when it a) never made it
             # to celery in the first place (i.e. race condition somewhere in
-            # the dying executor) or b) a really long celery queue and it just
+            # the dying executor), b) celery lost the task before execution
+            # started, or  c) a really long celery queue and it just
             # hasn't started yet -- better cancel it and let the scheduler
             # re-queue rather than have this task risk stalling for ever
             timedout_keys.append(key)
 
-        if timedout_keys:
-            self.log.error(
-                "Adopted tasks were still pending after %s, assuming they never made it to celery and "
-                "clearing:\n\t%s",
-                self.task_adoption_timeout,
-                "\n\t".join(repr(x) for x in timedout_keys),
+        if not timedout_keys:
+            return
+
+        self.log.error(
+            "Tasks were still pending after configured timeout (adopted: %s, all: %s), "
+            "assuming they never made it to celery and clearing:\n\t%s",
+            self.task_adoption_timeout,
+            self.stalled_task_timeout,
+            "\n\t".join(repr(x) for x in timedout_keys),
+        )
+
+        try:
+            filter_for_tis = TaskInstance.filter_for_tis(timedout_keys)
+            session.query(TaskInstance).filter(
+                filter_for_tis,
+                TaskInstance.state == State.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,

Review Comment:
   Question (one I should be able to answer myself, since I wrote this feature,
   but I've forgotten): when we adopt a task, do we set or overwrite
   `queued_by_job_id`?
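
To make the concern concrete, here is a minimal sketch in plain Python. It does not use Airflow's models or SQLAlchemy: `FakeTI` and `select_stalled` are hypothetical stand-ins that only mimic the three filter conditions in the hunk above, and the job ids are made up. The point is that if adoption leaves `queued_by_job_id` pointing at the dead executor's job, a filter on `self.job_id` would not match the adopted task.

```python
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance row (not Airflow's model)."""
    key: str
    state: str
    queued_by_job_id: int


def select_stalled(tis, timedout_keys, job_id):
    """Apply the same three conditions as the query in the diff, in memory."""
    return [
        ti
        for ti in tis
        if ti.key in timedout_keys
        and ti.state == "queued"
        and ti.queued_by_job_id == job_id
    ]


# Assumption for illustration: job 1 died and job 2 adopted its task
# WITHOUT rewriting queued_by_job_id.
tis = [
    FakeTI("own_task", "queued", queued_by_job_id=2),
    FakeTI("adopted_task", "queued", queued_by_job_id=1),
]

matched = select_stalled(tis, {"own_task", "adopted_task"}, job_id=2)
print([ti.key for ti in matched])  # only the task queued by job 2 matches
```

If adoption does overwrite the column, both tasks match and the filter is fine as written. Whether that happens is exactly what I can't remember.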



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

