Asquator commented on PR #53492:
URL: https://github.com/apache/airflow/pull/53492#issuecomment-3165220892
Optimized the query:
- No more nested windows, all of them are computed in a flat manner
- No more passing `task_instance.*` fields across subqueries
- Fixed flawed joins
The runnable query version for max_tis=128:
```sql
-- Picks up to 128 'scheduled' task instances (with their dag_run row) that can
-- start without exceeding any of four limits: per-dag-run, per-task across
-- runs, per-task within one run, and pool slots. Each limit is checked as
--   (position / running total of this candidate among the candidates)
--   + (what is already queued or running) <= limit
-- The literal 128 is the max_tis value this rendering of the query was
-- generated for; it doubles as the fallback when a limit column is NULL.
WITH dag_run_active_tasks AS (
    -- Queued/running task instances per (dag_id, run_id).
    SELECT
        task_instance.dag_id AS dag_id,
        task_instance.run_id AS run_id,
        count(*) AS now_running
    FROM task_instance
    WHERE task_instance.state IN ('queued', 'running')
    GROUP BY
        task_instance.dag_id,
        task_instance.run_id
),
active_tis_across_dag_runs AS (
    -- Queued/running task instances per (dag_id, task_id), across all runs.
    SELECT
        task_instance.dag_id AS dag_id,
        task_instance.task_id AS task_id,
        count(*) AS now_running
    FROM task_instance
    WHERE task_instance.state IN ('queued', 'running')
    GROUP BY
        task_instance.dag_id,
        task_instance.task_id
),
active_tis_in_one_task AS (
    -- Queued/running task instances per (dag_id, run_id, task_id).
    SELECT
        task_instance.dag_id AS dag_id,
        task_instance.run_id AS run_id,
        task_instance.task_id AS task_id,
        count(*) AS now_running
    FROM task_instance
    WHERE task_instance.state IN ('queued', 'running')
    GROUP BY
        task_instance.dag_id,
        task_instance.run_id,
        task_instance.task_id
),
pool_active_tasks AS (
    -- Pool slots already held per pool. This sums pool_slots rather than
    -- counting rows so the value is in the same unit as pool_slots_taken_sum
    -- and slot_pool.slots; a task holding several slots was undercounted by
    -- count(*).
    -- NOTE(review): 'deferred' is counted unconditionally here - confirm
    -- that is intended for every pool.
    SELECT
        task_instance.pool AS pool,
        sum(task_instance.pool_slots) AS now_running
    FROM task_instance
    WHERE task_instance.state IN ('queued', 'running', 'deferred')
    GROUP BY task_instance.pool
)
SELECT
    task_instance.*,
    dag_run_1.*
FROM task_instance
INNER JOIN (
    -- One row per schedulable candidate with its position inside each
    -- limit's partition. Every window uses the same ordering, ending in
    -- task_instance.id so ties are broken identically in all four windows.
    -- NOTE(review): positions are computed over all candidates, including
    -- ones later rejected by another limit, so the check is conservative.
    SELECT
        task_instance.id AS id,
        row_number() OVER (
            PARTITION BY task_instance.dag_id, task_instance.run_id
            ORDER BY
                -task_instance.priority_weight,
                dag_run.logical_date,
                task_instance.map_index,
                task_instance.id
        ) AS total_tis_per_dagrun_count,
        row_number() OVER (
            PARTITION BY task_instance.dag_id, task_instance.task_id
            ORDER BY
                -task_instance.priority_weight,
                dag_run.logical_date,
                task_instance.map_index,
                task_instance.id
        ) AS tis_per_dag_count,
        row_number() OVER (
            PARTITION BY
                task_instance.dag_id,
                task_instance.run_id,
                task_instance.task_id
            ORDER BY
                -task_instance.priority_weight,
                dag_run.logical_date,
                task_instance.map_index,
                task_instance.id
        ) AS mapped_tis_per_dagrun_count,
        -- Explicit ROWS frame: with the default RANGE frame, rows tied on
        -- the sort key would all receive the whole peer group's total.
        sum(task_instance.pool_slots) OVER (
            PARTITION BY task_instance.pool
            ORDER BY
                -task_instance.priority_weight,
                dag_run.logical_date,
                task_instance.map_index,
                task_instance.id
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS pool_slots_taken_sum
    FROM task_instance
    INNER JOIN dag_run
        ON dag_run.dag_id = task_instance.dag_id
        AND dag_run.run_id = task_instance.run_id
    INNER JOIN dag
        ON task_instance.dag_id = dag.dag_id
    WHERE
        dag_run.state = 'running'
        AND NOT dag.is_paused
        AND task_instance.state = 'scheduled'
        AND dag.bundle_name IS NOT NULL
) AS windowed_tis
    ON task_instance.id = windowed_tis.id
INNER JOIN dag
    ON task_instance.dag_id = dag.dag_id
INNER JOIN slot_pool
    ON task_instance.pool = slot_pool.pool
-- The "already active" counters are LEFT-joined: no matching row means
-- nothing is active for that key, which the coalesce(..., 0) below handles.
LEFT JOIN dag_run_active_tasks
    ON task_instance.dag_id = dag_run_active_tasks.dag_id
    AND task_instance.run_id = dag_run_active_tasks.run_id
LEFT JOIN active_tis_across_dag_runs
    ON task_instance.dag_id = active_tis_across_dag_runs.dag_id
    AND task_instance.task_id = active_tis_across_dag_runs.task_id
LEFT JOIN active_tis_in_one_task
    ON task_instance.dag_id = active_tis_in_one_task.dag_id
    AND task_instance.run_id = active_tis_in_one_task.run_id
    AND task_instance.task_id = active_tis_in_one_task.task_id
LEFT JOIN pool_active_tasks
    ON task_instance.pool = pool_active_tasks.pool
INNER JOIN dag_run AS dag_run_1
    ON dag_run_1.dag_id = task_instance.dag_id
    AND dag_run_1.run_id = task_instance.run_id
WHERE
    -- Per-dag-run limit.
    coalesce(windowed_tis.total_tis_per_dagrun_count, 0)
        + coalesce(dag_run_active_tasks.now_running, 0)
        <= coalesce(dag.max_active_tasks, 128)
    -- Per-task limit across all runs of the dag.
    AND coalesce(windowed_tis.tis_per_dag_count, 0)
        + coalesce(active_tis_across_dag_runs.now_running, 0)
        <= coalesce(task_instance.max_active_tis_per_dag, 128)
    -- Per-task limit within a single dag run.
    AND coalesce(windowed_tis.mapped_tis_per_dagrun_count, 0)
        + coalesce(active_tis_in_one_task.now_running, 0)
        <= coalesce(task_instance.max_active_tis_per_dagrun, 128)
    -- Pool capacity, measured in slots on both sides.
    AND coalesce(windowed_tis.pool_slots_taken_sum, 0)
        + coalesce(pool_active_tasks.now_running, 0)
        <= coalesce(slot_pool.slots, 128)
-- Same ordering as the windows, so LIMIT keeps the highest-priority
-- candidates instead of an arbitrary subset.
ORDER BY
    -task_instance.priority_weight,
    dag_run_1.logical_date,
    task_instance.map_index,
    task_instance.id
LIMIT 128;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]