Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812809548

   @kaxil @vatsrahul1001 @shahar1 @ashb Thank you for the help, the reviews, and the testing!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812806631

   Great work here @xBis7, and thanks for persisting this long (~6 months).
   
   #protm





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812803341

   🎉 Merged to let it bake in -- so we will know if something is up before 3.2





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


kaxil merged PR #54103:
URL: https://github.com/apache/airflow/pull/54103





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


vatsrahul1001 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812745284

   > Hi @vatsrahul1001, has there been any update on your testing?
   
   @xBis7 - Apologies for the delay. Completed testing now - this is a significant improvement!
   
   I used 10 DAGs with 100 parallel tasks each, `max_active_tasks=10`
   
   | Metric | main | PR |
   |--------|------|----|
   | Starved DAG events | 141 | 0 |
   
   With this PR, the scheduler no longer fetches tasks it cannot queue. On main, tasks get fetched and then rejected when the DAG's `max_active_tasks` limit is already reached; with the PR, that check happens in the SQL query itself.
   
   Looking at the logs:
   - **Before:** the query pulls tasks mostly from the same DAG, and most get rejected
   - **After:** the query spreads task selection across multiple DAGs, and all get queued
   
   Looks like a big win for scheduler efficiency.
   
   Also, I do not see any regressions from this PR.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-28 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3810840079

   Hi @vatsrahul1001, has there been any update on your testing?





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-22 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2718328310


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()

Review Comment:
   You don't have to do it concurrently. If we run the starvation checks first and then apply the `max_active_tasks` limit, that gives us the desired behavior.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-22 Thread via GitHub


kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3787281253

   @vatsrahul1001 Only pending your review/test now





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-22 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3786303807

   @kaxil Thank you again for the review! I added the unit tests that you requested.
   
   I ran some tests and you were right about the starvation limits. I moved the checks into the inner query, right before computing the row_num, and now it works as expected. The new unit test verifies it.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-22 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2718320758


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -194,6 +194,52 @@ def _create_dagrun(
     return _create_dagrun
 
 
+def task_maker(

Review Comment:
   Added unit tests for both cases.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-20 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708823289


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )

Review Comment:
   I made the changes.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-20 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708777179


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )

Review Comment:
   That's a valid point and an oversight on my part. Both of these options exist on the inner query but need to be applied on the outer one.
   
   ```python
   .options(selectinload(TI.dag_model))
   ```
   
   doesn't have any effect unless applied on the outer query.
   
   About
   
   ```python
   .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
   ```
   
   I'll leave it in the inner query and add it to the outer one as well.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-20 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708414793


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"
 
 
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Subquery:
+    """Get the dag_run IDs and how many tasks are in the provided states for each one."""

Review Comment:
   I think you are referring to this part on lines 488-490:
   
   ```python
   # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
   concurrency_map = ConcurrencyMap()
   concurrency_map.load(session=session)
   ```
   
   It immediately executes the query and gets the results, while `_get_current_dr_task_concurrency` creates a subquery that gets included in the main query. The subquery will be executed at a later point, which could be immediately but could also be after a while. The results could be different even 3 seconds apart.
   
   Is that what you mean, and what I should explain in a comment?
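The timing difference described above can be demonstrated with a toy example (stdlib `sqlite3` and a made-up `ti` table, not the actual Airflow models): an eagerly executed count is a point-in-time snapshot, while a subquery embedded in a larger statement sees the data as of whenever that statement finally runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (dag_id TEXT, state TEXT)")
conn.execute("INSERT INTO ti VALUES ('d1', 'running')")

# Eager approach (in the spirit of ConcurrencyMap.load): runs immediately
# and captures a point-in-time snapshot.
snapshot = conn.execute("SELECT COUNT(*) FROM ti WHERE state = 'running'").fetchone()[0]

# Subquery approach: only the SQL text is composed here; it executes later,
# inside the main query, and sees whatever the table holds at that moment.
main_query = "SELECT (SELECT COUNT(*) FROM ti WHERE state = 'running')"

conn.execute("INSERT INTO ti VALUES ('d2', 'running')")  # state changes in between
live = conn.execute(main_query).fetchone()[0]
print(snapshot, live)  # 1 2
```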






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769492158

   Overall I see it as a nice improvement, because it does give us better throughput at the moment. In the long term, we'll probably have to look at a more universal solution that prevents starvation across all the concurrency limits. I would like to see the benchmarks I published in the procedures PR run against this approach. The one thing that bothers me: what if we include both `max_active_tasks` and pool/per-task/across-dagruns constraints?
   
   I'm looking forward to seeing this merged and to continuing to investigate further scheduler enhancements.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


Asquator commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705612502


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()

Review Comment:
   I think it's inherent to this solution, as it's impossible to check both simultaneously. This PR does prevent starvation, but only for the `max_active_tasks` limit. I bumped into this too when trying to consider all the concurrency limits.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769380365

   > @kaxil Thank you for the review! I'm going to add the tests and then run some other tests of my own for your other comments.
   
   Sounds good, we are so close.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705400293


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )

Review Comment:
   Not sure what happened to my earlier comment, so re-adding: this new query is missing `.options(selectinload(TI.dag_model))` and `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")`, which were on the original query. Every `ti.dag_model` access later will trigger a separate query; with 50 TIs that's 50+ extra queries per loop.
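The N+1 pattern described in this comment can be illustrated without SQLAlchemy: a per-row lookup issues one SELECT per task instance, while batching the related rows into a single `IN (...)` query (what `selectinload` does under the hood) issues one. A sketch with hypothetical `dag`/`ti` tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INT);
    CREATE TABLE ti (dag_id TEXT, task_id TEXT);
    """
)
for i in range(5):
    conn.execute("INSERT INTO dag VALUES (?, 16)", (f"dag_{i}",))
    conn.execute("INSERT INTO ti VALUES (?, 't1')", (f"dag_{i}",))

# Count every SELECT statement the connection executes.
selects = []
conn.set_trace_callback(lambda sql: selects.append(sql) if sql.lstrip().startswith("SELECT") else None)

# Lazy loading: one SELECT for the TIs, then one more per related-dag access.
tis = conn.execute("SELECT dag_id, task_id FROM ti").fetchall()
for dag_id, _ in tis:
    conn.execute("SELECT max_active_tasks FROM dag WHERE dag_id = ?", (dag_id,)).fetchone()
lazy_count = len(selects)

selects.clear()
# Eager loading: one SELECT for the TIs plus one batched SELECT ... IN (...).
tis = conn.execute("SELECT dag_id, task_id FROM ti").fetchall()
placeholders = ",".join("?" * len(tis))
conn.execute(
    f"SELECT dag_id, max_active_tasks FROM dag WHERE dag_id IN ({placeholders})",
    [dag_id for dag_id, _ in tis],
).fetchall()
eager_count = len(selects)
print(lazy_count, eager_count)  # 6 2
```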



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"
 
 
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Subquery:
+    """Get the dag_run IDs and how many tasks are in the provided states for each one."""

Review Comment:
   This queries the same data as `ConcurrencyMap.load()`, which is still called and used for the check at lines ~680-695. Worth adding a comment explaining why we keep both (race-condition protection between query time and check time?)



##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -194,6 +194,52 @@ def _create_dagrun(
     return _create_dagrun
 
 
+def task_maker(

Review Comment:
   A couple of test cases worth adding:
   
   1. **Starvation filter ordering**: a dag run with tasks in mixed pools (some starved, some not). Verify non-starved pool tasks aren't excluded because starved-pool tasks consumed row_number slots.
   
   2. **Partial capacity**: a dag run with `max_active_tasks=4` where 2 tasks are already RUNNING plus 10 SCHEDULED. Verify the query returns only 2 (not 4) for that run.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705372034


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )

Review Comment:
   (Missed eager loading of `selectinload`)






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351661


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.

Review Comment:
   The row_number ranking happens here before the starvation filters (starved_pools, etc.) are applied below. In the original code, those filters were applied BEFORE the limit.
   
   Tasks in starved pools will consume row_number slots and then get filtered out, potentially excluding schedulable tasks from the same dag run. Should we apply the starvation filters to the base query before building ranked_query?
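The ordering hazard raised here can be shown concretely in plain SQL (a toy table, not Airflow's schema): ranking before filtering lets starved-pool rows consume the row_number slots, while filtering first leaves the slots for schedulable tasks.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ti (task_id TEXT, pool TEXT, priority INT);
    INSERT INTO ti VALUES
        ('t1', 'starved', 9), ('t2', 'starved', 8),
        ('t3', 'healthy', 7), ('t4', 'healthy', 6);
    """
)
LIMIT = 2

# Rank first, filter afterwards: the high-priority starved tasks occupy
# row_num 1 and 2, so nothing from the healthy pool survives the cap.
rank_then_filter = conn.execute(
    """
    SELECT task_id FROM (
        SELECT task_id, pool,
               ROW_NUMBER() OVER (ORDER BY priority DESC) AS row_num
        FROM ti
    ) WHERE row_num <= ? AND pool != 'starved'
    """,
    (LIMIT,),
).fetchall()

# Filter the starved pool before ranking: healthy tasks fill the slots.
filter_then_rank = conn.execute(
    """
    SELECT task_id FROM (
        SELECT task_id,
               ROW_NUMBER() OVER (ORDER BY priority DESC) AS row_num
        FROM ti WHERE pool != 'starved'
    ) WHERE row_num <= ?
    """,
    (LIMIT,),
).fetchall()
print(rank_then_filter, filter_then_rank)  # [] [('t3',), ('t4',)]
```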



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"
 

Review Comment:
   This queries the same data as `ConcurrencyMap.load()`, which is still called and used for the check at lines ~665-680. With SQL-level filtering now in place, that Python check should mostly pass (barring race conditions). Worth adding a comment explaining why we keep both?






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351657


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,

Review Comment:
   This doesn't account for already running/queued tasks. If `max_active_tasks = 4` and there are already 2 running, we still return up to 4 scheduled tasks, but can only queue 2.
   
   Should be:
   ```python
   .where(ranked_query.c.row_num <= (ranked_query.c.dr_max_active_tasks - func.coalesce(ranked_query.c.task_per_dr_count, 0)))
   ```
   
   You'd need to add `task_per_dr_count` as a column in ranked_query.
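The remaining-capacity idea in the suggestion above can be sketched in plain SQL (a toy `ti` table with hypothetical column names): subtract the already-running count from the limit before comparing against `row_num`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ti (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT);
    INSERT INTO ti VALUES
        ('d1', 'r1', 'a', 'running'), ('d1', 'r1', 'b', 'running'),
        ('d1', 'r1', 'c', 'scheduled'), ('d1', 'r1', 'd', 'scheduled'),
        ('d1', 'r1', 'e', 'scheduled');
    """
)
MAX_ACTIVE_TASKS = 4

# 2 of the 4 slots are already taken by running tasks, so only 2 of the
# 3 scheduled tasks fit the remaining capacity.
rows = conn.execute(
    """
    SELECT task_id FROM (
        SELECT t.task_id,
               ROW_NUMBER() OVER (
                   PARTITION BY t.dag_id, t.run_id ORDER BY t.task_id
               ) AS row_num,
               (SELECT COUNT(*) FROM ti r
                 WHERE r.dag_id = t.dag_id AND r.run_id = t.run_id
                   AND r.state = 'running') AS running_count
        FROM ti t
        WHERE t.state = 'scheduled'
    ) WHERE row_num <= ? - running_count
    """,
    (MAX_ACTIVE_TASKS,),
).fetchall()
print(rows)  # [('c',), ('d',)]
```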



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
 
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label(

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351672


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+@conf_vars(
+{
+("scheduler", "max_tis_per_query"): "100",
+("scheduler", "max_dagruns_to_create_per_loop"): "10",
+("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+("core", "parallelism"): "100",
+("core", "max_active_tasks_per_dag"): "4",
+("core", "max_active_runs_per_dag"): "10",
+("core", "default_pool_task_slot_count"): "64",
+}
+)
+def test_per_dr_limit_applied_in_task_query(self, dag_maker, 
mock_executors):
+scheduler_job = Job()
+scheduler_job.executor.parallelism = 100
+scheduler_job.executor.slots_available = 70
+scheduler_job.max_tis_per_query = 100
+self.job_runner = SchedulerJobRunner(job=scheduler_job)
+session = settings.Session()
+
+# Use the same run_id.
+task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+count = 0
+iterations = 0
+
+from airflow.configuration import conf
+
+task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+# 6 dags * 4 = 24.
+assert task_num == 24
+
+queued_tis = None
+while count < task_num:
+# Use `_executable_task_instances_to_queued` because it returns a 
list of TIs
+# while `_critical_section_enqueue_task_instances` just returns 
the number of the TIs.
+queued_tis = self.job_runner._executable_task_instances_to_queued(
+max_tis=scheduler_job.executor.slots_available, session=session
+)
+count += len(queued_tis)
+iterations += 1
+
+assert iterations == 1
+assert count == task_num
+
+assert queued_tis is not None
+
+dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+# Tasks from all 6 dags should have been queued.
+assert len(dag_counts) == 6
+assert dag_counts == {
+"dag_1300_tasks": 4,
+"dag_1200_tasks": 4,
+"dag_1100_tasks": 4,
+"dag_100_tasks": 4,
+"dag_90_tasks": 4,
+"dag_80_tasks": 4,
+}, "Count for each dag_id should be 4 but it isn't"
+
 def test_find_executable_task_instances_order_priority_with_pools(self, 
dag_maker):

Review Comment:
   Couple test cases worth adding:
   
   1. **Starvation filter ordering**: dag run with tasks in mixed pools (some 
starved, some not). Verify non-starved pool tasks aren't excluded because 
starved-pool tasks consumed row_number slots.
   
   2. **Partial capacity**: dag run with `max_active_tasks=4` where 2 are 
already RUNNING + 10 SCHEDULED. Verify query returns only 2 (not 4) for that 
run.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 
 try:
 locked_query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
-task_instances_to_examine: list[TI] = 
list(session.scalars(locked_query).all())
+task_instances_to_examine = session.scalars(locked_query).all()
+
+self.log.debug("Length of the tis to examine is %d", 
len(task_instances_to_examine))
+self.log.debug(
+"TaskInstance selection is: %s",
+dict(Counter(ti.dag_id for ti in 
task_instances_to_examine)),
+)
 
 timer.stop(send=True)

Review Comment:
   nit: The `Counter()` iteration runs even when debug logging is disabled. If 
we're optimizing for perf:
   ```python
   if self.log.isEnabledFor(logging.DEBUG):
   self.log.debug(...)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351655


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
+ranked_query.c.logical_date_for_ordering,
+ranked_query.c.map_index_for_ordering,
+)
+)
+
 if starved_pools:
 query = query.where(TI.pool.not_in(starved_pools))

Review Comment:
   This new query is missing `.options(selectinload(TI.dag_model))` which was 
on the original query above. When we rebuild the query here, we lose the eager 
loading - so every access to `ti.dag_model` later will trigger a separate 
query. With 50 TIs that's 50+ extra queries per loop, partially negating the 
perf gains.
   
   Also missing `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")` 
- should add both here.



##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+@conf_vars(
+{
+("scheduler", "max_tis_per_query"): "100",
+("scheduler", "max_dagruns_to_create_per_loop"): "10",
+("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+("core", "parallelism"): "100",
+("core", "max_active_tasks_per_dag"): "4",
+("core", "max_active_runs_per_dag"): "10",
+("core", "default_pool_task_slot_count"): "64",
+}
+)
+def test_per_dr_limit_applied_in_task_query(self, dag_maker, 
mock_executors):
+scheduler_job = Job()
+scheduler_job.executor.parallelism = 100
+scheduler_job.executor.slots_available = 70
+scheduler_job.max_tis_per_query = 100
+self.job_runner = SchedulerJobRunner(job=scheduler_job)
+session = settings.Session()
+
+# Use the same run_id.
+task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+ 

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247075


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
+ranked_query.c.logical_date_for_ordering,
+ranked_query.c.map_index_for_ordering,
+)
+)

Review Comment:
   This new query is missing `.options(selectinload(TI.dag_model))` which was 
on the original query above. When we rebuild the query here, we lose the eager 
loading - so every access to `ti.dag_model` later will trigger a separate 
query. With 50 TIs that's 50+ extra queries per loop, which partially negates 
the perf gains from this PR.
   
   Also missing the `.with_hint(TI, "USE INDEX (ti_state)", 
dialect_name="mysql")` - should add both here.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 
 try:
 locked_query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
-task_instances_to_examine: list[TI] = 
list(session.scalars(locked_query).all())
+task_instances_to_examine = session.scalars(locked_query).all()
+
+self.log.debug("Length of the tis to examine is %d", 
len(task_instances_to_examine))
+self.log.debug(
+"TaskInstance selection is: %s",

Review Comment:
   nit: The `Counter()` iteration happens even when debug logging is disabled. 
Not a big deal but if we're optimizing for perf, might want to guard this:
   ```python
   if self.log.isEnabledFor(logging.DEBUG):
   self.log.debug(...)
   ```



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"

Review Comment:
   This queries the same data as `ConcurrencyMap.load()` which is still called 
and used for the check at lines 617-634. With the SQL-level filtering now in 
place, that Python check should always pass (barring race conditions). Worth 
adding a comment explaining why we keep both - race condition protection 
between query time and check time?



##
airflow-core/src/airflow/jobs/scheduler_jo

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705324006


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 
 try:
 locked_query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
-task_instances_to_examine: list[TI] = 
list(session.scalars(locked_query).all())
+task_instances_to_examine = session.scalars(locked_query).all()
+
+self.log.debug("Length of the tis to examine is %d", 
len(task_instances_to_examine))
+self.log.debug(
+"TaskInstance selection is: %s",

Review Comment:
   It makes sense. I fixed it.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769053071

   @kaxil Thank you for the review! I'm going to add the tests and then run 
some other tests of my own for your other comments.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705274366


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.

Review Comment:
   The row_number ranking is applied here before the starvation filters 
(starved_pools, starved_dags, etc) are added below. In the original code, those 
filters were applied BEFORE the limit.
   
   This means tasks in starved pools will consume row_number slots and then get 
filtered out later, potentially excluding schedulable tasks from the same dag 
run that would have fit within the limit.
   
   Should we apply starvation filters to the base query before building 
ranked_query?
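   A minimal plain-Python model (hypothetical task tuples, not the scheduler's real query) shows why the order matters: if ranking happens first, starved-pool tasks consume the per-run `row_num` slots and then get filtered away, leaving schedulable tasks behind.

   ```python
   # One dag run with max_active_tasks = 2; tasks are (pool, priority) tuples.
   tasks = [("starved", 10), ("starved", 9), ("ok", 5), ("ok", 4)]
   starved_pools = {"starved"}
   limit = 2

   def rank_then_filter(tasks, starved, limit):
       # Mirrors the current order: ROW_NUMBER() limit applied before the
       # starved-pool WHERE clause.
       ranked = sorted(tasks, key=lambda t: -t[1])[:limit]
       return [t for t in ranked if t[0] not in starved]

   def filter_then_rank(tasks, starved, limit):
       # Mirrors the suggested order: starvation filters applied to the base
       # query before ranking.
       eligible = [t for t in tasks if t[0] not in starved]
       return sorted(eligible, key=lambda t: -t[1])[:limit]

   print(rank_then_filter(tasks, starved_pools, limit))   # starved tasks took both slots -> []
   print(filter_then_rank(tasks, starved_pools, limit))   # -> [('ok', 5), ('ok', 4)]
   ```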



##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+@conf_vars(
+{
+("scheduler", "max_tis_per_query"): "100",
+("scheduler", "max_dagruns_to_create_per_loop"): "10",
+("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+("core", "parallelism"): "100",
+("core", "max_active_tasks_per_dag"): "4",
+("core", "max_active_runs_per_dag"): "10",
+("core", "default_pool_task_slot_count"): "64",
+}
+)
+def test_per_dr_limit_applied_in_task_query(self, dag_maker, 
mock_executors):
+scheduler_job = Job()
+scheduler_job.executor.parallelism = 100
+scheduler_job.executor.slots_available = 70
+scheduler_job.max_tis_per_query = 100
+self.job_runner = SchedulerJobRunner(job=scheduler_job)
+session = settings.Session()
+
+# Use the same run_id.
+task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+count = 0
+iterations = 0
+
+from airflow.configuration import conf
+
+task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+# 6 dags * 4 = 24.
+assert task_num == 24
+
+queued_tis = None
+while count < task_num:
+# Use `_executable_task_instances_to_queued` because it returns a 
list of TIs
+# while `_critical_section_enqueue_task_instances` just returns 
the number of the TIs.
+queued_tis = self.job_runner._executable_task_instances_to_queued(
+max_tis=scheduler_job.executor.slots_available, session=session
+)
+count += len(queued_tis)
+iterations += 1
+
+assert iterations == 1
+assert count == task_num
+
+assert queued_tis is not None
+
+dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+# Tasks from all 6 dags should have been queued.
+assert len(dag_counts) == 6
+assert dag_counts == {
+"dag_1300_tasks": 4,
+"dag_1200_tasks": 4,
+"dag_1100_tasks": 4,
+

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705256990


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
+ranked_query.c.logical_date_for_ordering,
+ranked_query.c.map_index_for_ordering,
+)
+)

Review Comment:
   So, probably
   ```py
   query = (
   select(TI)
   .select_from(ranked_query)
   .join(
   TI,
   (TI.dag_id == ranked_query.c.dag_id)
   & (TI.task_id == ranked_query.c.task_id)
   & (TI.run_id == ranked_query.c.run_id)
   & (TI.map_index == ranked_query.c.map_index),
   )
   .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
   .order_by(
   -ranked_query.c.priority_weight_for_ordering,
   ranked_query.c.logical_date_for_ordering,
   ranked_query.c.map_index_for_ordering,
   )
   .options(selectinload(TI.dag_model))
   )
   ```






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247082


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,

Review Comment:
   Nice - the outer join with COALESCE handles dag runs with 0 executing tasks 
cleanly.
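   The COALESCE behaviour can be modeled in a few lines of plain Python (a hypothetical sketch, not the actual subquery): dag runs absent from the per-run count subquery have no joined row, so their count coalesces to 0 and they still pass the filter.

   ```python
   # Only runs with executing tasks appear in the subquery's result.
   task_per_dr_count = {("dag_a", "run1"): 3}
   max_active_tasks = 4

   def passes_filter(dag_id, run_id):
       # COALESCE(task_per_dr_count, 0) < max_active_tasks
       return task_per_dr_count.get((dag_id, run_id), 0) < max_active_tasks

   print(passes_filter("dag_a", "run1"))  # 3 < 4 -> True
   print(passes_filter("dag_b", "run1"))  # no row, coalesces to 0 -> True
   ```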






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705235070


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)

Review Comment:
   Somewhere here I think you need `.options(selectinload(TI.dag_model))` 
otherwise every access to `task_instance.dag_model` later in the code will 
trigger a separate query.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247082


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,

Review Comment:
   Nice - the outer join with COALESCE handles dag runs with 0 executing tasks 
cleanly.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"

Review Comment:
   This queries the same data as `ConcurrencyMap.load()` which is still called 
and used for the check at lines 617-634. With the SQL-level filtering now in 
place, that Python check should always pass (barring race conditions). Worth 
adding a comment explaining why we keep both - race condition protection 
between query time and check time?



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < 
DM.max_active_tasks
+)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
+ranked_query.c.logical_date_for_ordering,
+ranked_query.c.map_index_for_ordering,
+)
+)

Review Comment:
   This new query is missing `.options(selectinload(TI.dag_model))` which was 
on the original query above. When we rebuild the query here, we lose the eager 
loading - so every access to `ti.dag_model` later will trigger a separate 
query. With 50 TIs that's 50+ extra queries per loop, which partially negates 
the perf gains from this PR.
   
   Also missing the `.with_hint(TI, "USE INDEX (ti_state)", 
dialect_name="mysql")` - should add both here.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 
 try:
 locked_query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
-task_instances_to_examine: list[TI] = 
list(ses


Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-19 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3767137353

   @Nataneljpwd Thank you!





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-13 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3745522411

   @vatsrahul1001 Thanks! Please let me know if there is something I can do.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2026-01-13 Thread via GitHub


vatsrahul1001 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3745504675

   @xBis7 Thanks for the PR. I plan to run some test on this before we merge.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-12-23 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2644030886


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)

Review Comment:
   I've run tests and this change doesn't seem to make any difference so far.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-12-22 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2640505174


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)

Review Comment:
   I'm going to give this a try and run some tests. If it alters the scheduler 
behavior too much then it might not be worth it for now.
   
   The current patch doesn't fix everything, but it provides measurable 
performance improvements and has been very thoroughly tested.
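   
   A plain-SQL sketch (sqlite3, stdlib) of the capping technique in the diff
   above: rank scheduled TIs within each (dag_id, run_id) partition with
   `ROW_NUMBER()` and keep only the first max_active_tasks rows per partition.
   Table and column names are simplified stand-ins, not Airflow's schema.

   ```python
   # Simplified illustration of the ranked-subquery cap from the PR.
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
   CREATE TABLE ti (dag_id TEXT, run_id TEXT, task_id TEXT, priority_weight INT);
   INSERT INTO ti VALUES
     ('dag_a', 'run1', 't1', 5), ('dag_a', 'run1', 't2', 3),
     ('dag_a', 'run1', 't3', 1), ('dag_b', 'run1', 't1', 9),
     ('dag_b', 'run1', 't2', 2);
   """)
   MAX_ACTIVE_TASKS = 2  # stand-in for DM.max_active_tasks
   rows = conn.execute(
       """
       SELECT dag_id, run_id, task_id FROM (
           SELECT dag_id, run_id, task_id,
                  ROW_NUMBER() OVER (
                      PARTITION BY dag_id, run_id
                      ORDER BY priority_weight DESC
                  ) AS row_num
           FROM ti
       ) WHERE row_num <= ?
       ORDER BY dag_id, task_id
       """,
       (MAX_ACTIVE_TASKS,),
   ).fetchall()
   # Only the top-priority MAX_ACTIVE_TASKS rows survive per (dag_id, run_id).
   print(rows)
   ```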






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-12-22 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2639198443


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1180,6 +1226,102 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+dag_tasks = {}
+
+with dag_maker(dag_id=dag_id):
+for i in range(task_num):
+# Assign priority weight to certain tasks.
+if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+weight = int(i / 2)
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", 
priority_weight=weight)
+else:
+# No executor specified, runs on default executor
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+task_tis = {}
+
+tis_list = []
+for i in range(task_num):
+task_tis[f"ti{i}"] = 
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+# add
+tis_list.append(task_tis[f"ti{i}"])
+
+for ti in tis_list:
+ti.state = State.SCHEDULED
+ti.dag_model.max_active_tasks = 4
+
+session.flush()
+
+return tis_list

Review Comment:
   This was a rebase mixup. It has been fixed.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-12-21 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3678912476

   I've been running various tests and, while doing so, I noticed that the 
metrics I was getting weren't consistent between consecutive runs. So, I 
started logging the same values that I was sending to OTel for the metrics, 
and I found that in certain cases the final numbers differed from what we 
would see in the Grafana dashboards. For example, Grafana would show a maximum 
of 5 active concurrent dag runs while the actual number was 7. The reason is 
that we export the current value at an interval, not all past and present 
values. The collector gets samples, not a complete list of the values.
   
   In order to capture more values for the diagrams, I decreased the sampling 
step and the export interval by a lot. That made even instant values visible in 
the diagrams.
   
   The higher the load, the longer the tests take to run, so we end up with 
more samples for the metrics, which in turn leads to a more reliable and 
representative diagram.
   
   A very simple test case that I ran was triggering 2 dag runs of the same 
dag, with and without the patch. In that scenario, the results are random: 
sometimes the patch is faster and sometimes it isn't, but in all cases the 
difference is negligible. For example, the scheduler might do 4 iterations 
with the patch and 5 without, 5 in both cases, or 4 without and 6 with. The 
timings end up within +-1s.
   
   ## Testing different topologies
   
   I used the following dags
   
   * 5 linear dags with 10 tasks each, all in a sequence
   
   ```
   0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> 10
   ```
   
   * 2 dags with a single root task and 100 downstream tasks all running in 
parallel
   
   ```
   0 -> (1_1, 1_2, 1_3, 1_4, ..., 1_100 -- parallel)
   ```
   
   * 5 branching dags with 1 root task, that has 5 children tasks, and each 
child task has 3 children tasks
   
   ```
   0 -> (1_1, 1_2, 1_3, 1_4, 1_5 -- parallel)
   1_1 -> (2_1, 2_2, 2_3 -- parallel)
   1_2 -> (2_4, 2_5, 2_6 -- parallel)
   1_3 -> (2_7, 2_8, 2_9 -- parallel)
   1_4 -> (2_10, 2_11, 2_12 -- parallel)
   1_5 -> (2_13, 2_14, 2_15 -- parallel)
   ```
   
   I triggered all of them and also created multiple dag_runs for most of them. 
There wasn't much difference in the number of scheduler iterations, but with 
the patch the scheduler consistently queued the tasks it examined, in contrast 
to examining a very high number of tasks only to queue just a few, which is 
what happens without the patch.
   
   The difference in the time needed to run the dags, is noticeable.
   
   * with the patch
   * scheduler iterations: 150
   * total time: 248.28s
   * without the patch
   * scheduler iterations: 162
   * total time: 345.64s
   
   https://github.com/user-attachments/assets/89793622-2308-4429-bfeb-a6fd24317663
   
   
   ## Testing heavy load
   
   This is the original test where I created multiple dags with all tasks 
running in parallel without any dependencies between them.
   
   * dag_45_tasks
   * 45 parallel tasks
   * dag_250_tasks
   * 250 parallel tasks
   * dag_470_tasks
   * 470 parallel tasks
   * dag_1000_tasks
   * 1000 parallel tasks
   * dag_1100_tasks
   * 1100 parallel tasks
   * dag_1200_tasks
   * 1200 parallel tasks
   
   I reran this test because I made a lot of changes in my testing 
infrastructure and I wanted to verify that the new metrics are in line with the 
old ones that I previously shared.
   
   * with the patch
   * scheduler iterations: 402
   * max number of concurrent DRs: 6
   * total time: 675.25s
   * without the patch
   * scheduler iterations: 1343
   * max number of concurrent DRs: 3
   * total time: 1024.49s
   
   https://github.com/user-attachments/assets/8f18cfca-3203-4505-aa42-c9769d54d1a3
   
   
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-12-06 Thread via GitHub


Copilot commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2594698573


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1180,6 +1226,102 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+dag_tasks = {}
+
+with dag_maker(dag_id=dag_id):
+for i in range(task_num):
+# Assign priority weight to certain tasks.
+if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+weight = int(i / 2)
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", 
priority_weight=weight)
+else:
+# No executor specified, runs on default executor
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+task_tis = {}
+
+tis_list = []
+for i in range(task_num):
+task_tis[f"ti{i}"] = 
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+# add
+tis_list.append(task_tis[f"ti{i}"])
+
+for ti in tis_list:
+ti.state = State.SCHEDULED
+ti.dag_model.max_active_tasks = 4
+
+session.flush()
+
+return tis_list

Review Comment:
   The `task_helper` method (lines 1229-1258) is nearly identical to the 
module-level `task_maker` function (lines 184-227). The only significant 
differences are:
   1. `task_helper` has a hardcoded `max_active_tasks = 4` (line 1254) instead 
of accepting it as a parameter
   2. `task_helper` doesn't support custom `run_id` parameter
   
   Consider removing this duplicate method and using the `task_maker` function 
instead, or refactor to reduce code duplication. For example, the test at line 
1271 could use `task_maker` directly.



##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -181,6 +181,52 @@ def _create_dagrun(
 return _create_dagrun
 
 
+def task_maker(
+dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int, 
run_id: str | None = None
+):
+dag_tasks = {}
+
+with dag_maker(dag_id=dag_id):
+for i in range(task_num):
+# Assign priority weight to certain tasks.
+if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...

Review Comment:
   The comment suggests the condition matches tasks at indices 10, 20, 30, 
etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30, 
etc. The comment should be updated to "# 0, 10, 20, 30, 40, ..." to accurately 
reflect when the condition is true.
   ```suggestion
   if (i % 10) == 0:  # 0, 10, 20, 30, 40, ...
   ```



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by dag_id and 
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+  

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-24 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3569749637

   @uranusjr I made the changes. Thanks!





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-24 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2555020477


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1178,6 +1224,97 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
 ]
 assert len(b_tis_in_wrong_executor) == 0
 
+def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+dag_tasks = {}
+
+with dag_maker(dag_id=dag_id):
+for i in range(task_num):
+# Assign priority weight to certain tasks.
+if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+weight = int(i / 2)
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", 
priority_weight=weight)
+else:
+# No executor specified, runs on default executor
+dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+task_tis = {}
+
+tis_list = []
+for i in range(task_num):
+task_tis[f"ti{i}"] = 
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+# add
+tis_list.append(task_tis[f"ti{i}"])
+
+for ti in tis_list:
+ti.state = State.SCHEDULED
+ti.dag_model.max_active_tasks = 4
+
+session.flush()
+
+return tis_list
+
+@conf_vars(
+{
+("scheduler", "max_tis_per_query"): "100",
+("scheduler", "max_dagruns_to_create_per_loop"): "10",
+("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+("core", "parallelism"): "100",
+("core", "max_active_tasks_per_dag"): "4",
+("core", "max_active_runs_per_dag"): "10",
+("core", "default_pool_task_slot_count"): "64",
+}
+)
+def test_per_dr_limit_applied_in_task_query(self, dag_maker, 
mock_executors):
+scheduler_job = Job()
+scheduler_job.executor.parallelism = 100
+scheduler_job.executor.slots_available = 70
+scheduler_job.max_tis_per_query = 100
+self.job_runner = SchedulerJobRunner(job=scheduler_job)
+session = settings.Session()
+
+# Use the same run_id.
+task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+count = 0
+iterations = 0
+
+from airflow.configuration import conf
+
+task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+# 6 dags * 4 = 24.
+assert task_num == 24
+
+queued_tis = None
+while count < task_num:
+# Use `_executable_task_instances_to_queued` because it returns a 
list of TIs
+# while `_critical_section_enqueue_task_instances` just returns 
the number of the TIs.
+queued_tis = self.job_runner._executable_task_instances_to_queued(
+max_tis=scheduler_job.executor.slots_available, session=session
+)
+count += len(queued_tis)
+iterations += 1
+
+assert iterations == 1
+assert count == task_num
+
+assert queued_tis is not None
+
+dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+# Tasks from all 6 dags should have been queued.
+assert len(dag_counts) == 6
+assert all(count == 4 for count in dag_counts.values()), (
+"Count for each dag_id should be 4 but it isn't"
+)

Review Comment:
   It’d probably be easier for debugging if this just did
   
   ```python
   assert dag_counts == {
   "dag_1300_tasks": 4,
   "dag_1200_tasks": 4,
   ...
   }
   ```
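   
   For reference, a `Counter` compares equal to a plain dict with the same
   keys and counts, so the direct assertion works without any conversion
   (dag ids below are hypothetical):

   ```python
   # Counter subclasses dict, so equality against a plain dict literal holds.
   from collections import Counter

   queued_dag_ids = ["dag_a"] * 4 + ["dag_b"] * 4
   dag_counts = Counter(queued_dag_ids)
   result = dag_counts == {"dag_a": 4, "dag_b": 4}
   print(result)
   ```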






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-24 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2555010941


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,6 +588,12 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 locked_query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
 task_instances_to_examine: list[TI] = 
list(session.scalars(locked_query).all())

Review Comment:
   ```suggestion
   task_instances_to_examine = 
session.scalars(locked_query).all()
   ```
   
   Not really related, but let’s fix this potentially wasteful line






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-19 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3553535226

   Hi @ashb, is there any update on this?





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-04 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3486498022

   I'm still (re-)running some tests, but so far I've found this change is 
either a net benefit to task throughput, or equal.
   
   I'm leaning towards this not being behind a config flag right now.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-03 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2487438728


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each dag run.
+This is also configurable per-dag with ``max_active_tasks``,

Review Comment:
   No problem. I reverted the changes and created a separate PR
   
   https://github.com/apache/airflow/pull/57770






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-03 Thread via GitHub


ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2487399540


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each dag run.
+This is also configurable per-dag with ``max_active_tasks``,

Review Comment:
   A bit of a pain, but could you pull that out in to a separate PR please?






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-02 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2485361359


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each dag run.
+This is also configurable per-dag with ``max_active_tasks``,

Review Comment:
   `max_active_tasks_per_dag` was actually changed by a previous patch to be 
applied on a per-dag-run basis, not per dag. That patch neglected to update 
the docs as well.
   
   When I first made the changes for the current PR I thought that it was per 
dag until I ran the test suite and realized that it's per dag_run.
   
   I just updated the description to make it reflect how it has been working.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-02 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2485361359


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each dag run.
+This is also configurable per-dag with ``max_active_tasks``,

Review Comment:
   `max_active_tasks_per_dag` was actually changed by a previous patch to be 
applied on a per-dag-run basis, not per dag. That patch neglected to update 
the docs as well.
   
   When I first made the changes for the current PR I thought that it was per 
dag until I ran the test suite and realized that it's per dag_run.
   
   I just updated the description to make it reflect how it currently works.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-02 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2485314962


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each dag run.
+This is also configurable per-dag with ``max_active_tasks``,

Review Comment:
   My understanding is this controls how many tasks _in the entire system_ are 
running concurrently (for the dag). Is this PR changing this, or just fixing an 
incorrect description?






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-11-01 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2483790152


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)

Review Comment:
   I did some testing and here is what I found.
   
   Let's assume that we have 3 dags with the same `run_id` and all the checks 
use only the `run_id`.
   
   * the query will return `max_active_tasks` from the 1st dag and ignore the 
other two because they have the same `run_id`
   * every consecutive iteration, until the tasks from the 1st dag finish, 
won't pick any new tasks for examining and queueing
   * once those tasks finish, it will pick `max_active_tasks` tasks but will 
again concentrate on 1 dag
   
   It would group all tasks from all dags as if they all belonged to the same 
dag_run. But the `dag_id, run_id` pair is unique and now the tasks are properly 
separated.
   
   This was a serious logical bug. I don't know how common it will be for 
multiple `dag_runs` to have the same `run_id` but it's good to have already 
accounted for that edge case. Thanks @kaxil, great catch! With the suggested 
changes it works as it should.
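   
   A sketch (sqlite3, stdlib) of the grouping bug described above, with three
   hypothetical dags sharing the run_id `run1`: counting running TIs per
   `run_id` alone lumps all dags into one bucket, while grouping by
   `(dag_id, run_id)` keeps each dag run separate.

   ```python
   # Illustrates why the concurrency count must key on (dag_id, run_id).
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
   CREATE TABLE ti (dag_id TEXT, run_id TEXT, state TEXT);
   INSERT INTO ti VALUES
     ('dag_a', 'run1', 'running'), ('dag_a', 'run1', 'running'),
     ('dag_b', 'run1', 'running'), ('dag_c', 'run1', 'running');
   """)
   # Buggy: one bucket of 4 "running" tasks for run1, so with
   # max_active_tasks=4 no new task from any of the three dags gets picked.
   buggy = conn.execute(
       "SELECT run_id, COUNT(*) FROM ti WHERE state='running' GROUP BY run_id"
   ).fetchall()
   # Fixed: each (dag_id, run_id) pair is counted on its own.
   fixed = conn.execute(
       "SELECT dag_id, run_id, COUNT(*) FROM ti WHERE state='running' "
       "GROUP BY dag_id, run_id ORDER BY dag_id"
   ).fetchall()
   print(buggy, fixed)
   ```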






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-31 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2480724286


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)

Review Comment:
   You are right. I overlooked that the `run_id` isn't unique across all DAGs. 
Initially, the query used the `dag_id`, until I realized that the behavior of 
`max_active_tasks` had been changed and the limit is applied on a per-dag-run 
basis, not per dag. I'll do some testing.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-31 Thread via GitHub


ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2480697365


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)

Review Comment:
   Yup, you are likely right.
   
   I'm testing now with
   
   ```
   for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks 
dag_1000_tasks ;do airflowctl dagrun trigger --dag-id $dag --dag-run-id run1; 
done
   for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks 
dag_1000_tasks ;do airflowctl dagrun trigger --dag-id $dag --dag-run-id run2; 
done
   ```






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-30 Thread via GitHub


kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2479689911


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)

Review Comment:
   Hmm, `run_id` is only unique per DAG, but this subquery groups by `run_id` 
alone and the join uses just `TI.run_id`. When one DAG reaches 
`max_active_tasks`, every other DAG that happens to share the same `run_id` 
string (common for scheduled runs) now fails the `< DM.max_active_tasks` check 
and never gets considered. 
   
   Better to group by `(dag_id, run_id)` and join on both columns instead?
   
   
https://github.com/apache/airflow/blob/085b0edfe7c7151517cd727020c13a2834409210/airflow-core/src/airflow/models/dagrun.py#L233
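   The collision can be sketched outside SQL. A minimal model of the grouped counts, with hypothetical dag names and a shared scheduled `run_id` string:

```python
from collections import Counter

# Hypothetical active TIs as (dag_id, run_id) pairs; the run_id string is
# shared across DAGs, as is common for scheduled runs. dag_b has none running.
active_tis = [("dag_a", "scheduled__2025-10-30")] * 16
max_active_tasks = 16

# Grouping by run_id alone: dag_b inherits dag_a's count and fails the
# `< max_active_tasks` check even though it has zero active tasks.
by_run_id = Counter(run_id for _, run_id in active_tis)
assert not (by_run_id["scheduled__2025-10-30"] < max_active_tasks)

# Grouping by (dag_id, run_id): each dag run keeps its own quota.
by_pair = Counter(active_tis)
assert by_pair[("dag_b", "scheduled__2025-10-30")] < max_active_tasks
```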



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
 .where(~DM.is_paused)
 .where(TI.state == TaskInstanceState.SCHEDULED)
 .where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+isouter=True,
+)
+.where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)
 .options(selectinload(TI.dag_model))
 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
 )
 
+# Create a subquery with row numbers partitioned by run_id.
+#
+# run_id   | task_id | priority_weight | row_num
+# ---------|---------|-----------------|--------
+# dag1_dr1  | task1   | 100 | 1
+# dag1_dr1  | task22  | 90  | 2
+# dag1_dr1  | task5   | 80  | 3
+# dag1_dr1  | task13  | 70  | 4
+# dag2_dr1  | task3   | 95  | 1
+# dag2_dr1  | task1   | 85  | 2
+# dag2_dr1  | task5   | 75  | 3
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=TI.run_id,
+order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
+ranked_query.c.logical_date_for_ordering,
+ranked_query.c.map_index_for_ordering,
+)
+)
+

Review Comment:
   Similar thing here: The window only partitions on `run_id`, which collides 
across DAGs. Once one DAG consumes its `max_active_tasks`, every other DAG with 
the same `run_id` is filtered out by `row_num <= dr_max_active_tasks`, even if 
they have zero active tasks. 
   
   Better would be to partition by both `TI.dag_id` and `TI.run_id` (or the dag 
run PK) before applying the limit so each DAG run keeps its own quota.
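   The partition fix can be sketched in plain Python. Here `pick` is a hypothetical stand-in that emulates `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY priority DESC)` followed by the `row_num <= max_active_tasks` filter:

```python
from itertools import groupby

# Hypothetical scheduled TIs: (dag_id, run_id, task_id, priority_weight).
# Both dag runs share the run_id string "run1".
tis = [
    ("dag1", "run1", "t1", 100), ("dag1", "run1", "t2", 90),
    ("dag1", "run1", "t3", 80),
    ("dag2", "run1", "t1", 70), ("dag2", "run1", "t2", 60),
]
max_active_tasks = 2

def pick(tis, key):
    # Emulate ROW_NUMBER() over each partition, ordered by priority
    # descending, then keep rows with row_num <= max_active_tasks.
    picked = []
    for _, group in groupby(sorted(tis, key=lambda t: (key(t), -t[3])), key=key):
        picked.extend(list(group)[:max_active_tasks])
    return picked

# Partitioning by run_id alone lumps both DAGs into one window: dag2 is
# starved because dag1's higher-priority tasks fill the shared quota.
assert all(t[0] == "dag1" for t in pick(tis, key=lambda t: t[1]))
# Partitioning by (dag_id, run_id) gives each dag run its own quota.
assert len(pick(tis, key=lambda t: (t[0], t[1]))) == 4
```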
   



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session

Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-30 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3470289106

   This is looking super promising. With this enabled it managed to schedule 
22k TIs across 24 different dags in 40min, whereas without it the same TI+dag 
count took the scheduler 2hr10m.
   
   (Green with, yellow without)
   
   https://github.com/user-attachments/assets/984ef69b-e027-4746-a4d2-87da8a79c833
   
   https://github.com/user-attachments/assets/ffd0f53f-b959-4cc7-9c40-5d406c2b831a
   
   Right now it's looking super likely that we just always turn this on. I will 
run more tests with different dag hierarchy/task dependency layout patterns 
tomorrow.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-24 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3444757988

   I'll take a good look at this starting Monday.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-18 Thread via GitHub


Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3418225617

   > The issue is that the algo has to be performant (at least as performant 
as the current code if not better)
   
   Good point, can you do a control-group benchmark for a workload with no 
limits at all? Similar to what I did here:
   https://github.com/apache/airflow/pull/55537#issuecomment-3325641701
   
   You've shown above that it does a great improvement for your case, but we 
have to understand the general case too. From my experience with WFs/lateral 
joins in the scheduler, they are somewhat slower.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-18 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3418600414

   @Asquator I ran my benchmarks with no limits applied.
   
   Total iterations
   * with the patch: 361
   * original code: 344
   
   Time needed to run 4000 tasks
   * with the patch: 240 secs (4 mins)
   * original code: 255 secs (4 mins and 15 secs)
   
   Everything else was pretty much the same as you can see in the screenshot.
   
   https://github.com/user-attachments/assets/5ead6ed2-3304-42f0-86de-6eb1f5d9aaf9
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-18 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3417939313

   @Asquator This patch improves the existing implementation and increases 
performance.
   
   From all the discussions so far, the way I see it, there is no nice way to 
extend the current approach to the point of accounting for all the edge cases. 
So, we have to implement a new selection algo from scratch. We can't do that 
overnight, so why not accept the improvement for now while looking for a new 
way?
   
   The issue is that the algo has to be performant (at least as performant as 
the current code if not better) and simple enough for people to understand it 
and be able to debug it. This is open-source and maintainability comes first. 
Performance can later be improved as long as the next guy understands the code 
and can work on it.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-18 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3415686639

   > Static checks still :(
   
   Weird, it's due to the rebase.
   
   @shahar1 Regarding the config flag, I'm waiting to see what others think 
before making the change.
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-17 Thread via GitHub


Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3416696572

   Config flag is a debate, and as I understood, it's not desirable because we 
don't want to fix the API.
   
   Anyway, the thing that concerns me the most here is that this implementation 
doesn't fix other starvation cases, and neither it provides a clear path to 
fixing all of them in the future.
   
   Especially, I don't see how https://github.com/apache/airflow/issues/45636 
can be fixed after this one is merged.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-10-17 Thread via GitHub


shahar1 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3415286180

   Static checks still :(





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-20 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2341843237


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"
 
 
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> 
Query:

Review Comment:
   I _think_ this return value is a Subquery, not a Query (honestly the return 
type annotation is pretty useless anyway due to the lack of SQLAlchemy support)






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-20 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2341840062


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
 which is defaulted as ``[core] max_active_tasks_per_dag``.
 
-An example scenario when this would be useful is when you want to stop 
a new dag with an early
+An example scenario when this would be useful is when you want to stop 
a new dag_run with an early

Review Comment:
   ```suggestion
   An example scenario when this would be useful is when you want to 
stop a new dag run with an early
   ```



##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,

Review Comment:
   ```suggestion
   The maximum number of task instances allowed to run concurrently in 
each dag run.
   This is also configurable per-dag with ``max_active_tasks``,
   ```






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-20 Thread via GitHub


Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3315173200

   > @shahar1 Thank you for the review!
   > 
   > > configurable with a flag (opt-in),
   > 
   > That's fine by me. I had a flag in my initial POC, I can add it back.
   > 
   > 
[e940864](https://github.com/apache/airflow/commit/e940864d175c0876c11c6362f6eff1167a6d5982)
   
   I think it's a great opportunity to also take this logic out of the 
scheduler job class.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-20 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3314974917

   @shahar1 Thank you for the review!
   
   > configurable with a flag (opt-in),
   
   That's fine by me. I had a flag in my initial POC, I can add it back.
   
   
https://github.com/apache/airflow/pull/54103/commits/e940864d175c0876c11c6362f6eff1167a6d5982
   
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-17 Thread via GitHub


Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3304768333

   Hey @xBis7,
   This is definitely a starvation problem that causes the scheduler to queue 
fewer than `max_tis` tasks in every iteration. The starvation simply comes from 
the edge cases where you have SO many tasks that this inefficiency turns into a 
complete hell where tasks are created faster than they can be queued, simply 
because the scheduler picks 1-2 tasks in every cycle (priorities and similar 
sorting patterns may be involved).
   
   We originally experienced this issue with pools: 
https://github.com/apache/airflow/issues/45636
   It's conceptually the same problem you presented, but one that stems from 
the pool slots limit rather than `max_active_tasks`. We wanted to open a very 
similar PR to address our case, but soon realized the problem is a bit wider, 
and our fix wouldn't have solved your problem for instance.
   
   We started digging deeper and found out that starvation may come from all 
kinds of limits:
   1. pool slots (our case)
   2. `max_active_tasks` (your case)
   3. `max_active_tis_per_dag`
   4. `max_active_tis_per_dagrun`
   5. _executor slots_ (slightly different problem, local per scheduler)
   
   So we gave up the idea of solving one specific case with pools simply 
because it would benefit only part of the users. We wanted a global solution. 
https://github.com/apache/airflow/pull/53492 was born to do the thing you've 
done here, but for ALL the limits using window functions/lateral joins. It 
turned out in the end that applying this logic to multiple limits that are 
orthogonal to each other is kind of impossible in SQL (for instance,  
`max_active_tis_per_dagrun` is a sub-limit to `max_active_tis_per_dag` and 
`max_active_tasks`, but pool slots is a limit orthogonal to all others - 
meaning there's no logical nesting between them). Won't bring up all the 
reasoning here, but we tried VERY hard to make it work. There are still cases 
where this logic fails, in addition to SQL being incapable of optimizing 
multiple WFs so they can run fast.
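   For illustration, a minimal admission check over those limits (hypothetical dag, run, and pool names; `max_active_tis_per_dagrun` omitted for brevity) shows why pool slots don't nest under the per-dag limits:

```python
# Hypothetical running TIs: (dag_id, run_id, task_id, pool).
running = [
    ("etl", "r1", "load", "default"),
    ("etl", "r1", "load", "default"),
    ("ml",  "r1", "train", "gpu"),
]
limits = {
    "max_active_tasks": 4,                    # per (dag, run)
    "max_active_tis_per_dag": 2,              # per (dag, task), across runs
    "pool_slots": {"default": 3, "gpu": 1},   # per pool, across all dags
}

def admissible(ti):
    dag_id, run_id, task_id, pool = ti
    per_run  = sum(1 for d, r, _, _ in running if (d, r) == (dag_id, run_id))
    per_task = sum(1 for d, _, t, _ in running if (d, t) == (dag_id, task_id))
    in_pool  = sum(1 for *_, p in running if p == pool)
    # The first two limits nest (a run belongs to exactly one dag); the pool
    # check cuts across dags entirely, hence "orthogonal".
    return (per_run < limits["max_active_tasks"]
            and per_task < limits["max_active_tis_per_dag"]
            and in_pool < limits["pool_slots"][pool])

assert not admissible(("etl", "r1", "load", "default"))  # per-task limit hit
assert not admissible(("ml", "r2", "eval", "gpu"))       # pool full, run free
assert admissible(("etl", "r1", "extract", "default"))   # all limits satisfied
```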
   
   This way https://github.com/apache/airflow/pull/55537 was born, and I'll 
soon give a notice of it on mail (need some time to make it robust and ready 
for review). It seems to solve all cases 1-4, but has some drawbacks that have 
to be addressed.
   
   Could you please kindly share the exact workloads used for benchmarks? I 
mean the source code.
   It would be great to agree on some "generic" workloads so we can test 
different proposals using the same DAGs.
   Numbers will be different due to hardware differences, but we only care 
about the ratios here.
   
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-17 Thread via GitHub


Nataneljpwd commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3300201564

   Hello @xBis7, what is the goal of the PR? It seems to solve the issue that 
someone on the mailing list had with the starvation; however, even here it does 
not fully do it.
   If the goal is to increase throughput, why not delegate ALL the work to the 
SQL query?
   It seems weird to only delegate 1 check to SQL, which is used equally as 
much as `max_active_tis_per_dagrun` and `dag_max_active_tasks`, no?
   
   It just looks like you are trying to solve a single issue that someone in 
the mailing list was experiencing, which happens to increase the scheduler 
throughput, and it seems like a sub problem of the greater problem of general 
starvation in airflow.
   
   Why shouldn't we add the other checks? 
   Why did you decide specifically to add the `max_active_tasks` check only? it 
seems weird to me that you chose one limit, did you try other limits first? or 
did you ever try to do it on multiple limits together?
   
   And it seems to increase scheduler throughput, up until the point you have 
mapped tasks, or a lot of tasks, or tasks which belong to a starved pool, where 
you will still choose multiple tasks that will be dropped in the loop, and 
cause more iterations; however, this time the query takes twice as long.
   
   An example is as follows:
   Task A goes to pool A, 128 mapped tasks, short running
   Task B goes to pool B 50 mapped tasks
   Pool A 80 free slots
   Pool B 1 free slot
   Same dag, and dagrun, max_active_tasks set to 10
   
   Here, the tasks will sort in an alternating pattern; let's suppose Task A is 
first, for whatever reason; it does not really matter.
   We will get 10 tasks, 5 from A, 5 from B, run only one from B, and suppose, 
Task A falls because of mapped task count limit, and we only schedule 1 task.
   Next run, another task (or two) from Pool B is freed, the same thing happens 
again, we schedule only 1 or 2 tasks, and drop the other selected tasks.
   
   This issue can occur for any other concurrency limit such as 
`max_active_tis_per_dag`, or pool slot count.
   
   I just don't think I fully understood the goal of the PR, so if you 
could clarify, it would be appreciated.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-16 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3301379665

   > it seems to solve the issue that someone on the mailing list had with the 
starvation, however, even here it does not fully do it.
   >
   >
   > It just looks like you are trying to solve a single issue that someone in 
the mailing list was experiencing, which happens to increase the scheduler 
throughput, and it seems like a sub problem of the greater problem of general 
starvation in airflow.
   
   Hi @Nataneljpwd, we haven't experienced any noticeable starvation issues and 
we hadn't been concerned about it until you brought it up on the mailing list. 
From what I see, the scheduler is designed to select tasks in FIFO order while 
it prioritizes some tasks that have a higher priority weight. All tasks are 
running sooner or later. I haven't been trying to change the order by which 
tasks are selected.
   
   > If the goal is to increase throughput, why not delegate ALL the work to 
the sql query?
   > 
   > Why shouldn't we add the other checks?
   > Why did you decide specifically to add the max_active_tasks check only? it 
seems weird to me that you chose one limit, did you try other limits first? or 
did you ever try to do it on multiple limits together?
   
   Let me give you a bit of context. We noticed that the scheduler and the 
workers didn't use as much CPU as expected, even when configured properly to do 
so. After some research, we found a few inefficiencies in the scheduler queuing 
process and noticed that a lot of iterations aren't doing much work even though 
there is load on the system.
   
   Here is an example, assuming that the scheduler can get 30 tasks from the 
query, examine them and queue them. There are 3-5 dags with 100 tasks each. We 
can run at max 10 tasks for each dag_run.
   
   The current query will get 30 tasks from the 1st dag, queue 10 and discard 
the rest. It will do a few more iterations to realize that we have reached max 
tasks for that dag_run and should examine the next in line. But if it takes 
into consideration the limit along with the current number of active tasks, it 
will only get the number of tasks that can actually be queued and then move on 
to the next dag_run based on FIFO and priorities. It will keep doing that until 
it reaches 30 tasks which all will be valid for queuing, all in 1 iteration 
instead of multiple ones.
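   A toy model of the loop counts in the example above (it assumes each queued 
task finishes before the next loop starts, which is a simplification of the 
real scheduler):

```python
def loops_to_drain(limit_aware, n_dags=3, tasks_per_dag=100, cap=10, max_tis=30):
    """Scheduler loops needed to queue every TI, assuming each queued task
    finishes before the next loop starts."""
    remaining = {d: tasks_per_dag for d in range(n_dags)}
    loops = 0
    while any(remaining.values()):
        loops += 1
        if limit_aware:
            # The query itself caps candidates per dag run, so the batch of
            # max_tis spreads across runs and every candidate gets queued.
            budget = max_tis
            for d in remaining:
                take = min(cap, remaining[d], budget)
                remaining[d] -= take
                budget -= take
        else:
            # The FIFO batch comes entirely from the first unfinished dag;
            # only `cap` of the max_tis candidates survive the concurrency
            # check, and the rest are examined and discarded.
            d = next(d for d in remaining if remaining[d])
            remaining[d] -= min(cap, remaining[d])
    return loops

assert loops_to_drain(limit_aware=False) == 30
assert loops_to_drain(limit_aware=True) == 10
```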
   
   > And it seems to increase scheduler throughput, up until the point you have 
mapped tasks, or a lot of tasks, or tasks which belong to a starved pool, where 
you will still choose multiple tasks that will be dropped in the loop, and 
cause more iterations, however, this time the query takes twice the amount of 
time.
   
   If you take a look at the unit tests and the benchmarks, you will see that 
that's not the case. Yes, the query takes twice the time, but the iterations 
are far fewer because it actually queues everything the query returns. The more 
the tasks, the greater the performance improvement.
   
   Additionally, we have tested this in production with a huge load of tasks. 
One of our dags with mapped tasks might end up with 10,000 tasks.
   
   > Task A goes to pool A, 128 mapped tasks, short running
   > Task B goes to pool B 50 mapped tasks
   
   I think here you mean `dag A` and `dag B`.
   
   > Here, the tasks will sort in an alternating pattern, lets suppose Task A 
is first for whatever reason, does not really matter.
   > We will get 10 tasks, 5 from A, 5 from B, run only one from B, and 
suppose, Task A falls because of mapped task count limit, and we only schedule 
1 task.
   > Next run, another task (or two) from Pool B is freed, the same thing 
happens again, we schedule only 1 or 2 tasks, and drop the other selected tasks.
   > 
   > This issue can occur for any other concurrency limit such as 
max_active_tis_per_dag, or pool slot count.
   
   I'm not trying to address this in the current PR. It will continue working 
as it is currently working in the scheduler. The point is that we already have 
these limits and we check them after getting the tasks from the db. If we also 
check them while getting the tasks from the db, we won't spend time looking 
into tasks that can't be queued and the number of required iterations will 
decrease.
   
   > I just don't think I fully understood the goal of the PR, and so, if you 
coud clarify, it would be appreciated.
   
   I hope all the above make sense.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284485855

   > I guess the scheduler loop duration is longer as each one is now doing 
more useful work?
   
   @ashb Yes, it's doing more work because there is an extra condition on the 
query, checking the max number of tasks that we can queue. But it's also more 
efficient: 1 new iteration queues almost as many tasks as 3 old iterations 
would.
   
   If we have reached the max active tasks for a dag run, we stop getting tasks 
to examine from the db and move on to examine the next dag run. That way, the 
iteration doesn't concentrate on examining tasks that it can't queue.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2343521922


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"
 
 
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> 
Query:

Review Comment:
   You are right. I fixed it.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284396016

   @uranusjr Thank you for the review! I addressed your comments.





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2343454124


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in 
each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the 
number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG 
level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in 
each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
 which is defaulted as ``[core] max_active_tasks_per_dag``.
 
-An example scenario when this would be useful is when you want to stop 
a new dag with an early
+An example scenario when this would be useful is when you want to stop 
a new dag_run with an early

Review Comment:
   Done.



##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,

Review Comment:
   Made the changes.
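   For readers following the config discussion: the limit being documented caps
   concurrently running task instances per dag_run. A minimal sketch of how such
   a cap gates queuing (plain Python, illustrative only -- this is NOT Airflow's
   actual implementation, and the function names are made up):

   ```python
   # Simplified model of a per-dag_run concurrency cap: more task instances
   # may be queued only while the run's active count stays below
   # max_active_tasks.
   def queue_candidates(scheduled: list[str], active_in_run: int, max_active_tasks: int) -> list[str]:
       """Return the scheduled TIs that still fit under the per-run cap."""
       free_slots = max(0, max_active_tasks - active_in_run)
       return scheduled[:free_slots]

   print(queue_candidates(["t1", "t2", "t3"], 2, 3))  # -> ['t1']
   ```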






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2343372833


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -121,12 +121,11 @@ core:
   default: "32"
 max_active_tasks_per_dag:
   description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,

Review Comment:
   `This is configurable at the DAG level with ``max_active_tasks``,` is 
directly from main.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-12 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284201824

   I guess the scheduler loop duration is longer as each one is now doing more 
useful work?





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-11 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3281650048

   > That gave me a testing idea. I'm going to load 100 dags with at least 500 
tasks each on the db and then re-capture the metrics to see how that affects 
the loop's performance.
   
   I added 99 dags with roughly 1000 tasks each, where every dag had 1 more task 
than the previous, i.e. 1001, 1002, 1003, ..., 1099. The scheduler had roughly 
100,000 tasks to go over from the db. 
   
   I triggered the same 6 dags as before and the numbers were pretty much the 
same.
   
   * total number of scheduler iterations
 * with the patch,
   * try 1: 517 iterations
   * try 2: 550 iterations
   * try 3: 542 iterations
 * original code,
   * try 1: 1445 iterations
   * try 2: 1507 iterations
   * try 3: 1463 iterations
   * total time
 * with the patch,
   * try 1: 514.31 s
   * try 2: 498.51 s
   * try 3: 500.37 s
 * original code,
   * try 1: 792.8 s
   * try 2: 811.82 s
   * try 3: 798.96 s
   * average time per iteration
 * with the patch,
   * try 1: 0.99 s
   * try 2: 0.9 s
   * try 3: 0.92 s
 * original code,
   * try 1: 0.54 s
   * try 2: 0.538 s
   * try 3: 0.546 s
   
   I ran the test 3 times and as you can see there wasn't much deviation in the 
timings. 
   
   [screenshot attachment: https://github.com/user-attachments/assets/1cca83a5-39a9-4683-a606-484142f4ab0c]
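   The averages above can be sanity-checked from the totals (a quick sketch;
   the (iterations, seconds) pairs are copied from this comment, and the
   reported averages appear to be truncated rather than rounded):

   ```python
   # Per-try (iterations, total seconds) pairs copied from the comment above.
   runs_patch = [(517, 514.31), (550, 498.51), (542, 500.37)]
   runs_main = [(1445, 792.8), (1507, 811.82), (1463, 798.96)]

   # Average time per iteration = total time / iteration count.
   avg_patch = [t / n for n, t in runs_patch]
   avg_main = [t / n for n, t in runs_main]

   # The patch roughly doubles per-iteration time but cuts total wall-clock
   # time by about 37%: fewer, denser scheduler loops.
   total_patch = sum(t for _, t in runs_patch)
   total_main = sum(t for _, t in runs_main)
   print(round(1 - total_patch / total_main, 2))  # -> 0.37
   ```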
   
   





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-09-11 Thread via GitHub


uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2341843237


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"
 
 
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Query:

Review Comment:
   I _think_ this return value is a Statement, not a Query (honestly the return 
type annotation is pretty useless anyway due to the lack of SQLAlchemy support)
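   For context on the Statement-vs-Query distinction: in SQLAlchemy 1.4+,
   `select(...)` builds a `Select` statement object, while the legacy `Query`
   type only comes from `Session.query(...)`. A sketch with illustrative
   table/column names (not Airflow's actual models or helper):

   ```python
   from sqlalchemy import column, func, select, table

   ti = table("task_instance", column("dag_id"), column("run_id"), column("state"))

   # A grouped active-TI count per dag_run, similar in shape to the helper
   # under review; select(...) yields a Select statement, not a Query.
   stmt = (
       select(ti.c.dag_id, ti.c.run_id, func.count().label("active"))
       .where(ti.c.state.in_(["running", "queued"]))
       .group_by(ti.c.dag_id, ti.c.run_id)
   )

   print(type(stmt).__name__)  # -> Select
   ```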






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-19 Thread via GitHub


ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2285132956


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
 return _create_dagrun
 
 
+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list

Review Comment:
   Yeah that makes sense. I first read it as a test that wasn't testing much 
and was confused.






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-18 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3199126239

   @ashb Thank you for the review!





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-18 Thread via GitHub


xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2284001691


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
 return _create_dagrun
 
 
+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list

Review Comment:
   It's similar to the `dag_maker` but for creating tasks. Apart from the 
creation, it also sets them all to `SCHEDULED` and assigns a higher priority to 
every 10th task.
   
   I used the priority weights to verify that higher priority tasks get 
examined and queued first.
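   
   The every-10th-task weighting described above can be sketched as follows
   (plain Python mirroring the helper's arithmetic; this is an illustration,
   not part of the test suite):
   
   ```python
   def prioritized_weights(task_num: int) -> dict[int, int]:
       """Map task index -> priority_weight for the indices task_maker boosts.

       Mirrors `if (i % 10) == 0: weight = int(i / 2)` from the helper, so
       every 10th task (including index 0) gets an explicit weight.
       """
       return {i: i // 2 for i in range(task_num) if i % 10 == 0}

   print(prioritized_weights(31))  # -> {0: 0, 10: 5, 20: 10, 30: 15}
   ```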






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-18 Thread via GitHub


ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2283387540


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
 return _create_dagrun
 
 
+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list

Review Comment:
   Ohhh. I read that as `test_maker` 🤦🏻 






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-18 Thread via GitHub


ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2283385792


##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
 return _create_dagrun
 
 
+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list

Review Comment:
   Is this worth keeping?






Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-15 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3191128408

   Yes, that was fixed on main, a rebase should fix it





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-15 Thread via GitHub


ashb commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3191129722

   @dstandish Do you have context to usefully review this? I think you were 
looking at this area recently?  





Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]

2025-08-14 Thread via GitHub


xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3190567184

   This failure on the CI
   
   ```
   FAILED 
airflow-core/tests/unit/models/test_renderedtifields.py::TestRenderedTaskInstanceFields::test_rtif_deletion_stale_data_error
 - Failed: Timeout >60.0s
   ```
   
   seems unrelated. I ran this test locally with breeze for postgres, mysql and 
sqlite and it passed without errors.

