jedcunningham commented on code in PR #30375: URL: https://github.com/apache/airflow/pull/30375#discussion_r1165739839
##########
airflow/executors/celery_executor.py:
##########
@@ -543,22 +417,28 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         return not_adopted_tis
 
-    def _set_celery_pending_task_timeout(
-        self, key: TaskInstanceKey, timeout_type: _CeleryPendingTaskTimeoutType | None
-    ) -> None:
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
-        Set pending task timeout.
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
 
-        We use the fact that dicts maintain insertion order, and the the timeout for a
-        task is always "now + delta" to maintain the property that oldest item = first to
-        time out.
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
         """
-        self.adopted_task_timeouts.pop(key, None)
-        self.stalled_task_timeouts.pop(key, None)
-        if timeout_type == _CeleryPendingTaskTimeoutType.ADOPTED and self.task_adoption_timeout:
-            self.adopted_task_timeouts[key] = utcnow() + self.task_adoption_timeout
-        elif timeout_type == _CeleryPendingTaskTimeoutType.STALLED and self.stalled_task_timeout:
-            self.stalled_task_timeouts[key] = utcnow() + self.stalled_task_timeout
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            task_instance_key = ti.key

Review Comment:
   Okay, so I had a chat with @ephraimbuddy and I think we should give this a try instead. Let's remove it from running, and add the failure to the event_buffer explicitly. In theory, then we don't have to set the task as failed here. This is essentially what KE is doing indirectly.
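
The suggestion in the review comment can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `SketchExecutor`, `FakeTaskInstance`, `TaskKey`, and the `FAILED` constant are hypothetical stand-ins, and it assumes the executor tracks in-flight tasks in a `running` set and reports state changes through an `event_buffer` dict keyed by task instance key. "KE" presumably refers to the KubernetesExecutor.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import NamedTuple

# Hypothetical stand-in for the executor's "failed" task state.
FAILED = "failed"


class TaskKey(NamedTuple):
    """Hypothetical stand-in for a task instance key."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in exposing only the `key` attribute used below."""

    key: TaskKey


@dataclass
class SketchExecutor:
    """Toy executor holding just the two structures named in the comment."""

    # Assumption: keys of tasks the executor believes are in flight.
    running: set[TaskKey] = field(default_factory=set)
    # Assumption: key -> (state, info), read later by the scheduler.
    event_buffer: dict[TaskKey, tuple[str, object]] = field(default_factory=dict)

    def cleanup_stuck_queued_tasks(self, tis: list[FakeTaskInstance]) -> list[str]:
        readable_tis = []
        for ti in tis:
            readable_tis.append(repr(ti))
            task_instance_key = ti.key
            # Remove the task from running; discard() is a no-op if it is absent.
            self.running.discard(task_instance_key)
            # Record the failure explicitly instead of marking the task
            # instance as failed here; the consumer of the buffer would
            # then decide between retry and failure.
            self.event_buffer[task_instance_key] = (FAILED, None)
        return readable_tis
```

With this shape the method only reports the failure, and the decision between `UP_FOR_RETRY` and `FAILED` would be left to whatever consumes the event buffer.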