argibbs commented on issue #34339: URL: https://github.com/apache/airflow/issues/34339#issuecomment-1721176163
Ok, I have found the problem, and fixed it, for some value of fixed. I have Thoughts™️ on the problem, so apologies in advance for what might be quite a long update. I'll do my best to make it readable.

## The fix

### Smoking gun

I found the following message in my scheduler logs: `Marked the following 85 task instances stuck in queued as failed.`, followed by a list of all the tasks which had been taken around the corner and shot. That message comes from [`scheduler_job_runner.py:1574`](https://github.com/apache/airflow/blob/88623acae867c2a9d34f5030809102379080641a/airflow/jobs/scheduler_job_runner.py#L1574).

### Solution

If you follow through to where the timeout is set, it's the config property [`task_queued_timeout`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-queued-timeout). I upped this to 7200 seconds (2 hours) in my config, and the problem instantly went away.

## Q&A

### Why now?

That's easy: this config was added in 2.6.0, and as noted above I jumped from 2.3.3 -> 2.7.0. This is an argument for not falling too far behind; otherwise, when you do update, there are a lot of changes to work through when debugging. Mea culpa.

### What was going wrong?

(I'm changing some numbers here to keep things round and simple.)

The way our system is configured, we have a Celery queue called `data`, served by 20 workers. At various times during the day, some data arrives and several dags all start running at the same time as a result, dumping 100 tasks onto the `data` queue. Each task takes 6 minutes to run, so that's (optimistically) 100 tasks * 6 minutes / 20 queue workers = 30 minutes until all tasks are processed. Notably, this means that after 10 minutes we'll have completed 20 tasks, another 20 will be running, and the remaining 60 tasks will still be queued. 10 minutes is the default value for this new `task_queued_timeout`, so after 10 minutes the scheduler kills all 60 remaining tasks.
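For the avoidance of doubt, the arithmetic above can be sketched in a few lines. This is a toy FIFO model using the rounded numbers from above; the function name and the tasks-start-in-waves assumption are mine, not anything in Airflow:

```python
import math

def drain_schedule(tasks: int, task_minutes: int, workers: int, timeout_minutes: int):
    """Toy model: tasks start in waves of `workers`; wave w sits queued for
    (w - 1) * task_minutes before a worker slot frees up."""
    waves = math.ceil(tasks / workers)
    total_minutes = waves * task_minutes  # time until the queue fully drains
    killed = sum(
        # number of tasks in this wave (the last wave may be partial)
        min(workers, tasks - workers * (w - 1))
        for w in range(1, waves + 1)
        # killed if queued longer than the timeout before starting
        if (w - 1) * task_minutes > timeout_minutes
    )
    return total_minutes, killed

# Default task_queued_timeout is 600s = 10 minutes:
print(drain_schedule(tasks=100, task_minutes=6, workers=20, timeout_minutes=10))   # (30, 60)
# With the timeout raised to 7200s = 120 minutes, nothing gets killed:
print(drain_schedule(tasks=100, task_minutes=6, workers=20, timeout_minutes=120))  # (30, 0)
```

Same answer as the back-of-the-envelope version: the queue drains in 30 minutes, but 60 of the 100 tasks exceed the default 10-minute timeout while waiting.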
(Note that by default the scheduler only checks every 2 minutes, but that's immaterial given the huge wait time on the queue - remember this fact, though, as I'll come back to it below.)

### Why did you decide increasing the timeout was the right fix?

Ah, I'm glad you asked. The (slightly simplified) lifecycle of a task is typically `NEW` -> `SCHEDULED` -> `QUEUED` -> `RUNNING` -> `SUCCESS`|`FAILED`. The main things that prevent a task getting queued are 1. upstream dependencies, and 2. available pool slots. So, an alternative solution I briefly considered was throwing a pool over these data tasks to prevent too many of them being queued at once. However, I then applied the following reasoning:

1. "I need to get the size of this pool right. Too many slots, and there's a risk things will be stuck on the queue and get killed. But too few, and I'm going to be under-utilising my workers. 25 seems like a safe number. Then in theory, no more than 5 tasks will be sat on the queue at any time, and those for no more than 6 minutes."
2. "Hmm, but hang on, what about other dags that happen to run at the same time on unrelated data? They'll also be queuing at the same time, and I run the risk of them also being killed."
3. "So, I guess I need to put every task that uses the `data` queue into the same pool? That's a lot of work, and brittle if someone adds more tasks in the future and forgets about / doesn't know about this problem."
4. "Oh, and if down the line I get more workers, I'm going to have to remember to increase my pool, else I'll not be taking advantage of my new workers."
5. "Hang on again, what if someone makes a code change and the tasks each start taking 12 minutes? Then if even one task gets queued, it'll be on the queue for 12 minutes and will get killed. I need to set the pool to be no larger than the worker slots. Given airflow scheduler overheads, I'm now basically guaranteed to be under-utilising the workers. Not all of my tasks take 6 minutes, and the shorter the tasks, the worse it is."
6. "Seems like I'm just putting a slightly smaller queue in front of my first queue."
7. "Why am I doing this? It's daft. Everything worked fine in 2.3.3 without this setting, so let's just effectively turn it off."

So, 7200. Does this mean I think this config option is pointless _for our setup_? So far, I have not seen any benefit. **But YMMV.**

### Weren't there timeouts in the old code?

Yes, there were. I have checked, and the old setting `[celery].stalled_task_timeout` was at its default value of 300. I haven't done an archaeological dive into the code, but the implementation has obviously changed somehow. It's kind of moot - all that really matters is what the _current_ code does.

### Wait, isn't the title of this issue "Tasks being marked as failed even _after_ running successfully"? They're not queued at that point.

Another good question. I do not know why, but for whatever reason the scheduler is seeing actively running tasks as queued, and killing them. Maybe this is a classification thing - i.e. "queued" for the purposes of this config means "task-state-is-queued-or-running". Maybe this is an issue with database load, and although the task has started running, it hasn't been recorded as such in the db. I dunno. This manifested as tasks being marked as failed despite having run successfully, as outlined in this issue's description. I also found examples of tasks that _were killed while running_. Their logs contained messages like:

```
[2023-09-15, 08:24:21 BST] {subprocess.py:93} INFO - Starting up and doing some stuff
[2023-09-15, 08:25:17 BST] {subprocess.py:93} INFO - I'm still doing stuff
[2023-09-15, 08:25:17 BST] {local_task_job_runner.py:294} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2023-09-15, 08:25:17 BST] {process_utils.py:131} INFO - Sending 15 to group 3257.
PIDs of all processes in the group: [3258, 3257]
[2023-09-15, 08:25:17 BST] {process_utils.py:86} INFO - Sending the signal 15 to group 3257
[2023-09-15, 08:25:17 BST] {taskinstance.py:1630} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-09-15, 08:25:17 BST] {subprocess.py:104} INFO - Sending SIGTERM signal to process group
[2023-09-15, 08:25:17 BST] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/operators/bash.py", line 201, in execute
    result = self.subprocess_hook.run_command(
  File "/usr/local/lib/python3.10/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1632, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2023-09-15, 08:25:17 BST] {taskinstance.py:1398} INFO - Marking task as UP_FOR_RETRY. dag_id=STOXX600_Index_1.ANC_110.AEE_110.QTR1.Realtime, task_id=Measures.PRR-consensus-and-events-signals, execution_date=20230914T130500, start_date=20230915T072420, end_date=20230915T072517
[2023-09-15, 08:25:17 BST] {standard_task_runner.py:104} ERROR - Failed to execute job 36648131 for task Measures.PRR-consensus-and-events-signals (Task received SIGTERM signal; 3257)
[2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process psutil.Process(pid=3257, status='terminated', exitcode=1, started='07:24:20') (3257) terminated with exit code 1
[2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process psutil.Process(pid=3258, status='terminated', started='07:24:21') (3258) terminated with exit code None
```

What I can tell you is that, empirically, every task I found that a) was killed while running, or b) was killed after completing successfully, was killed _less than 2 minutes after it started_.
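One way to get exactly that pattern (and this is pure speculation on my part - a toy model, not Airflow's actual logic) is a checker that snapshots the queued set, waits, and then fails everything in the now-stale snapshot without re-reading state:

```python
def fail_stale_queued(states: dict) -> list:
    """Toy model of a stuck-in-queued check acting on a stale snapshot."""
    # 1. Snapshot everything that is currently queued.
    snapshot = [task for task, state in states.items() if state == "queued"]
    # 2. ...imagine the checker's ~2 minute wait happening here, during which
    #    a worker picks up task_b and it starts running...
    states["task_b"] = "running"
    # 3. Fail the snapshot without checking whether anything has since started.
    failed = []
    for task in snapshot:
        states[task] = "failed"
        failed.append(task)
    return failed

states = {"task_a": "queued", "task_b": "queued", "task_c": "running"}
print(fail_stale_queued(states))  # ['task_a', 'task_b'] - task_b dies despite running
print(states["task_b"])           # 'failed'
```

Under that (hypothetical) model, any task that leaves `queued` during the wait still gets whacked, and it always happens within one check interval of the task starting - which would match the "less than 2 minutes" pattern above.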
Remember I mentioned above that the queue timeout checks run once every 2 minutes? It's almost as if the code only considers the state of the tasks as they were before it started its 2-minute wait. I haven't dug into the logic, however, as this is moot: even if it wasn't whacking the tasks that were running / complete, I have enough tasks-that-are-really-actually-queued that it would still be killing them, and my solution would be the same. The only material impact is that killing running / completed tasks as well was a massive misdirect and sent me after quite a few red herrings.

## Potential improvements

There are several things that have come out of this process that I feel are worth doing something about. However, I thought I'd list them here first, rather than run off and raise them as separate issues. If it's clear that I'm in a minority of 1 on their worth, we can just close this issue and everyone is happy.

1. Display warnings about deprecated configs in the GUI somewhere (maybe the new cluster activity page?). This would have alerted me to the fact that the old celery timeout config had changed. Maybe it wouldn't have got me to the solution any faster, but it couldn't have hurt.
2. Likewise, display warnings in the GUI if the dag processor is experiencing timeouts. Yes, as it turned out this was unrelated to my actual problem, but it's a pretty critical component; if you are getting timeouts, system performance can degrade in subtle ways, e.g. task schedule latency. Hat tip to @Taragolis - the new secrets cache in 2.7.0 that he linked to is a game changer here.
3. I like metrics - maybe add a statsd counter to capture the total number of queued tasks that have been killed? Obviously someone has to monitor that metric, but they can't do that if it's not there...
4. Make it clear in the GUI when tasks have timed out.
   a. Display warnings somewhere (maybe that cluster activity page again?).
   b. Differentiate tasks that have been killed due to queue timeout, e.g. with a different state such as `queue_timed_out` rather than plain old `failed`?
   c. Maybe a different message in the task logs?
5. And of course, ideally once a task is actually _running or finished_, it won't get whacked by a queue timeout.

## Fin

I think that's everything. Well done if you made it this far. Thanks as always to everyone who contributed in the thread. I plan on leaving this open for a bit in case there's more discussion on any of the above, but if one of the maintainers wants to close it unilaterally, I'm cool with that.
