zhenghanyang opened a new issue, #37059:
URL: https://github.com/apache/airflow/issues/37059

   ### Apache Airflow version
   
   2.8.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When more than 20,000 trigger jobs are started, the triggerer may report the following error and shut down.
   
   ```
   [2024-01-28T21:14:50.981+0800] {triggerer_job_runner.py:481} INFO - 20373 triggers currently running
   [2024-01-28T21:14:51.451+0800] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.23 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
   [2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy685/-1/1 (ID 20384) starting
   [2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy662/-1/1 (ID 20385) starting
   
   [2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:341} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
       self._run_trigger_loop()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
       self.load_triggers()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
       self.trigger_runner.update_triggers(set(ids))
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 650, in update_triggers
       running_trigger_ids.union(x[0] for x in self.events)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 652, in <genexpr>
       .union(x[0] for x in self.to_create)
   RuntimeError: deque mutated during iteration
   [2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:344} INFO - Waiting for triggers to clean up
   ```
   
   ### What you think should happen instead?
   
   The error message shows that the `to_create` deque was mutated while it was being iterated over.
   The code in the traceback runs in the triggerer's main thread, in `_run_trigger_loop()`: the main thread iterates over the `to_create` deque and appends new triggers to it, which on its own works fine. However, another thread, `TriggerRunner`, creates trigger tasks from the same deque, so it can mutate the deque while the main thread is still iterating over it.
   I also found that this iteration over the deque was introduced by the following commit:
   
   https://github.com/apache/airflow/commit/16b8c476518ed76e3689966ec4b0b788be935410#diff-a5a8f5ab18cc034fa7a64181443415a1cd86aca1ce43a3aaa39e73262941118b
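The interleaving described above can be replayed deterministically with the standard library alone. This is a sketch, not Airflow code: `race_once`, the `runner` thread and the `threading.Event` handshake are hypothetical stand-ins. It assumes the main thread builds a set of IDs from a generator over `to_create` passed to `set.union` (as in the traceback), while the other thread calls `popleft()` on the same deque. The events force that mutation to happen in the middle of the iteration, which raises the same `RuntimeError`.

```python
import threading
from collections import deque


def race_once() -> str:
    """Replay the main-thread/runner-thread interleaving deterministically.

    Hypothetical sketch, not Airflow code: the main thread builds a set of
    trigger IDs from a generator over `to_create` (like update_triggers),
    while a second thread (standing in for TriggerRunner) pops from it.
    """
    to_create = deque((trigger_id, object()) for trigger_id in range(3))
    main_paused = threading.Event()
    runner_done = threading.Event()

    def runner() -> None:
        main_paused.wait()
        to_create.popleft()  # assumed consumer-side mutation of the shared deque
        runner_done.set()

    thread = threading.Thread(target=runner)
    thread.start()

    def trigger_ids():
        for position, item in enumerate(to_create):
            if position == 1:
                # Pause mid-iteration so the runner thread mutates the deque now.
                main_paused.set()
                runner_done.wait()
            yield item[0]

    try:
        set().union(trigger_ids())
        return "no error"
    except RuntimeError as exc:
        return str(exc)
    finally:
        thread.join()


print(race_once())  # deque mutated during iteration
```

In the real triggerer the timing is left to the thread scheduler rather than to events, which may explain why the error only shows up intermittently under heavy load.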
   
   
   ### How to reproduce
   
   Run a large number of trigger jobs (the log above shows more than 20,000 triggers running at once).
   
   ### Operating System
   
   Redhat 8.9
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   Maybe we could iterate over a copy of the deque instead?
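One way to act on this suggestion, sketched under assumptions: `TriggerQueues` and its methods are hypothetical and are not Airflow's API. Taking the copy still reads the shared deque, so the sketch makes the copy while holding a `threading.Lock` that the consuming side also takes; iterating the private copy afterwards cannot race with `popleft()`. Whether a bare `list(...)` copy without a lock would be enough depends on CPython's GIL behaviour, and this sketch does not rely on it.

```python
import threading
from collections import deque
from typing import Any, Optional, Set, Tuple


class TriggerQueues:
    """Hypothetical stand-in for the deques shared by the triggerer threads."""

    def __init__(self) -> None:
        self.to_create: deque = deque()  # (trigger_id, trigger) pairs to start
        self.events: deque = deque()  # (trigger_id, event) pairs to report
        # Assumption: one lock guards every read and write of both deques.
        self.lock = threading.Lock()

    def add(self, trigger_id: int, trigger: Any) -> None:
        # Main-thread side: queue a new trigger for the runner thread.
        with self.lock:
            self.to_create.append((trigger_id, trigger))

    def add_event(self, trigger_id: int, event: Any) -> None:
        # Runner-thread side: record that a trigger fired.
        with self.lock:
            self.events.append((trigger_id, event))

    def pop_to_create(self) -> Optional[Tuple[int, Any]]:
        # Runner-thread side: consume under the same lock as the snapshot.
        with self.lock:
            return self.to_create.popleft() if self.to_create else None

    def known_ids(self, running_trigger_ids: Set[int]) -> Set[int]:
        # Copy both deques while holding the lock, then iterate the private
        # copies outside it; no other thread can mutate the copies.
        with self.lock:
            events = list(self.events)
            to_create = list(self.to_create)
        return running_trigger_ids.union(x[0] for x in events).union(
            x[0] for x in to_create
        )


queues = TriggerQueues()
queues.add(1, "trigger-a")
queues.add(2, "trigger-b")
queues.add_event(3, "event")
print(sorted(queues.known_ids({0})))  # [0, 1, 2, 3]
```

Copying under the lock keeps the critical section short. Iterating the deques directly while holding the lock would also avoid the error, but would block the runner thread for the whole iteration.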
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.