jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826054815


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1817,17 +1828,59 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
                         )
                         session.add(
                             Log(
-                                event="stuck in queued",
+                                event=RETRY_STUCK_IN_QUEUED_EVENT,
                                 task_instance=ti.key,
                                 extra=(
-                                    "Task will be marked as failed. If the task instance has "
+                                    f"Task was stuck in queued and will be retried, once it has hit {num_allowed_retries} attempts"

Review Comment:
   ```suggestion
                                       f"Task was stuck in queued and will be requeued, once it has hit {num_allowed_retries} attempts"
   ```
   
   It might be good to avoid "retry" here, to prevent confusion between dag-author-configured retries and these stuck-in-queued requeues.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1805,6 +1815,7 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
             )
         ).all()
 
+        num_allowed_retries = conf.getint("core", "num_stuck_retries", fallback=2)

Review Comment:
   ```suggestion
           num_allowed_retries = conf.getint("core", "num_stuck_retries")
   ```
   
   You need to add this to [config.yml](https://github.com/apache/airflow/blob/main/airflow/config_templates/config.yml) (and then won't need a fallback).
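   For illustration, a config.yml entry for this option might look roughly like the sketch below. The description text, `version_added` value, and default are assumptions inferred from the `conf.getint("core", "num_stuck_retries", fallback=2)` call in the diff, not the final schema:

   ```yaml
   # Hypothetical fragment for airflow/config_templates/config.yml,
   # under the existing "core" section's options. Field values are guesses.
   num_stuck_retries:
     description: |
       Number of times a task stuck in the queued state may be requeued
       before it is failed.
     version_added: ~
     type: integer
     example: ~
     default: "2"
   ```

   With a default declared in config.yml, `conf.getint("core", "num_stuck_retries")` can drop the `fallback` argument, as suggested above.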



##########
uv.lock:
##########


Review Comment:
   I assume you didn't intend to add this :)



##########
providers/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         for ti in tis:
             readable_tis.append(repr(ti))
             task_instance_key = ti.key
-            self.fail(task_instance_key, None)

Review Comment:
   This is problematic - folks could install the provider with this code onto Airflow 2.8.
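   One way to address this kind of cross-version concern (a sketch only, not the PR's actual fix - the helper names and the 2.10 cut-off are assumptions) is to gate the old fail-the-task behavior on the installed Airflow version, so older schedulers that still expect the executor to fail the task keep working:

   ```python
   def _parse_version(version: str) -> tuple[int, ...]:
       """Parse 'X.Y.Z' (ignoring any non-digit suffix per part) into a tuple."""
       parts = []
       for piece in version.split(".")[:3]:
           digits = ""
           for ch in piece:
               if not ch.isdigit():
                   break
               digits += ch
           parts.append(int(digits or 0))
       return tuple(parts)


   def should_fail_stuck_task(airflow_version: str) -> bool:
       """Hypothetical compatibility gate: on older Airflow (cut-off assumed
       to be 2.10), keep calling self.fail() for cleaned-up queued tasks."""
       return _parse_version(airflow_version) < (2, 10, 0)
   ```

   The executor could then call `self.fail(task_instance_key, None)` only when `should_fail_stuck_task(airflow_version)` is true, rather than removing the call outright.
   
   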



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]