gdevanla opened a new issue #8691: URL: https://github.com/apache/airflow/issues/8691
**Apache Airflow version**: 1.10.3

**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):

**Environment**: Python 3.7.4

- **Cloud provider or hardware configuration**:
- **OS** (e.g. from /etc/os-release): Ubuntu xenial/bionic
- **Kernel** (e.g. `uname -a`): Linux 4.15.0-45-generic #48~16.04.1-Ubuntu SMP Tue Jan 29 18:03:48 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
- **Install tools**:
- **Others**:

**What happened**:

The task instance gets stuck in the `scheduled` state because of an inconsistency in how `queued_tasks` entries (tasks that failed to be queued successfully in the `CeleryExecutor`) are expected to be handled.

Given a TaskInstance `TI` whose state is `None`, the following pseudo code is executed inside the scheduler loop. In some situations, `TI` gets stuck in the `scheduled` state (the indentation below depicts the call stack):

```
Given a task_instance `TI` with state == None:

execute_helper (scheduler loop)

(first iteration of the scheduler loop)
- calls `_execute_task_instances`
  - calls `_find_executable_task_instances()`, which returns `TI` (state == None)
  - calls `_change_state_for_executable_task_instances`, which updates `TI`'s state to `queued`
  - calls `_enqueue_task_instances_with_queued_state`. This function adds `TI` to the
    `Executor.queued_tasks` dictionary.
- calls `CeleryExecutor.heartbeat`. Tries to send the task to a worker. If this
  succeeds, `TI` is popped from `CeleryExecutor.queued_tasks`. But in our scenario,
  the `CeleryExecutor` leaves the entry in `queued_tasks` intact because either an
  `Exception` was raised or the `result` was `None`. The `CeleryExecutor` assumes the
  scheduler will handle this scenario. This is where the problem starts (see the
  second iteration below; the link to this code is provided further down).
- calls `_change_state_for_tasks_failed_to_execute`.
  This function notices the `TI` entry in `CeleryExecutor.queued_tasks`, assumes
  something went wrong, and therefore correctly updates the status of `TI` back to
  `scheduled`. Note that the entry for `TI` is still in `queued_tasks`, and that is
  what causes the current issue (see the second iteration below).
- other maintenance activities happen in the scheduler loop (not relevant to this issue)

(second iteration of the scheduler loop)
- calls `_execute_task_instances`
  - calls `_find_executable_task_instances()`. This function is supposed to return
    `TI`, since it is in the `scheduled` state. But it finds that an entry for `TI`
    already exists in `CeleryExecutor.queued_tasks` and therefore does not return
    `TI` (see the link below that points to this check). This means `TI` will never
    be queued and is stuck in the `scheduled` state.
    (https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033)
```

The only workaround for this currently is to restart the scheduler. When the scheduler is restarted, `CeleryExecutor.queued_tasks` is reset, and `TI` is therefore queued again.

The code where the `queued_tasks` entry is updated by popping the TI is here:
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/executors/celery_executor.py#L223

The code due to which `TI` gets stuck in the `scheduled` state is here:
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033

I think the code here should only check whether the `CeleryExecutor.running` dictionary has `TI` in its entries. But I am not sure how that change affects other executors.

**What you expected to happen**:

The `_find_executable_task_instances()` function should only check whether `CeleryExecutor.running` contains an entry for `TI`, and should return `TI` as part of its list of tasks to be queued.
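The interaction described above, where the stale `queued_tasks` entry blocks re-queueing, can be modeled with a minimal, self-contained sketch. All names here are toy stand-ins that mirror the Airflow methods mentioned above; this is not actual Airflow code, only the bookkeeping it performs:

```python
# Toy model (not Airflow code) of the described flow: a failed send leaves a
# stale entry in queued_tasks, and the scheduler's executable check then
# skips the TI on every subsequent iteration.

class ToyExecutor:
    def __init__(self, send_succeeds):
        self.queued_tasks = {}          # mirrors BaseExecutor.queued_tasks
        self.running = {}               # mirrors BaseExecutor.running
        self.send_succeeds = send_succeeds

    def heartbeat(self):
        # Mirrors the described CeleryExecutor behavior: pop from
        # queued_tasks only on a successful send; on failure the entry is
        # intentionally left behind for the scheduler to handle.
        for key in list(self.queued_tasks):
            if self.send_succeeds:
                self.running[key] = self.queued_tasks.pop(key)

def find_executable(tis, executor):
    # Mirrors the check at scheduler_job.py#L1033: a TI is skipped when its
    # key is still present in executor.queued_tasks (or executor.running).
    return [ti for ti, state in tis.items()
            if state in (None, "scheduled")
            and ti not in executor.queued_tasks
            and ti not in executor.running]

def scheduler_iteration(tis, executor):
    for ti in find_executable(tis, executor):
        tis[ti] = "queued"                       # _change_state_for_executable_task_instances
        executor.queued_tasks[ti] = "run " + ti  # _enqueue_task_instances_with_queued_state
    executor.heartbeat()
    for ti in executor.queued_tasks:             # _change_state_for_tasks_failed_to_execute
        tis[ti] = "scheduled"

tis = {"TI": None}
executor = ToyExecutor(send_succeeds=False)
scheduler_iteration(tis, executor)    # first iteration: send fails, TI reset to scheduled
scheduler_iteration(tis, executor)    # second iteration: TI is skipped
print(tis["TI"])                      # "scheduled" -- stuck
print("TI" in executor.queued_tasks)  # True -- the stale entry blocks re-queueing
```

In this toy model, dropping the `ti not in executor.queued_tasks` condition from `find_executable` (i.e., checking only `running`, as suggested above) lets the second iteration pick `TI` up again.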
**How to reproduce it**:

It can be reproduced by forcing the `result` value in `CeleryExecutor.heartbeat` to be an `ExceptionTraceback` object or `None`.

(Note: the links above point to the `master` branch, but the problem applies to 1.10.3 and higher versions.)

**Anything else we need to know**:

I am not able to see a scenario in which the `result` in `CeleryExecutor.heartbeat` is `None`; looking at the `Celery.app` module, it feels like the `result` can never be `None`. But I suspect there are scenarios where the `result` is `None` and the `CeleryExecutor` therefore does not pop the `TI` from the queue, though I am not able to prove this concretely.

This also happens with later versions of Airflow. In later versions, the `CeleryExecutor.trigger_tasks` function performs the same set of operations. The code has been moved around between versions, but the logic remains the same, and the problem exists in later versions as well.
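The pop-on-success behavior described above can be sketched as follows; `send_task_to_worker` is a hypothetical stand-in, not the real Celery or Airflow API:

```python
# Illustrative sketch (hypothetical helper, not the real API) of the
# heartbeat behavior described above: a task is popped from queued_tasks
# only when the send succeeded.

def heartbeat(queued_tasks, running, send_task_to_worker):
    for key, command in list(queued_tasks.items()):
        try:
            result = send_task_to_worker(command)  # hypothetical stand-in
        except Exception:
            result = None
        if result is None:
            # Failure path: the entry is left in queued_tasks, on the
            # assumption that the scheduler will re-handle it -- but the
            # scheduler then refuses to re-queue a TI whose key is still
            # present here.
            continue
        running[key] = queued_tasks.pop(key)

# Simulating a send that fails shows the stale entry:
queued, running = {"TI": "run TI"}, {}

def failing_send(command):
    raise RuntimeError("worker unreachable")

heartbeat(queued, running, failing_send)
print("TI" in queued)  # True: the entry was never popped
print(running)         # {}: nothing is running
```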