Asquator commented on PR #53492:
URL: https://github.com/apache/airflow/pull/53492#issuecomment-3155838953

   > I printed out the updated query and posted it here
   > 
   > https://gist.github.com/dstandish/a748988614f9f718f6b96b1330c04eec
   > 
   > It's pretty ugly. I would guess performance is not gonna be good, and suspect there is room for optimization.
   > 
   > Maybe try writing the query first in sql optimally, then figure out how to implement it in sqlalchemy.
   > 
   > Among other things, it seems you nest many window functions over and over, but you likely can include most of them in the same select, i would bet.
   > 
   > Lots of stuff in there that i think needs to be improved upon. Hope that is helpful.
   
   The query looks awful because SQLAlchemy lists every column of the model when doing `select(TI)`.
   If we collapse those column lists into `*`, the query becomes much shorter:
   ```sql
   WITH
       anon_5 AS
           (SELECT
                 task_instance.dag_id AS dag_id,
                 task_instance.run_id AS run_id,
                 count(:count_1) AS now_running
             FROM task_instance
             WHERE
                 task_instance.state IN (__[postcompile_state_4])
             GROUP BY task_instance.dag_id, task_instance.run_id
            ),
       anon_6 AS
           (SELECT
                 task_instance.dag_id AS dag_id,
                 task_instance.task_id AS task_id,
                 count(:count_2) AS now_running
             FROM task_instance
             WHERE
                 task_instance.state IN (__[postcompile_state_5])
             GROUP BY task_instance.dag_id, task_instance.task_id
            ),
       anon_7 AS
           (SELECT
                 task_instance.dag_id AS dag_id,
                 task_instance.run_id AS run_id,
                 task_instance.task_id AS task_id,
                 count(:count_3) AS now_running
             FROM task_instance
             WHERE
                 task_instance.state IN (__[postcompile_state_6])
             GROUP BY task_instance.dag_id, task_instance.run_id, task_instance.task_id
            ),
       anon_8 AS
           (SELECT
                 task_instance.pool AS pool,
                 count(:count_4) AS now_running
             FROM task_instance
             WHERE
                 task_instance.state IN (__[postcompile_state_7])
             GROUP BY task_instance.pool
            )
   SELECT
       task_instance.*,
       dag_run_1.*
   FROM
       task_instance
       JOIN (SELECT
                 task_instance.*,
                 sum(task_instance.pool_slots)
                     OVER (PARTITION BY task_instance.pool ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index) AS pool_slots_taken
             FROM
                 task_instance
                 JOIN (SELECT
                           task_instance.*,
                           row_number()
                               OVER (PARTITION BY task_instance.dag_id, task_instance.run_id, task_instance.task_id ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index) AS mapped_tis_per_dagrun_count
                       FROM
                           task_instance
                           JOIN (SELECT
                                     task_instance.*,
                                     row_number()
                                         OVER (PARTITION BY task_instance.dag_id, task_instance.task_id ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index) AS tis_per_dag_count
                                 FROM
                                     task_instance
                                     JOIN (SELECT
                                               task_instance.*,
                                               row_number()
                                                   OVER (PARTITION BY task_instance.dag_id, task_instance.run_id ORDER BY -task_instance.priority_weight, dag_run.logical_date, task_instance.map_index) AS total_tis_per_dagrun_count
                                           FROM
                                               task_instance
                                               JOIN dag_run ON dag_run.dag_id = task_instance.dag_id
                                                   AND dag_run.run_id = task_instance.run_id
                                               JOIN dag ON task_instance.dag_id = dag.dag_id
                                           WHERE
                                               dag_run.state = :state_2
                                               AND NOT dag.is_paused
                                               AND task_instance.state = :state_3
                                               AND dag.bundle_name IS NOT NULL
                                     ) AS anon_4 ON task_instance.id = anon_4.id
                                     LEFT OUTER JOIN anon_5
                                         ON task_instance.dag_id = anon_5.dag_id AND task_instance.run_id = anon_5.run_id
                                     JOIN dag_run ON task_instance.run_id = dag_run.run_id
                                     JOIN dag ON task_instance.dag_id = dag.dag_id
                                 WHERE
                                     coalesce(anon_4.total_tis_per_dagrun_count, 0) + coalesce(anon_5.now_running, 0)
                                         <= coalesce(dag.max_active_tasks, :coalesce_1)
                           ) AS anon_3 ON task_instance.id = anon_3.id
                           LEFT OUTER JOIN anon_6
                               ON task_instance.dag_id = anon_6.dag_id AND task_instance.task_id = anon_6.task_id
                           JOIN dag_run ON task_instance.run_id = dag_run.run_id
                       WHERE
                           coalesce(anon_3.tis_per_dag_count, 0) + coalesce(anon_6.now_running, 0)
                               <= coalesce(task_instance.max_active_tis_per_dag, :coalesce_2)
                 ) AS anon_2 ON task_instance.id = anon_2.id
                 LEFT OUTER JOIN anon_7 ON task_instance.dag_id = anon_7.dag_id AND task_instance.run_id = anon_7.run_id
                     AND task_instance.task_id = anon_7.task_id
                 JOIN dag_run ON task_instance.run_id = dag_run.run_id
             WHERE
                 coalesce(anon_2.mapped_tis_per_dagrun_count, 0) + coalesce(anon_7.now_running, 0)
                     <= coalesce(task_instance.max_active_tis_per_dagrun, :coalesce_3)
       ) AS anon_1 ON task_instance.id = anon_1.id
       LEFT OUTER JOIN anon_8 ON task_instance.pool = anon_8.pool
       JOIN dag_run ON task_instance.run_id = dag_run.run_id
       JOIN slot_pool ON task_instance.pool = slot_pool.pool
       JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE
       coalesce(anon_1.pool_slots_taken, 0) + coalesce(anon_8.now_running, 0) <= coalesce(slot_pool.slots, :coalesce_4)
   LIMIT :param_1
   ```
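Every level of the query above follows the same pattern: a window function ranks (or sums) the candidate TIs in priority order, a LEFT OUTER JOIN against a CTE adds how many TIs are already running for the same key, and the `WHERE` keeps only rows where the two together fit under the limit. A minimal sketch of just the outermost (pool) stage, run through Python's built-in `sqlite3` (window functions need SQLite 3.25+). The two-table schema is a hypothetical, heavily reduced stand-in for `task_instance`/`slot_pool`, the "running" states are assumed to be `running` and `queued`, and `id` stands in for the `logical_date, map_index` tie-breakers:

```python
import sqlite3

# Hypothetical, heavily reduced schema: only the columns the pool stage needs.
SCHEMA = """
CREATE TABLE slot_pool (pool TEXT PRIMARY KEY, slots INTEGER);
CREATE TABLE task_instance (
    id INTEGER PRIMARY KEY,
    pool TEXT,
    pool_slots INTEGER,
    priority_weight INTEGER,
    state TEXT
);
"""

# One limit stage: running total of pool_slots over the scheduled candidates
# (highest priority first) plus the TIs already running in the pool must fit.
POOL_STAGE = """
WITH now_running AS (
    -- assumption: 'running' and 'queued' are the states that occupy a slot
    SELECT pool, count(*) AS now_running
    FROM task_instance
    WHERE state IN ('running', 'queued')
    GROUP BY pool
),
candidates AS (
    SELECT ti.*,
           sum(ti.pool_slots) OVER (
               PARTITION BY ti.pool
               ORDER BY -ti.priority_weight, ti.id  -- id: stand-in tie-breaker
           ) AS pool_slots_taken
    FROM task_instance AS ti
    WHERE ti.state = 'scheduled'
)
SELECT c.id
FROM candidates AS c
LEFT OUTER JOIN now_running AS r ON c.pool = r.pool
JOIN slot_pool AS p ON c.pool = p.pool
WHERE coalesce(c.pool_slots_taken, 0) + coalesce(r.now_running, 0) <= p.slots
ORDER BY c.id
"""


def schedulable_ids(rows, slots):
    """Return ids of 'scheduled' TIs that fit into the single pool 'default'."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(SCHEMA)
        conn.execute("INSERT INTO slot_pool VALUES ('default', ?)", (slots,))
        conn.executemany(
            "INSERT INTO task_instance VALUES (?, 'default', ?, ?, ?)", rows
        )
        return [row[0] for row in conn.execute(POOL_STAGE)]
    finally:
        conn.close()


# (id, pool_slots, priority_weight, state)
ROWS = [
    (1, 1, 0, "running"),
    (2, 1, 10, "scheduled"),
    (3, 1, 5, "scheduled"),
    (4, 1, 1, "scheduled"),
]

print(schedulable_ids(ROWS, slots=3))  # [2, 3]
print(schedulable_ids(ROWS, slots=2))  # [2]
```

With 3 slots and one TI already running, only the two highest-priority candidates fit; TI 4 would bring the total to 4.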
   
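   Regarding placing all the window functions in the same SELECT: that won't work because of how filtering is done in the `WHERE` clause. To check all the limits correctly (an AND predicate), we have to nest them, so that each window only counts the TIs that passed the previous limits. With all the windows in one SELECT, a TI rejected by one limit still takes up budget in the other windows and can block TIs that should have been scheduled.
   I already described this issue, along with some performance considerations, in https://github.com/apache/airflow/issues/45636.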
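A minimal `sqlite3` sketch of that difference, using two hypothetical limits only: a per-task limit of 1 (standing in for `max_active_tis_per_dag`) and pool slots, with no TIs running yet. Table names, rows and limits are made up for illustration. In the flat version, TI `B` is rejected by the per-task limit but still occupies the only `p2` slot in the pool window, so `C` is rejected as well; the nested version computes the pool window only over TIs that already passed the per-task limit:

```python
import sqlite3

# Hypothetical toy tables, not Airflow's real schema.
SCHEMA = """
CREATE TABLE ti (
    id TEXT PRIMARY KEY,
    task_id TEXT,
    pool TEXT,
    pool_slots INTEGER,
    priority_weight INTEGER
);
CREATE TABLE pool_limit (pool TEXT PRIMARY KEY, slots INTEGER);
"""

# All windows computed in one SELECT over every candidate.
FLAT = """
SELECT id FROM (
    SELECT ti.id, ti.pool,
           row_number() OVER (PARTITION BY ti.task_id ORDER BY -ti.priority_weight) AS per_task,
           sum(ti.pool_slots) OVER (PARTITION BY ti.pool ORDER BY -ti.priority_weight) AS pool_taken
    FROM ti
) AS w
JOIN pool_limit AS p ON w.pool = p.pool
WHERE w.per_task <= :task_limit AND w.pool_taken <= p.slots
ORDER BY id
"""

# Pool window computed only over rows that passed the per-task limit.
NESTED = """
SELECT id FROM (
    SELECT s1.id, s1.pool,
           sum(s1.pool_slots) OVER (PARTITION BY s1.pool ORDER BY -s1.priority_weight) AS pool_taken
    FROM (
        SELECT ti.*,
               row_number() OVER (PARTITION BY ti.task_id ORDER BY -ti.priority_weight) AS per_task
        FROM ti
    ) AS s1
    WHERE s1.per_task <= :task_limit
) AS s2
JOIN pool_limit AS p ON s2.pool = p.pool
WHERE s2.pool_taken <= p.slots
ORDER BY id
"""

# (id, task_id, pool, pool_slots, priority_weight)
ROWS = [("A", "t1", "p1", 1, 10), ("B", "t1", "p2", 1, 5), ("C", "t2", "p2", 1, 1)]
POOLS = [("p1", 1), ("p2", 1)]
TASK_LIMIT = 1  # hypothetical per-task limit


def run(query):
    """Run one variant against a fresh in-memory database and return the ids."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(SCHEMA)
        conn.executemany("INSERT INTO ti VALUES (?, ?, ?, ?, ?)", ROWS)
        conn.executemany("INSERT INTO pool_limit VALUES (?, ?)", POOLS)
        return [r[0] for r in conn.execute(query, {"task_limit": TASK_LIMIT})]
    finally:
        conn.close()


print(run(FLAT))    # ['A']
print(run(NESTED))  # ['A', 'C']
```

The price of the nested form is one extra subquery (and sort) per limit, which is where the performance concern comes from.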
   
   Anyway, I'm starting work on the performance tests, which will show us how much slower this query is.


-- 
This is an automated message from the Apache Git Service.