a-meledin opened a new issue, #35267:
URL: https://github.com/apache/airflow/issues/35267

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   We have a DAG with 116 tasks, each of which expands into a number of mapped 
task instances; in total there are ~6000 task instances. The DAG runs very 
slowly. Investigation and testing with the code below showed ~110 open 
connections executing the SQL: SELECT dag_run.state AS dag_run_state, ... FROM 
dag_run WHERE dag_run.dag_id = ... AND dag_run.run_id = ... FOR UPDATE .
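
A toy model of why many sessions blocked on one `SELECT ... FOR UPDATE` could slow a run: if every worker must take the same row lock before proceeding, the locked sections execute one after another no matter how many connections are open. The sketch below is not Airflow code. It substitutes a `threading.Lock` for the `dag_run` row lock, and the worker count and hold time are made-up values; whether the ~110 connections are really queuing on that lock is an assumption that the issue does not confirm.

```python
import threading
import time


def run_contended(workers: int, hold_seconds: float) -> float:
    """Return wall-clock seconds for `workers` threads that each hold one shared lock."""
    # Stand-in for the row lock taken by SELECT ... FOR UPDATE (assumption).
    row_lock = threading.Lock()

    def worker() -> None:
        with row_lock:  # only one thread at a time gets past this point
            time.sleep(hold_seconds)  # pretend work done while the row is locked

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    started = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - started


if __name__ == "__main__":
    elapsed = run_contended(workers=20, hold_seconds=0.01)
    print(f"20 workers, 10 ms each under the lock: {elapsed:.2f}s total")
```

In this model the total time grows linearly with the number of workers, since adding threads adds waiters rather than throughput.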
   
   In the logs this shows up as a delay in taskinstance.py:
   
![image](https://github.com/apache/airflow/assets/98175818/07316457-8f48-4f82-aac9-9c02a4831bd6)
   
   `
   import logging
   from datetime import datetime
   import pandas as pd, numpy as np
   from airflow.decorators import dag, task, task_group
   from airflow.operators.python import get_current_context
   from airflow.models.param import Param
   
   @dag(start_date=datetime(2023, 10, 29), schedule=None, max_active_runs=1,
        max_active_tasks=16, tags=["TESTS", "MAPPED TASKS"], description="",
        params={"MappedNumber": Param(50, type="integer",
                title="Number of dynamically created Mapped tasks")})
   def mapped_tests():
   
       @task(trigger_rule="none_failed")
       def mapped_instantiator():
           context = get_current_context()
           MappedNumber= context["params"]["MappedNumber"]
           return [x for x in range(MappedNumber)]
   
       @task(trigger_rule="none_failed")
       def mapped_instance(num):
           print(f"!!!!!!!!!!!!!!!!!!!!!!!! I'M NUMBER: {num} !!!!!!!!!!!!!!!!!!!!!!!")
           return num
       
       @task
       def join_all():
           num = 1
           print(f"MY NUM: {num}")
           
       first = mapped_instantiator.override(task_id="first")()    
       join_all = join_all()
   
       mapped_one = []
       cascade_groups = []    
       for x in range(8):
           
           mapped_one.append(mapped_instance.override(task_id=f"first_level_{x}").expand(num=first))
    
   
           # do cascading
           @task_group(group_id = f"cascade_group_{x}")
           def cascade_group():
               for y in range(3):
                   
                   mapped_instance.override(task_id=f"cascade_{x}_{y}").expand(num=mapped_one[x])
           cascade_groups.append(cascade_group())
           mapped_one[x] >> cascade_groups[x]
           
           cascade_groups[x] >> join_all
       
   dag_id = "mapped_tests"
   globals()[dag_id] = mapped_tests()
   `
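
For sizing the reproduction, the number of task instances this DAG creates can be worked out from its shape: one `first` task, 8 `first_level_*` tasks mapped over N items, 8 x 3 `cascade_*` tasks each mapped over the N outputs of their parent, and one `join_all`. A small sketch of that arithmetic, assuming every mapped task expands to exactly `MappedNumber` instances and none are skipped (the helper name is hypothetical):

```python
def expected_task_instances(mapped_number: int, groups: int = 8, cascades_per_group: int = 3) -> int:
    """Count the task instances created by one run of the reproduction DAG."""
    unmapped = 2  # "first" and "join_all"
    first_level = groups * mapped_number  # first_level_0 .. first_level_7
    cascade = groups * cascades_per_group * mapped_number  # cascade_x_y
    return unmapped + first_level + cascade


if __name__ == "__main__":
    print(expected_task_instances(50))   # default MappedNumber
    print(expected_task_instances(187))  # roughly the scale reported in the issue
```

With the default `MappedNumber` of 50 this gives 1602 task instances; a value of 187 gives 5986, close to the ~6000 of the original DAG.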
   
   
   ### What you think should happen instead
   
   Investigation of the taskinstance.py code is required.
   
   ### How to reproduce
   
   See the code above. Configure pgbouncer to allow 150 connections, then check 
the query statistics with:
   `select query, wait_event_type, wait_event, count(*), min(now() - backend_start), max(now() - backend_start) from pg_stat_activity WHERE usename = 'airflow_user'::name GROUP BY 1,2,3 ORDER BY 4 DESC;`
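
To watch the pile-up over time rather than from a single snapshot, the same query can be issued from a short script. This is a sketch under assumptions: `conn` is any DB-API 2.0 connection to the metadata database whose driver uses `%s` placeholders (psycopg2 does), and the function names and the default user are illustrative rather than part of Airflow.

```python
from typing import Any, List, Tuple

# Same grouping as the query above, with the user passed as a parameter.
ACTIVITY_SQL = """
SELECT query, wait_event_type, wait_event, count(*) AS sessions,
       min(now() - backend_start) AS youngest,
       max(now() - backend_start) AS oldest
FROM pg_stat_activity
WHERE usename = %s
GROUP BY 1, 2, 3
ORDER BY 4 DESC
"""


def session_summary(conn: Any, db_user: str = "airflow_user") -> List[Tuple]:
    """Return one row per (query, wait_event_type, wait_event), busiest first."""
    cur = conn.cursor()
    try:
        cur.execute(ACTIVITY_SQL, (db_user,))
        return cur.fetchall()
    finally:
        cur.close()


def lock_waiters(rows: List[Tuple]) -> int:
    """Sum the session counts of rows whose wait_event_type is 'Lock'."""
    return sum(row[3] for row in rows if row[1] == "Lock")
```

Calling `session_summary` every few seconds during a run and printing `lock_waiters(rows)` shows whether the number of sessions waiting on locks grows with the number of running mapped tasks.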
   
   
   ### Operating System
   
   Ubuntu on Linux and under WSL2
   
   ### Versions of Apache Airflow Providers
   
   2.7.2
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Tested on Docker, Python 3.8-3.10, with both Celery and LocalExecutor.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

