jedcunningham commented on code in PR #30375:
URL: https://github.com/apache/airflow/pull/30375#discussion_r1165739839


##########
airflow/executors/celery_executor.py:
##########
@@ -543,22 +417,28 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
         return not_adopted_tis
 
-    def _set_celery_pending_task_timeout(
-        self, key: TaskInstanceKey, timeout_type: 
_CeleryPendingTaskTimeoutType | None
-    ) -> None:
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
-        Set pending task timeout.
+        Handle remnants of tasks that were failed because they were stuck in 
queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be 
marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked 
as `FAILED`
+        if it doesn't.
 
-        We use the fact that dicts maintain insertion order, and the the 
timeout for a
-        task is always "now + delta" to maintain the property that oldest item 
= first to
-        time out.
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
         """
-        self.adopted_task_timeouts.pop(key, None)
-        self.stalled_task_timeouts.pop(key, None)
-        if timeout_type == _CeleryPendingTaskTimeoutType.ADOPTED and 
self.task_adoption_timeout:
-            self.adopted_task_timeouts[key] = utcnow() + 
self.task_adoption_timeout
-        elif timeout_type == _CeleryPendingTaskTimeoutType.STALLED and 
self.stalled_task_timeout:
-            self.stalled_task_timeouts[key] = utcnow() + 
self.stalled_task_timeout
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            task_instance_key = ti.key

Review Comment:
   Okay, so I had a chat with @ephraimbuddy, and I think we should try a different approach here: remove the task from `running` and explicitly add the failure to the `event_buffer`. In theory, we then don't need to set the task as failed here.
   
   This is essentially what the KubernetesExecutor (KE) already does, indirectly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to