Asquator commented on PR #53492:
URL: https://github.com/apache/airflow/pull/53492#issuecomment-3165220892

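   Optimized the query:
   - Window functions are no longer nested; all of them are computed side by side in a single flat subquery
   - `task_instance.*` columns are no longer carried across subqueries; only the id is, and the full row is joined back at the end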
   - Fixed flawed joins
   
   A runnable version of the query, with max_tis substituted as 128:
   
   ```sql
   -- Scheduler candidate query (runnable form, max_tis substituted as 128).
   -- Selects up to 128 'scheduled' task instances of running DAG runs of
   -- unpaused DAGs, in priority order, that still fit within every
   -- concurrency limit: dag.max_active_tasks, the task's
   -- max_active_tis_per_dag and max_active_tis_per_dagrun, and slot_pool.slots.
   --
   -- The four CTEs measure capacity already in use. windowed_tis ranks the
   -- candidates within each limit's scope; a candidate passes a limit when
   -- its rank (or running slot sum) plus the capacity in use fits the limit.
   WITH
   -- Queued/running task instances per DAG run (checked against dag.max_active_tasks).
   dag_run_active_tasks AS (
       SELECT
           task_instance.dag_id AS dag_id,
           task_instance.run_id AS run_id,
           count(*) AS now_running
       FROM task_instance
       WHERE task_instance.state IN ('queued', 'running')
       GROUP BY task_instance.dag_id, task_instance.run_id
   ),
   -- Queued/running instances of each task across all runs of its DAG
   -- (checked against max_active_tis_per_dag).
   active_tis_across_dag_runs AS (
       SELECT
           task_instance.dag_id AS dag_id,
           task_instance.task_id AS task_id,
           count(*) AS now_running
       FROM task_instance
       WHERE task_instance.state IN ('queued', 'running')
       GROUP BY task_instance.dag_id, task_instance.task_id
   ),
   -- Queued/running instances of each task within one DAG run
   -- (checked against max_active_tis_per_dagrun).
   active_tis_in_one_task AS (
       SELECT
           task_instance.dag_id AS dag_id,
           task_instance.run_id AS run_id,
           task_instance.task_id AS task_id,
           count(*) AS now_running
       FROM task_instance
       WHERE task_instance.state IN ('queued', 'running')
       GROUP BY task_instance.dag_id, task_instance.run_id, task_instance.task_id
   ),
   -- Pool slots currently held per pool (checked against slot_pool.slots).
   -- Sums pool_slots instead of counting rows so the unit matches
   -- pool_slots_taken_sum and slot_pool.slots.
   -- NOTE(review): 'deferred' is counted unconditionally here; confirm whether
   -- it should depend on a per-pool setting.
   pool_active_tasks AS (
       SELECT
           task_instance.pool AS pool,
           sum(task_instance.pool_slots) AS now_running
       FROM task_instance
       WHERE task_instance.state IN ('queued', 'running', 'deferred')
       GROUP BY task_instance.pool
   )
   SELECT
       task_instance.*,
       dag_run_1.*
   FROM task_instance
   INNER JOIN (
       -- Rank every candidate within each limit's scope, all in the same
       -- priority order as the final ORDER BY. task_instance.id is a unique
       -- tiebreaker so ranks are deterministic when the other keys tie.
       SELECT
           task_instance.id AS id,
           row_number() OVER (
               PARTITION BY task_instance.dag_id, task_instance.run_id
               ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index, task_instance.id
           ) AS total_tis_per_dagrun_count,
           row_number() OVER (
               PARTITION BY task_instance.dag_id, task_instance.task_id
               ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index, task_instance.id
           ) AS tis_per_dag_count,
           row_number() OVER (
               PARTITION BY task_instance.dag_id, task_instance.run_id, task_instance.task_id
               ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index, task_instance.id
           ) AS mapped_tis_per_dagrun_count,
           -- Running total of slots requested by this and all higher-priority
           -- candidates in the pool. The explicit ROWS frame stops the default
           -- RANGE frame from adding every tied peer's slots to each peer.
           sum(task_instance.pool_slots) OVER (
               PARTITION BY task_instance.pool
               ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index, task_instance.id
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS pool_slots_taken_sum
       FROM task_instance
       INNER JOIN dag_run
           ON dag_run.dag_id = task_instance.dag_id
           AND dag_run.run_id = task_instance.run_id
       INNER JOIN dag
           ON task_instance.dag_id = dag.dag_id
       WHERE dag_run.state = 'running'
           AND NOT dag.is_paused
           AND task_instance.state = 'scheduled'
           AND dag.bundle_name IS NOT NULL
   ) AS windowed_tis
       ON task_instance.id = windowed_tis.id
   INNER JOIN dag_run AS dag_run_1
       ON dag_run_1.dag_id = task_instance.dag_id
       AND dag_run_1.run_id = task_instance.run_id
   INNER JOIN dag
       ON task_instance.dag_id = dag.dag_id
   INNER JOIN slot_pool
       ON task_instance.pool = slot_pool.pool
   LEFT JOIN dag_run_active_tasks
       ON task_instance.dag_id = dag_run_active_tasks.dag_id
       AND task_instance.run_id = dag_run_active_tasks.run_id
   LEFT JOIN active_tis_across_dag_runs
       ON task_instance.dag_id = active_tis_across_dag_runs.dag_id
       AND task_instance.task_id = active_tis_across_dag_runs.task_id
   LEFT JOIN active_tis_in_one_task
       ON task_instance.dag_id = active_tis_in_one_task.dag_id
       AND task_instance.run_id = active_tis_in_one_task.run_id
       AND task_instance.task_id = active_tis_in_one_task.task_id
   LEFT JOIN pool_active_tasks
       ON task_instance.pool = pool_active_tasks.pool
   -- row_number() values are never NULL (windowed_tis is inner-joined), so only
   -- the running slot sum and the LEFT JOINed counts need coalesce.
   WHERE windowed_tis.total_tis_per_dagrun_count
           + coalesce(dag_run_active_tasks.now_running, 0)
           <= coalesce(dag.max_active_tasks, 128)
       AND windowed_tis.tis_per_dag_count
           + coalesce(active_tis_across_dag_runs.now_running, 0)
           <= coalesce(task_instance.max_active_tis_per_dag, 128)
       AND windowed_tis.mapped_tis_per_dagrun_count
           + coalesce(active_tis_in_one_task.now_running, 0)
           <= coalesce(task_instance.max_active_tis_per_dagrun, 128)
       AND coalesce(windowed_tis.pool_slots_taken_sum, 0)
           + coalesce(pool_active_tasks.now_running, 0)
           <= coalesce(slot_pool.slots, 128)
   -- Without this ORDER BY, LIMIT keeps an arbitrary subset of eligible rows
   -- instead of the highest-priority ones.
   ORDER BY
       -task_instance.priority_weight,
       dag_run_1.logical_date,
       task_instance.map_index,
       task_instance.id
   LIMIT 128;
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to