argibbs commented on issue #34339:
URL: https://github.com/apache/airflow/issues/34339#issuecomment-1721176163

   Ok, I have found the problem, and fixed it, for some value of fixed. I have 
Thoughts™️ on the problem, so apologies in advance for what might be quite a 
long update. I'll do my best to make it readable.
   
   ## The fix
   ### Smoking gun
   I found the following message in my scheduler logs: `Marked the following 85 task instances stuck in queued as failed.`, followed by a list of all the tasks which had been taken out back and shot.
   
   That message comes from 
[`scheduler_job_runner.py:1574`](https://github.com/apache/airflow/blob/88623acae867c2a9d34f5030809102379080641a/airflow/jobs/scheduler_job_runner.py#L1574).
   ### Solution
   If you follow through where the timeout is set, it's the config property 
[`task_queued_timeout`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-queued-timeout).
   
   I upped this to 7200 (2 hours) in my config, and the problem instantly went 
away.
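   For anyone wanting to apply the same change without editing `airflow.cfg`, the standard `AIRFLOW__<SECTION>__<KEY>` environment-variable override works too (the option lives in the `[scheduler]` section; double-check the config reference for your version):

   ```python
   import os

   # Override [scheduler] task_queued_timeout without touching airflow.cfg.
   # Airflow reads env vars named AIRFLOW__<SECTION>__<KEY> at process start,
   # so this must be in the scheduler's environment before it launches.
   SECTION, KEY, VALUE = "scheduler", "task_queued_timeout", "7200"
   env_var = f"AIRFLOW__{SECTION.upper()}__{KEY.upper()}"
   os.environ[env_var] = VALUE

   print(env_var, "=", os.environ[env_var])
   ```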
   
   ## Q&A
   ### Why now?
   That's easy: this config was added in 2.6.0; as noted above I jumped from 2.3.3 -> 2.7.0. This is an argument for not falling too far behind: when you do update, there are a lot of changes to work through when debugging. Mea culpa.
   ### What was going wrong?
   (I'm changing some numbers here to keep things round and simple) 
   
   The way our system is configured, we have a celery queue called `data`. 
There are 20 workers serving this queue. At various times during the day, some 
data arrives, and several dags all start running at the same time as a result. 
There are 100 tasks which get dumped onto the `data` queue. These tasks take 6 
minutes each to run. So that's (optimistically) 100 tasks * 6 minutes / 20 
queue workers = 30 minutes until all tasks are processed. 
   
   Notably, this means that after 10 minutes, we'll have completed 20 tasks, 
another 20 will be running, and the remaining 60 tasks will still be queued. 10 
minutes is the default value for this new `task_queued_timeout`, so after 10 
minutes, the scheduler kills all remaining 60 tasks. (Note by default the 
scheduler only checks every 2 minutes, but that's immaterial given the huge 
wait time on the queue - but remember this fact, I'll come back to it below).
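   The arithmetic above can be sketched as a toy model (the numbers are from my scenario, not from Airflow itself):

   ```python
   # Toy model of the backlog: 100 tasks, 20 workers, 6 minutes per task.
   # Workers drain the queue in batches of 20, so a task in batch i waits
   # i * 6 minutes before starting; anything whose wait exceeds
   # task_queued_timeout gets marked failed by the scheduler.
   N_TASKS, N_WORKERS, TASK_MINUTES = 100, 20, 6
   TIMEOUT_MINUTES = 10  # default task_queued_timeout is 600 seconds

   total_minutes = N_TASKS * TASK_MINUTES / N_WORKERS  # optimistic wall-clock
   killed = sum(
       1
       for i in range(N_TASKS)
       if (i // N_WORKERS) * TASK_MINUTES > TIMEOUT_MINUTES
   )
   print(total_minutes, killed)  # 30.0 minutes of work, 60 tasks killed
   ```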
   ### Why did you decide increasing the timeout was the right fix?
   Ah, I'm glad you asked.
   
   The (slightly simplified) lifecycle of a task is typically `NEW` -> 
`SCHEDULED` -> `QUEUED` -> `RUNNING` -> `SUCCESS`|`FAILED`. The main things 
that prevent a task getting queued are 1. upstream dependencies, and 2. 
available pool slots. So, an alternative solution I briefly considered was 
throwing a pool over these data tasks to prevent too many of them being queued 
at once.  However, I then applied the following reasoning:
   1. "I need to get the size of this pool right. Too many, and there's a risk 
things will be stuck on the queue and get killed. But too few, and I'm going to 
be under-utilising my pool. 25 seems like a safe number. Then in theory, no 
more than 5 tasks will be sat on the queue at any time, and those for no more 
than 6 minutes."
   2. "Hmm, but hang on, what about other dags that happen to run at the same time on unrelated data? They'll also be queuing at the same time, and I run the risk of them also being killed."
   3. "So, I guess I need to put every task that uses the `data` queue into the same pool? That's a lot of work, and brittle if someone adds more tasks in the future and forgets/doesn't know about this problem."
   4. "Oh, and if down the line I get more workers, I'm going to have to remember to increase my pool, else I'll not be taking advantage of my new workers."
   5. "Hang on again, what if someone makes a code change and the tasks each start taking 12 minutes? Then if even one task gets queued, it'll be on the queue for 12 minutes and will get killed. I need to set the pool to be no larger than the worker slots. I'm basically guaranteed to be under-utilising the workers now, given airflow scheduler overheads. Not all of my tasks take 6 minutes, and the shorter the tasks, the worse it is."
   6. "Seems like I'm just putting a slightly smaller queue in front of my first queue."
   7. "Why am I doing this? It's daft. Everything worked fine in 2.3.3 without this setting, let's just effectively turn it off."
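   The pool-sizing trap in steps 1 and 5 can be made concrete with a back-of-envelope function (my own illustration, nothing from Airflow):

   ```python
   import math

   def worst_case_queue_wait(pool_size, workers, task_minutes):
       """Worst-case minutes a queued task waits before a worker picks it up,
       assuming all pool_size tasks queue at once and the workers drain them
       in batches of `workers`."""
       queued = max(pool_size - workers, 0)
       return math.ceil(queued / workers) * task_minutes

   # Pool of 25 over 20 workers, 6-minute tasks: at most 6 minutes queued. Safe.
   print(worst_case_queue_wait(25, 20, 6))   # 6
   # Same pool, but tasks slow down to 12 minutes: any queued task now waits
   # 12 minutes and dies to the default 10-minute task_queued_timeout.
   print(worst_case_queue_wait(25, 20, 12))  # 12
   ```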
   
   So, 7200. Does this mean I think this config option is pointless _for our 
setup_? So far, I have not seen any benefit. **But YMMV**.
   ### Weren't there timeouts in the old code?
   Yes, there were. I have checked, and the old setting `[celery].stalled_task_timeout` was at its default value of 300. I haven't done an archaeological dive into the code, but the implementation has obviously changed somehow. It's kind of moot - all that really matters is what the _current_ code does.
   ### Wait, isn't the title of this issue "Tasks being marked as failed even 
_after_ running successfully"? They're not queued at that point.
   Another good question. I do not know why, but for whatever reason the 
scheduler is seeing actively running tasks as queued, and killing them. Maybe 
this is a classification thing - i.e. "queued" for the purposes of this config 
means "task-state-is-queued-or-running". Maybe this is an issue with database 
load, and although the task has started running it hasn't been recorded as such 
in the db. I dunno.
   
   This manifested as tasks being marked as failed despite having run 
successfully as outlined in this issue description. I also found examples of 
tasks that _were killed while running_. Their logs contained messages like:
   ```
   [2023-09-15, 08:24:21 BST] {subprocess.py:93} INFO - Starting up and doing 
some stuff
   [2023-09-15, 08:25:17 BST] {subprocess.py:93} INFO - I'm still doing stuff
   [2023-09-15, 08:25:17 BST] {local_task_job_runner.py:294} WARNING - State of 
this instance has been externally set to up_for_retry. Terminating instance.
   [2023-09-15, 08:25:17 BST] {process_utils.py:131} INFO - Sending 15 to group 
3257. PIDs of all processes in the group: [3258, 3257]
   [2023-09-15, 08:25:17 BST] {process_utils.py:86} INFO - Sending the signal 
15 to group 3257
   [2023-09-15, 08:25:17 BST] {taskinstance.py:1630} ERROR - Received SIGTERM. 
Terminating subprocesses.
   [2023-09-15, 08:25:17 BST] {subprocess.py:104} INFO - Sending SIGTERM signal 
to process group
   [2023-09-15, 08:25:17 BST] {taskinstance.py:1935} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/site-packages/airflow/operators/bash.py", 
line 201, in execute
       result = self.subprocess_hook.run_command(
     File 
"/usr/local/lib/python3.10/site-packages/airflow/hooks/subprocess.py", line 91, 
in run_command
       for raw_line in iter(self.sub_process.stdout.readline, b""):
     File 
"/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 
1632, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   [2023-09-15, 08:25:17 BST] {taskinstance.py:1398} INFO - Marking task as 
UP_FOR_RETRY. dag_id=STOXX600_Index_1.ANC_110.AEE_110.QTR1.Realtime, 
task_id=Measures.PRR-consensus-and-events-signals, 
execution_date=20230914T130500, start_date=20230915T072420, 
end_date=20230915T072517
   [2023-09-15, 08:25:17 BST] {standard_task_runner.py:104} ERROR - Failed to 
execute job 36648131 for task Measures.PRR-consensus-and-events-signals (Task 
received SIGTERM signal; 3257)
   [2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process 
psutil.Process(pid=3257, status='terminated', exitcode=1, started='07:24:20') 
(3257) terminated with exit code 1
   [2023-09-15, 08:25:17 BST] {process_utils.py:79} INFO - Process 
psutil.Process(pid=3258, status='terminated', started='07:24:21') (3258) 
terminated with exit code None
   ```
   
   What I can tell you is that empirically, every task I found that a) was 
killed while running, or b) was killed after completing successfully, was 
killed _less than 2 minutes after it started_. Remember I mentioned above that 
the queue timeout checks once every 2 minutes? It's almost as if the code only 
considers the state of the tasks as they were before it started its 2 minute 
wait.
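   If that hypothesis is right, it would amount to a check-then-act race along these lines (purely a toy illustration of the _possible_ failure mode; I have not verified this against the actual scheduler code):

   ```python
   # Toy illustration: a reaper snapshots the "queued" tasks, sits out its
   # 2-minute check interval, then kills everything in the snapshot -- without
   # re-reading state. Tasks that started (or finished) during the wait get
   # killed anyway.
   states = {"t1": "queued", "t2": "queued", "t3": "queued"}

   snapshot = [t for t, s in states.items() if s == "queued"]  # check

   states["t2"] = "running"   # t2 starts during the 2-minute wait
   states["t3"] = "success"   # t3 even finishes

   killed = []
   for t in snapshot:         # act on the stale snapshot
       states[t] = "failed"
       killed.append(t)

   print(killed)  # all three, including the running and finished ones
   ```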
   
   I haven't dug into the logic, however, as this is moot: even if it wasn't whacking the tasks that were running / complete, I have enough tasks-that-are-really-actually-queued that it would still be killing them, and my solution would be the same. The only material impact is that killing running / completed tasks as well was a massive misdirect and sent me after quite a few red herrings.
   
   ## Potential improvements
   There are several things that have come out of this process that I feel are 
worth doing something about. However, I thought I'd list them here first, 
rather than run off and raise them as separate issues. If it's clear that I'm 
in a minority of 1 on their worth, we can just close this issue and everyone is 
happy.
   
   1. Display warnings about deprecated configs in the GUI somewhere. (Maybe 
the new cluster activity page?). This would have alerted me to the fact that 
the old celery timeout config had changed. Maybe it wouldn't have got me to the 
solution any faster, but it couldn't have hurt.
   2. Likewise, display warnings in the GUI if the dag processor is 
experiencing timeouts. Yes, as it turned out this was unrelated to my actual 
problem, but it's a pretty critical component; if you are getting timeouts, 
system performance can degrade in subtle ways, e.g. task schedule latency. Hat 
tip to @Taragolis - the new secrets cache in 2.7.0 that he linked to is a game 
changer here.
   3. I like metrics - maybe add a statsd counter to capture the total number 
of queued tasks that have been killed? Obviously someone has to monitor that 
metric, but they can't do that if it's not there...
   4. Make it clear in the GUI when tasks have timed out.
       a. Display warnings somewhere (maybe that cluster activity page again?)
       b. Differentiate tasks that have been killed due to queue timeout, e.g. with a different state such as `queue_timed_out` rather than plain old `failed`?
       c. Maybe a different message in the task logs?
   5. And of course, ideally once a task is actually _running or finished_, it 
won't get whacked by a queue timeout.
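   On suggestion 3, the statsd wire format for a counter is trivial, so a sketch of what the scheduler could emit (the metric name is my invention; Airflow would pick its own):

   ```python
   import socket

   def statsd_incr(name: str, count: int = 1) -> bytes:
       """Format a counter packet in the plain-text statsd wire protocol."""
       return f"{name}:{count}|c".encode()

   # 85 matches the "Marked the following 85 task instances..." log line above.
   packet = statsd_incr("scheduler.tasks.stuck_in_queued_failed", 85)
   print(packet)  # b'scheduler.tasks.stuck_in_queued_failed:85|c'

   # statsd is fire-and-forget UDP, conventionally on port 8125; nothing
   # breaks if no daemon is listening.
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   sock.sendto(packet, ("127.0.0.1", 8125))
   sock.close()
   ```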
   
   ## Fin
   I think that's everything. Well done if you made it this far. Thanks as 
always to everyone who contributed in the thread. I plan on leaving this open 
for a bit in case there's more discussion on any of the above, but if one of 
the maintainers wants to close it unilaterally, I'm cool with that.
   