a-meledin opened a new issue, #35267: URL: https://github.com/apache/airflow/issues/35267
### Apache Airflow version

2.7.2

### What happened

We have a DAG with 116 tasks, each with some number of mapped tasks. The overall number of task instances is ~6000. The problem is that execution is very slow. Investigation and testing with the code below showed ~110 open connections running this SQL:

`SELECT dag_run.state AS dag_run_state, ... FROM dag_run WHERE dag_run.dag_id = ... AND dag_run.run_id = ... FOR UPDATE`

In the logs this shows up as a delay caused by taskinstance.py:

![image](https://github.com/apache/airflow/assets/98175818/07316457-8f48-4f82-aac9-9c02a4831bd6)

```python
import logging
from datetime import datetime

import pandas as pd, numpy as np
from airflow.decorators import dag, task, task_group
from airflow.operators.python import get_current_context
from airflow.models.param import Param


@dag(
    start_date=datetime(2023, 10, 29),
    schedule=None,
    max_active_runs=1,
    max_active_tasks=16,
    tags=["TESTS", "MAPPED TASKS"],
    description="",
    params={
        "MappedNumber": Param(
            50,
            type="integer",
            title="Number of dynamically created Mapped tasks",
        ),
    },
)
def mapped_tests():
    @task(trigger_rule="none_failed")
    def mapped_instantiator():
        # Produce the list that every first-level task expands over.
        context = get_current_context()
        MappedNumber = context["params"]["MappedNumber"]
        return [x for x in range(MappedNumber)]

    @task(trigger_rule="none_failed")
    def mapped_instance(num):
        print(f"!!!!!!!!!!!!!!!!!!!!!!!! I'M NUMBER: {num} !!!!!!!!!!!!!!!!!!!!!!!")
        return num

    @task
    def join_all():
        num = 1
        print(f"MY NUM: {num}")

    first = mapped_instantiator.override(task_id="first")()
    join_all = join_all()
    mapped_one = []
    cascade_groups = []
    for x in range(8):
        mapped_one.append(mapped_instance.override(task_id=f"first_level_{x}").expand(num=first))

        # do cascading
        @task_group(group_id=f"cascade_group_{x}")
        def cascade_group():
            for y in range(3):
                mapped_instance.override(task_id=f"cascade_{x}_{y}").expand(num=mapped_one[x])

        cascade_groups.append(cascade_group())
        mapped_one[x] >> cascade_groups[x]
        cascade_groups[x] >> join_all


dag_id = "mapped_tests"
globals()[dag_id] = mapped_tests()
```

### What you think should happen instead

Investigation of the taskinstance.py code is required.

### How to reproduce

See the code above. With pgbouncer, allow 150 connections. Then check the query statistics with:

`select query,wait_event_type,wait_event,count(*), min(now() - backend_start), max(now() - backend_start) from pg_stat_activity WHERE usename = 'airflow_user'::name GROUP BY 1,2,3 ORDER BY 4 DESC;`

### Operating System

Ubuntu on Linux and under WSL2

### Versions of Apache Airflow Providers

2.7.2

### Deployment

Docker-Compose

### Deployment details

Tested on Docker, Python 3.8-3.10, under Celery and LocalExecutor.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
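
To give a sense of scale for the reproduction DAG, here is a rough sketch that counts how many task instances a single run would create for a given `MappedNumber`. It assumes every `.expand()` call yields exactly `MappedNumber` instances (one per element returned by `first`) and that nothing is skipped. The helper name `count_task_instances` and its parameters are hypothetical and are not part of Airflow.

```python
def count_task_instances(mapped_number: int,
                         first_level: int = 8,
                         cascade_per_group: int = 3) -> int:
    """Estimate task instances per run of the reproduction DAG.

    Assumption: each mapped task expands into exactly `mapped_number`
    instances, matching the loop bounds in the DAG code above.
    """
    unmapped = 2  # the "first" and "join_all" tasks
    first_level_instances = first_level * mapped_number
    cascade_instances = first_level * cascade_per_group * mapped_number
    return unmapped + first_level_instances + cascade_instances


if __name__ == "__main__":
    # Print the estimate for a few values of MappedNumber.
    for n in (1, 50, 200):
        print(n, count_task_instances(n))
```

Under these assumptions the total is 2 + 32 × `MappedNumber`, so the default of 50 gives 1602 task instances, all belonging to the same `dag_run` row that the `SELECT ... FOR UPDATE` query above locks.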