Asquator commented on PR #53492: URL: https://github.com/apache/airflow/pull/53492#issuecomment-3155838953
> I printed out the updated query and posted it here
>
> https://gist.github.com/dstandish/a748988614f9f718f6b96b1330c04eec
>
> It's pretty ugly. I would guess performance is not gonna be good, and suspect there is room for optimization.
>
> Maybe try writing the query first in sql optimally, then figure out how to implement it in sqlalchemy.
>
> Among other things, it seems you nest many window functions over and over, but you likely can include most of them in the same select, i would bet.
>
> Lots of stuff in there that i think needs to be improved upon. Hope that is helpful.

The query looks awful because SQLAlchemy insists on listing every column of the table when doing `select(TI)`. If we collapse those column lists into `task_instance.*`, the query becomes much shorter:

```sql
WITH anon_5 AS (
    SELECT task_instance.dag_id AS dag_id, task_instance.run_id AS run_id, count(:count_1) AS now_running
    FROM task_instance
    WHERE task_instance.state IN (__[postcompile_state_4])
    GROUP BY task_instance.dag_id, task_instance.run_id
),
anon_6 AS (
    SELECT task_instance.dag_id AS dag_id, task_instance.task_id AS task_id, count(:count_2) AS now_running
    FROM task_instance
    WHERE task_instance.state IN (__[postcompile_state_5])
    GROUP BY task_instance.dag_id, task_instance.task_id
),
anon_7 AS (
    SELECT task_instance.dag_id AS dag_id, task_instance.run_id AS run_id, task_instance.task_id AS task_id, count(:count_3) AS now_running
    FROM task_instance
    WHERE task_instance.state IN (__[postcompile_state_6])
    GROUP BY task_instance.dag_id, task_instance.run_id, task_instance.task_id
),
anon_8 AS (
    SELECT task_instance.pool AS pool, count(:count_4) AS now_running
    FROM task_instance
    WHERE task_instance.state IN (__[postcompile_state_7])
    GROUP BY task_instance.pool
)
SELECT task_instance.*, dag_run_1.*
FROM task_instance
JOIN (
    SELECT task_instance.*,
           sum(task_instance.pool_slots) OVER (
               PARTITION BY task_instance.pool
               ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index
           ) AS pool_slots_taken
    FROM task_instance
    JOIN (
        SELECT task_instance.*,
               row_number() OVER (
                   PARTITION BY task_instance.dag_id, task_instance.run_id, task_instance.task_id
                   ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index
               ) AS mapped_tis_per_dagrun_count
        FROM task_instance
        JOIN (
            SELECT task_instance.*,
                   row_number() OVER (
                       PARTITION BY task_instance.dag_id, task_instance.task_id
                       ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index
                   ) AS tis_per_dag_count
            FROM task_instance
            JOIN (
                SELECT task_instance.*,
                       row_number() OVER (
                           PARTITION BY task_instance.dag_id, task_instance.run_id
                           ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index
                       ) AS total_tis_per_dagrun_count
                FROM task_instance
                JOIN dag_run ON dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id
                JOIN dag ON task_instance.dag_id = dag.dag_id
                WHERE dag_run.state = :state_2
                  AND NOT dag.is_paused
                  AND task_instance.state = :state_3
                  AND dag.bundle_name IS NOT NULL
            ) AS anon_4 ON task_instance.id = anon_4.id
            LEFT OUTER JOIN anon_5 ON task_instance.dag_id = anon_5.dag_id AND task_instance.run_id = anon_5.run_id
            JOIN dag_run ON task_instance.run_id = dag_run.run_id
            JOIN dag ON task_instance.dag_id = dag.dag_id
            WHERE coalesce(anon_4.total_tis_per_dagrun_count, 0) + coalesce(anon_5.now_running, 0) <= coalesce(dag.max_active_tasks, :coalesce_1)
        ) AS anon_3 ON task_instance.id = anon_3.id
        LEFT OUTER JOIN anon_6 ON task_instance.dag_id = anon_6.dag_id AND task_instance.task_id = anon_6.task_id
        JOIN dag_run ON task_instance.run_id = dag_run.run_id
        WHERE coalesce(anon_3.tis_per_dag_count, 0) + coalesce(anon_6.now_running, 0) <= coalesce(task_instance.max_active_tis_per_dag, :coalesce_2)
    ) AS anon_2 ON task_instance.id = anon_2.id
    LEFT OUTER JOIN anon_7 ON task_instance.dag_id = anon_7.dag_id AND task_instance.run_id = anon_7.run_id AND task_instance.task_id = anon_7.task_id
    JOIN dag_run ON task_instance.run_id = dag_run.run_id
    WHERE coalesce(anon_2.mapped_tis_per_dagrun_count, 0) + coalesce(anon_7.now_running, 0) <= coalesce(task_instance.max_active_tis_per_dagrun, :coalesce_3)
) AS anon_1 ON task_instance.id = anon_1.id
LEFT OUTER JOIN anon_8 ON task_instance.pool = anon_8.pool
JOIN dag_run ON task_instance.run_id = dag_run.run_id
JOIN slot_pool ON task_instance.pool = slot_pool.pool
JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE coalesce(anon_1.pool_slots_taken, 0) + coalesce(anon_8.now_running, 0) <= coalesce(slot_pool.slots, :coalesce_4)
LIMIT :param_1
```

Placing all the window functions in the same SELECT won't work because of how filtering is done in the `WHERE` clause. To correctly check all limits (an AND predicate), we have to nest them. Window functions are evaluated after the `WHERE` clause of their own SELECT. In a single SELECT, every row number is therefore computed over the same unfiltered candidate set, and a TI rejected by one limit still takes up a position in the counts for the other limits. With nesting, each level counts only the TIs that passed the limits checked before it. I already referred to this issue and some performance considerations in https://github.com/apache/airflow/issues/45636.

Anyway, I'm starting to work on the performance tests, which will show us how much slower this query will be.
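Here is a minimal sketch of why the window functions are nested rather than computed in one SELECT. It uses Python's built-in `sqlite3` and a hypothetical three-row `ti` table. The table, its columns, and the two limit values are made up for illustration and are not Airflow's schema. The sketch mirrors the pattern of the generated query (one `row_number()` per limit, compared against that limit) but leaves out the `now_running` counts and the `coalesce` defaults.

```python
from __future__ import annotations

import sqlite3

# Hypothetical toy data: (id, dag_id, task_id, priority). Not Airflow's schema.
ROWS = [
    (1, "d", "a", 3),
    (2, "d", "a", 2),
    (3, "d", "b", 1),
]
MAX_PER_TASK = 1  # illustrative stand-in for a per-task limit
MAX_PER_DAG = 2   # illustrative stand-in for a per-DAG limit

# Both row numbers computed in one SELECT, over the same unfiltered candidates.
SINGLE_SELECT = """
SELECT id FROM (
    SELECT id,
           row_number() OVER (PARTITION BY dag_id, task_id ORDER BY priority DESC) AS task_rank,
           row_number() OVER (PARTITION BY dag_id ORDER BY priority DESC) AS dag_rank
    FROM ti
) AS ranked
WHERE task_rank <= :max_per_task AND dag_rank <= :max_per_dag
ORDER BY id
"""

# Nested: the per-DAG row number is computed only over rows that passed
# the per-task limit, because the window runs after the inner WHERE.
NESTED = """
SELECT id FROM (
    SELECT id,
           row_number() OVER (PARTITION BY dag_id ORDER BY priority DESC) AS dag_rank
    FROM (
        SELECT id, dag_id, priority,
               row_number() OVER (PARTITION BY dag_id, task_id ORDER BY priority DESC) AS task_rank
        FROM ti
    ) AS per_task
    WHERE task_rank <= :max_per_task
) AS per_dag
WHERE dag_rank <= :max_per_dag
ORDER BY id
"""


def admitted(query: str) -> list[int]:
    """Load ROWS into an in-memory database and return the ids the query admits."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE ti (id INTEGER PRIMARY KEY, dag_id TEXT, task_id TEXT, priority INTEGER)"
        )
        conn.executemany("INSERT INTO ti VALUES (?, ?, ?, ?)", ROWS)
        params = {"max_per_task": MAX_PER_TASK, "max_per_dag": MAX_PER_DAG}
        return [row[0] for row in conn.execute(query, params)]
    finally:
        conn.close()


if __name__ == "__main__":
    print("single SELECT:", admitted(SINGLE_SELECT))  # [1]
    print("nested:", admitted(NESTED))                # [1, 3]
```

In the single SELECT, TI 2 is rejected by the per-task limit but still holds rank 2 in the per-DAG count. That pushes TI 3 to rank 3, over the per-DAG limit. The nested form drops TI 2 before the per-DAG rank is computed, so TI 3 fits. Neither form ever exceeds a limit. The single SELECT just wastes slots, which is the kind of starvation the nesting is meant to reduce.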
