Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812809548

@kaxil @vatsrahul1001 @shahar1 @ashb Thank you for the help, the reviews and the testing!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812806631

Great work here @xBis7 and also for persisting for this long (~6 months). #protm
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812803341

🎉 Merged to let it bake in -- so we will know if something is up before 3.2
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil merged PR #54103:
URL: https://github.com/apache/airflow/pull/54103
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
vatsrahul1001 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3812745284

> Hi @vatsrahul1001, has there been any update on your testing?

@xBis7 - Apologies for the delay. Completed testing now - this is a significant improvement! I used 10 DAGs with 100 parallel tasks each, `max_active_tasks=10`.

| Metric | main | PR |
|--------|------|----|
| Starved DAG events | 141 | 0 |

With this PR, the scheduler no longer fetches tasks it cannot queue. On main, tasks get fetched and then rejected when the DAG's `max_active_tasks` limit is already reached - with the PR, that check happens in the SQL query itself.

Looking at the logs:
- **Before:** Query pulls tasks mostly from the same DAG, and most get rejected
- **After:** Query spreads task selection across multiple DAGs, and all get queued

Looks like a big win for scheduler efficiency. Also, I do not see any regression from this PR.
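The behavior this benchmark exercises can be sketched in miniature. The following is a self-contained toy (not the actual Airflow query; the table layout, column names, and states are illustrative assumptions) showing how a per-dag-run `max_active_tasks` cap can be enforced inside the fetch query itself, so that saturated runs never yield rows in the first place:

```python
# Toy sketch: push the per-run concurrency check into the SQL query.
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
md = sa.MetaData()
ti = sa.Table(
    "task_instance", md,
    sa.Column("dag_id", sa.String),
    sa.Column("run_id", sa.String),
    sa.Column("task_id", sa.String),
    sa.Column("state", sa.String),
)
dm = sa.Table(
    "dag_model", md,
    sa.Column("dag_id", sa.String),
    sa.Column("max_active_tasks", sa.Integer),
)
md.create_all(engine)

with engine.begin() as conn:
    conn.execute(dm.insert(), [{"dag_id": "d1", "max_active_tasks": 2}])
    conn.execute(ti.insert(), [
        {"dag_id": "d1", "run_id": "r1", "task_id": "t1", "state": "running"},
        {"dag_id": "d1", "run_id": "r1", "task_id": "t2", "state": "running"},
        {"dag_id": "d1", "run_id": "r1", "task_id": "t3", "state": "scheduled"},
        {"dag_id": "d1", "run_id": "r2", "task_id": "t1", "state": "scheduled"},
    ])

    # Count of active (running/queued) tasks per dag run.
    active = (
        sa.select(ti.c.dag_id, ti.c.run_id, sa.func.count().label("cnt"))
        .where(ti.c.state.in_(["running", "queued"]))
        .group_by(ti.c.dag_id, ti.c.run_id)
        .subquery()
    )
    # Fetch SCHEDULED tasks only from runs still below their limit.
    q = (
        sa.select(ti.c.dag_id, ti.c.run_id, ti.c.task_id)
        .join(dm, dm.c.dag_id == ti.c.dag_id)
        .join(active, sa.and_(ti.c.dag_id == active.c.dag_id,
                              ti.c.run_id == active.c.run_id), isouter=True)
        .where(ti.c.state == "scheduled")
        .where(sa.func.coalesce(active.c.cnt, 0) < dm.c.max_active_tasks)
    )
    rows = [tuple(r) for r in conn.execute(q).all()]

# r1 is saturated (2 running >= limit 2), so only r2's task is fetched.
print(rows)  # [('d1', 'r2', 't1')]
```

The saturated run never produces rows, so the scheduler's per-loop budget is spent on tasks it can actually queue.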
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3810840079

Hi @vatsrahul1001, has there been any update on your testing?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2718328310
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
Review Comment:
You don't have to do it concurrently. If we run the starvation checks first
and then apply the `max_active_tasks` limit, that gives us the desired behavior.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3787281253

@vatsrahul1001 Only pending your review/test now
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3786303807

@kaxil Thank you again for the review! I added the unit tests that you requested. I ran some tests and you were right about the starvation limits. I moved the checks into the inner query, right before computing the row_num, and now it works as expected. The new unit test verifies it.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2718320758

##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -194,6 +194,52 @@ def _create_dagrun(
     return _create_dagrun

+def task_maker(

Review Comment:
Added unit tests for both cases.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708823289
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
I made the changes.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708777179
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
That's a valid point and an oversight on my part. Both of these options
exist on the inner query but need to be applied on the outer one.
```python
.options(selectinload(TI.dag_model))
```
doesn't have any effect unless applied on the outer query.
As for
```python
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
```
I'll leave it on the inner query and add it to the outer one as well.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2708414793

##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"

+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Subquery:
+    """Get the dag_run IDs and how many tasks are in the provided states for each one."""

Review Comment:
I think you are referring to this part on lines 488-490

```python
# dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
concurrency_map = ConcurrencyMap()
concurrency_map.load(session=session)
```

It immediately executes the query and gets the results, while `_get_current_dr_task_concurrency` creates a subquery that gets included in the main query. The subquery will be executed at a later point, which could be immediate but could also be after a while. The results could be different even after 3 seconds. Is that what you mean, and what I should explain in a comment?
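The timing difference described here can be illustrated with a small self-contained sketch (the schema and names are illustrative, not Airflow's): an eagerly executed count is a snapshot fixed at load time, while a subquery is only evaluated when the enclosing statement runs, so it observes rows inserted in between:

```python
# Snapshot (executed now) vs. subquery (executed with the outer statement).
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
md = sa.MetaData()
ti = sa.Table("ti", md, sa.Column("dag_id", sa.String), sa.Column("state", sa.String))
md.create_all(engine)

with engine.begin() as conn:
    conn.execute(ti.insert(), [{"dag_id": "d1", "state": "running"}])

    # Eager snapshot: the query runs immediately.
    snapshot = dict(conn.execute(
        sa.select(ti.c.dag_id, sa.func.count())
        .where(ti.c.state == "running")
        .group_by(ti.c.dag_id)
    ).all())

    # Deferred: only a SELECT construct is built; nothing executes yet.
    counts = (
        sa.select(ti.c.dag_id, sa.func.count().label("cnt"))
        .where(ti.c.state == "running")
        .group_by(ti.c.dag_id)
        .subquery()
    )

    # State changes after the snapshot was taken...
    conn.execute(ti.insert(), [{"dag_id": "d1", "state": "running"}])

    # ...and the subquery sees it when the outer query finally executes.
    live = dict(conn.execute(sa.select(counts.c.dag_id, counts.c.cnt)).all())

print(snapshot, live)  # {'d1': 1} {'d1': 2}
```

This is why keeping both the Python-side `ConcurrencyMap` check and the SQL-level filter is not strictly redundant: the two can observe different states of the table.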
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769492158

Overall I see it as a nice improvement, because it DOES give us better throughput at the moment. In the long term, we'll probably have to look at a more universal solution that prevents starvation under all the concurrency limits. I would like to see the benchmarks I published in the procedures PR run against this approach. The one thing that bothers me - what if we include both `max_active_tasks` and the pool / per-task / across-dagruns constraints? I'm looking forward to seeing this merged and to continuing to investigate more enhancements for the scheduler.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705612502
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
Review Comment:
I think it's inherent to this solution as it's impossible to check both
simultaneously. This PR does prevent starvation, but only for
`max_active_tasks` limit. Bumped into this too when trying to consider all the
concurrency limits.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769380365

> @kaxil Thank you for the review! I'm going to add the tests and then run some other tests of my own for your other comments.

Sounds good, we are soo-close.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705400293
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
Not sure what happened to my earlier comment so re-adding: This new query is
missing `.options(selectinload(TI.dag_model))` and `.with_hint(TI, "USE INDEX
(ti_state)", dialect_name="mysql")` which were on the original query. Every
`ti.dag_model` access later will trigger a separate query - with 50 TIs that's
50+ extra queries per loop.
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"

+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Subquery:
+    """Get the dag_run IDs and how many tasks are in the provided states for each one."""
Review Comment:
This queries the same data as `ConcurrencyMap.load()` which is still called
and used for the check at lines ~680-695. Worth adding a comment explaining why
we keep both? (race condition protection between query time and check time?)
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -194,6 +194,52 @@ def _create_dagrun(
     return _create_dagrun

+def task_maker(
Review Comment:
Couple test cases worth adding:
1. **Starvation filter ordering**: dag run with tasks in mixed pools (some
starved, some not). Verify non-starved pool tasks aren't excluded because
starved-pool tasks consumed row_number slots.
2. **Partial capacity**: dag run with `max_active_tasks=4` where 2 are
already RUNNING + 10 SCHEDULED. Verify query returns only 2 (not 4) for that
run.
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705372034
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
(Missed eager loading of `selectinload`)
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351661
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
Review Comment:
The row_number ranking happens here before starvation filters
(starved_pools, etc) are applied below. In the original code, those filters
were applied BEFORE the limit.
Tasks in starved pools will consume row_number slots and then get filtered
out, potentially excluding schedulable tasks from the same dag run. Should we
apply starvation filters to the base query before building ranked_query?
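The ordering concern can be made concrete with a toy, pure-Python illustration (hypothetical data; not Airflow code): if row numbers are assigned before the starved-pool filter, starved tasks consume the per-run slots and schedulable tasks from the same run get pushed past the limit:

```python
# rank-then-filter vs. filter-then-rank for a single dag run.
tasks = [
    {"task": "t1", "pool": "starved"},
    {"task": "t2", "pool": "starved"},
    {"task": "t3", "pool": "ok"},
    {"task": "t4", "pool": "ok"},
]
LIMIT = 2           # max_active_tasks for this dag run
starved = {"starved"}

def rank(ts):
    # Assign 1-based row numbers, mimicking row_number() over the run's tasks.
    return [dict(t, row_num=i + 1) for i, t in enumerate(ts)]

# Wrong order: rank first, then drop starved pools.
rank_then_filter = [
    t["task"] for t in rank(tasks)
    if t["row_num"] <= LIMIT and t["pool"] not in starved
]
# Right order: drop starved pools first, then rank.
filter_then_rank = [
    t["task"] for t in rank([t for t in tasks if t["pool"] not in starved])
    if t["row_num"] <= LIMIT
]
print(rank_then_filter)   # [] -- t1/t2 consumed both slots, then got filtered out
print(filter_then_rank)   # ['t3', 't4']
```

This matches the fix that was eventually made: the starvation checks were moved into the inner query before the row numbers are computed.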
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
     return multiprocessing.current_process().name == "MainProcess"
Review Comment:
This queries the same data as `ConcurrencyMap.load()` which is still called
and used for the check at lines ~665-680. With SQL-level filtering now in
place, that Python check should mostly pass (barring race conditions). Worth
adding a comment explaining why we keep both?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351657
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+.join(
+dr_task_concurrency_subquery,
+and_(
+TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+TI.run_id == dr_task_concurrency_subquery.c.run_id,
+),
+isouter=True,
+)
+.where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) <
DM.max_active_tasks
+)
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+# Create a subquery with row numbers partitioned by dag_id and
run_id.
+# Different dags can have the same run_id but
+# the dag_id combined with the run_id uniquely identify a run.
+ranked_query = (
+query.add_columns(
+func.row_number()
+.over(
+partition_by=[TI.dag_id, TI.run_id],
+order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+)
+.label("row_num"),
+DM.max_active_tasks.label("dr_max_active_tasks"),
+# Create columns for the order_by checks here for sqlite.
+TI.priority_weight.label("priority_weight_for_ordering"),
+DR.logical_date.label("logical_date_for_ordering"),
+TI.map_index.label("map_index_for_ordering"),
+)
+).subquery()
+
+# Select only rows where row_number <= max_active_tasks.
+query = (
+select(TI)
+.select_from(ranked_query)
+.join(
+TI,
+(TI.dag_id == ranked_query.c.dag_id)
+& (TI.task_id == ranked_query.c.task_id)
+& (TI.run_id == ranked_query.c.run_id)
+& (TI.map_index == ranked_query.c.map_index),
+)
+.where(ranked_query.c.row_num <=
ranked_query.c.dr_max_active_tasks)
+# Add the order_by columns from the ranked query for sqlite.
+.order_by(
+-ranked_query.c.priority_weight_for_ordering,
Review Comment:
This doesn't account for already running/queued tasks. If `max_active_tasks = 4` and there are already 2 running, we still return up to 4 scheduled tasks, but can only queue 2.
Should be:
```python
.where(ranked_query.c.row_num <= (ranked_query.c.dr_max_active_tasks - func.coalesce(ranked_query.c.task_per_dr_count, 0)))
```
You'd need to add `task_per_dr_count` as a column in ranked_query.
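As a sanity check on the arithmetic (plain Python, not Airflow code): with `max_active_tasks = 4` and 2 tasks already active, comparing `row_num` against the full limit admits 4 scheduled tasks, while comparing against the remaining capacity admits only 2:

```python
def admit(scheduled, limit):
    # Admit scheduled tasks whose 1-based row number fits within `limit`.
    return [t for row_num, t in enumerate(scheduled, start=1) if row_num <= limit]

scheduled = [f"t{i}" for i in range(10)]
max_active_tasks, already_active = 4, 2

full = admit(scheduled, max_active_tasks)                        # row_num <= limit
remaining = admit(scheduled, max_active_tasks - already_active)  # row_num <= limit - active

print(full)       # ['t0', 't1', 't2', 't3'] -- 4 admitted, but only 2 can run
print(remaining)  # ['t0', 't1'] -- matches the actual remaining capacity
```

The suggested `row_num <= dr_max_active_tasks - coalesce(task_per_dr_count, 0)` is exactly the second variant, applied in SQL.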
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label(
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351672
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
]
assert len(b_tis_in_wrong_executor) == 0
+    @conf_vars(
+        {
+            ("scheduler", "max_tis_per_query"): "100",
+            ("scheduler", "max_dagruns_to_create_per_loop"): "10",
+            ("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+            ("core", "parallelism"): "100",
+            ("core", "max_active_tasks_per_dag"): "4",
+            ("core", "max_active_runs_per_dag"): "10",
+            ("core", "default_pool_task_slot_count"): "64",
+        }
+    )
+    def test_per_dr_limit_applied_in_task_query(self, dag_maker, mock_executors):
+        scheduler_job = Job()
+        scheduler_job.executor.parallelism = 100
+        scheduler_job.executor.slots_available = 70
+        scheduler_job.max_tis_per_query = 100
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        session = settings.Session()
+
+        # Use the same run_id.
+        task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+        task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+        task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+        task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+        task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+        task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+        count = 0
+        iterations = 0
+
+        from airflow.configuration import conf
+
+        task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+        # 6 dags * 4 = 24.
+        assert task_num == 24
+
+        queued_tis = None
+        while count < task_num:
+            # Use `_executable_task_instances_to_queued` because it returns a list of TIs
+            # while `_critical_section_enqueue_task_instances` just returns the number of the TIs.
+            queued_tis = self.job_runner._executable_task_instances_to_queued(
+                max_tis=scheduler_job.executor.slots_available, session=session
+            )
+            count += len(queued_tis)
+            iterations += 1
+
+        assert iterations == 1
+        assert count == task_num
+
+        assert queued_tis is not None
+
+        dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+        # Tasks from all 6 dags should have been queued.
+        assert len(dag_counts) == 6
+        assert dag_counts == {
+            "dag_1300_tasks": 4,
+            "dag_1200_tasks": 4,
+            "dag_1100_tasks": 4,
+            "dag_100_tasks": 4,
+            "dag_90_tasks": 4,
+            "dag_80_tasks": 4,
+        }, "Count for each dag_id should be 4 but it isn't"
+
    def test_find_executable_task_instances_order_priority_with_pools(self, dag_maker):
Review Comment:
A couple of test cases worth adding:
1. **Starvation filter ordering**: a dag run with tasks in mixed pools (some starved, some not). Verify non-starved pool tasks aren't excluded because starved-pool tasks consumed row_number slots.
2. **Partial capacity**: a dag run with `max_active_tasks=4` where 2 are already RUNNING + 10 SCHEDULED. Verify the query returns only 2 (not 4) for that run.
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         try:
             locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
-            task_instances_to_examine: list[TI] = list(session.scalars(locked_query).all())
+            task_instances_to_examine = session.scalars(locked_query).all()
+
+            self.log.debug("Length of the tis to examine is %d", len(task_instances_to_examine))
+            self.log.debug(
+                "TaskInstance selection is: %s",
+                dict(Counter(ti.dag_id for ti in task_instances_to_examine)),
+            )
             timer.stop(send=True)
Review Comment:
nit: The `Counter()` iteration runs even when debug logging is disabled. If we're optimizing for perf:
```python
if self.log.isEnabledFor(logging.DEBUG):
    self.log.debug(...)
```
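As a rough stdlib sketch of the guard being suggested (the `dag_ids` generator and the `calls` counter are invented here purely to make the skipped work observable):

```python
import logging
from collections import Counter

log = logging.getLogger("scheduler_example")
log.setLevel(logging.INFO)

calls = {"n": 0}

def dag_ids(tis):
    # Hypothetical helper: counts how often the TI list is actually iterated.
    for ti in tis:
        calls["n"] += 1
        yield ti[0]

tis = [("dag_a", "t1"), ("dag_a", "t2"), ("dag_b", "t1")]

# Guarded: the Counter (and the iteration feeding it) only runs at DEBUG.
if log.isEnabledFor(logging.DEBUG):
    log.debug("TaskInstance selection is: %s", dict(Counter(dag_ids(tis))))
assert calls["n"] == 0  # nothing iterated at INFO level

log.setLevel(logging.DEBUG)
if log.isEnabledFor(logging.DEBUG):
    log.debug("TaskInstance selection is: %s", dict(Counter(dag_ids(tis))))
assert calls["n"] == 3  # iterated only once DEBUG is enabled
```

The `%s`-style lazy formatting already defers string building; the `isEnabledFor` guard additionally defers building the `Counter` argument itself.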
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351655
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
+
         if starved_pools:
             query = query.where(TI.pool.not_in(starved_pools))
Review Comment:
This new query is missing `.options(selectinload(TI.dag_model))`, which was on the original query above. When we rebuild the query here, we lose the eager loading, so every access to `ti.dag_model` later will trigger a separate query. With 50 TIs that's 50+ extra queries per loop, partially negating the perf gains.
Also missing `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")` - should add both here.
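The cost of losing the eager load can be illustrated without Airflow at all. This is a minimal sqlite3 sketch (the `dag`/`ti` tables and their columns are invented for the illustration) contrasting the lazy N+1 pattern against a selectin-style batch load:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INT);
CREATE TABLE ti (task_id TEXT, dag_id TEXT);
INSERT INTO dag VALUES ('dag_a', 4), ('dag_b', 4);
INSERT INTO ti VALUES ('t1', 'dag_a'), ('t2', 'dag_a'), ('t3', 'dag_b');
""")

tis = conn.execute("SELECT task_id, dag_id FROM ti").fetchall()

# Lazy loading: one dag-model query per TI (the N+1 pattern).
lazy_queries = 0
for _task_id, dag_id in tis:
    conn.execute("SELECT max_active_tasks FROM dag WHERE dag_id = ?", (dag_id,))
    lazy_queries += 1

# Selectin-style loading: one IN query covering every distinct dag_id.
dag_ids = sorted({dag_id for _task_id, dag_id in tis})
placeholders = ", ".join("?" * len(dag_ids))
batch = conn.execute(
    f"SELECT dag_id, max_active_tasks FROM dag WHERE dag_id IN ({placeholders})",
    dag_ids,
).fetchall()

assert lazy_queries == 3  # one round trip per task instance
assert len(batch) == 2    # a single round trip loads both dag models
```

This is the same trade-off `selectinload` makes in SQLAlchemy: one extra SELECT ... IN query per relationship, instead of one query per parent row.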
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
+    @conf_vars(
+        {
+            ("scheduler", "max_tis_per_query"): "100",
+            ("scheduler", "max_dagruns_to_create_per_loop"): "10",
+            ("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+            ("core", "parallelism"): "100",
+            ("core", "max_active_tasks_per_dag"): "4",
+            ("core", "max_active_runs_per_dag"): "10",
+            ("core", "default_pool_task_slot_count"): "64",
+        }
+    )
+    def test_per_dr_limit_applied_in_task_query(self, dag_maker, mock_executors):
+        scheduler_job = Job()
+        scheduler_job.executor.parallelism = 100
+        scheduler_job.executor.slots_available = 70
+        scheduler_job.max_tis_per_query = 100
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        session = settings.Session()
+
+        # Use the same run_id.
+        task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+        task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+        task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+        task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+        task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247075
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
This new query is missing `.options(selectinload(TI.dag_model))`, which was on the original query above. When we rebuild the query here, we lose the eager loading, so every access to `ti.dag_model` later will trigger a separate query. With 50 TIs that's 50+ extra queries per loop, which partially negates the perf gains from this PR.
Also missing the `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")` - should add both here.
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         try:
             locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
-            task_instances_to_examine: list[TI] = list(session.scalars(locked_query).all())
+            task_instances_to_examine = session.scalars(locked_query).all()
+
+            self.log.debug("Length of the tis to examine is %d", len(task_instances_to_examine))
+            self.log.debug(
+                "TaskInstance selection is: %s",
Review Comment:
nit: The `Counter()` iteration happens even when debug logging is disabled. Not a big deal, but if we're optimizing for perf, we might want to guard this:
```python
if self.log.isEnabledFor(logging.DEBUG):
    self.log.debug(...)
```
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
return multiprocessing.current_process().name == "MainProcess"
Review Comment:
This queries the same data as `ConcurrencyMap.load()`, which is still called and used for the check at lines 617-634. With the SQL-level filtering now in place, that Python check should always pass (barring race conditions). Worth adding a comment explaining why we keep both - race condition protection between query time and check time?
##
airflow-core/src/airflow/jobs/scheduler_jo
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705324006
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         try:
             locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
-            task_instances_to_examine: list[TI] = list(session.scalars(locked_query).all())
+            task_instances_to_examine = session.scalars(locked_query).all()
+
+            self.log.debug("Length of the tis to examine is %d", len(task_instances_to_examine))
+            self.log.debug(
+                "TaskInstance selection is: %s",
Review Comment:
It makes sense. I fixed it.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3769053071 @kaxil Thank you for the review! I'm going to add the tests and then run some other tests of my own for your other comments.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705274366
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
Review Comment:
The row_number ranking is applied here before the starvation filters (starved_pools, starved_dags, etc.) are added below. In the original code, those filters were applied BEFORE the limit.
This means tasks in starved pools will consume row_number slots and then get filtered out later, potentially excluding schedulable tasks from the same dag run that would have fit within the limit.
Should we apply starvation filters to the base query before building ranked_query?
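The ordering concern can be reproduced in plain SQL. A sketch with an invented `ti` table, a per-run limit of 2, and sqlite's `ROW_NUMBER` window function (available in sqlite 3.25+): filtering starved-pool rows after ranking can starve out schedulable tasks that filtering before ranking would keep.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ti (dag_id TEXT, run_id TEXT, task_id TEXT, pool TEXT, priority INT);
INSERT INTO ti VALUES
    ('d', 'r', 't1', 'starved', 10),
    ('d', 'r', 't2', 'starved', 9),
    ('d', 'r', 't3', 'ok', 8),
    ('d', 'r', 't4', 'ok', 7);
""")

def pick(sql):
    return [row[0] for row in conn.execute(sql)]

# Pool filter applied AFTER ranking: the starved-pool rows take row_number
# slots 1 and 2, so nothing from the schedulable 'ok' pool survives the limit.
filtered_after = pick("""
SELECT task_id FROM (
    SELECT task_id, pool,
           ROW_NUMBER() OVER (PARTITION BY dag_id, run_id ORDER BY priority DESC) AS rn
    FROM ti
) WHERE rn <= 2 AND pool != 'starved'
ORDER BY rn
""")

# Pool filter applied BEFORE ranking: only 'ok' rows are ranked, both fit.
filtered_before = pick("""
SELECT task_id FROM (
    SELECT task_id,
           ROW_NUMBER() OVER (PARTITION BY dag_id, run_id ORDER BY priority DESC) AS rn
    FROM ti WHERE pool != 'starved'
) WHERE rn <= 2
ORDER BY rn
""")

assert filtered_after == []              # schedulable tasks starved out
assert filtered_before == ['t3', 't4']   # filtering first keeps them
```

The same shift in results applies to the SQLAlchemy version: a WHERE added outside the ranked subquery filters ranked rows, while one added to the base query changes which rows get ranked at all.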
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -1266,6 +1312,71 @@ def test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
+    @conf_vars(
+        {
+            ("scheduler", "max_tis_per_query"): "100",
+            ("scheduler", "max_dagruns_to_create_per_loop"): "10",
+            ("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+            ("core", "parallelism"): "100",
+            ("core", "max_active_tasks_per_dag"): "4",
+            ("core", "max_active_runs_per_dag"): "10",
+            ("core", "default_pool_task_slot_count"): "64",
+        }
+    )
+    def test_per_dr_limit_applied_in_task_query(self, dag_maker, mock_executors):
+        scheduler_job = Job()
+        scheduler_job.executor.parallelism = 100
+        scheduler_job.executor.slots_available = 70
+        scheduler_job.max_tis_per_query = 100
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        session = settings.Session()
+
+        # Use the same run_id.
+        task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+        task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+        task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+        task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+        task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+        task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+        count = 0
+        iterations = 0
+
+        from airflow.configuration import conf
+
+        task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+        # 6 dags * 4 = 24.
+        assert task_num == 24
+
+        queued_tis = None
+        while count < task_num:
+            # Use `_executable_task_instances_to_queued` because it returns a list of TIs
+            # while `_critical_section_enqueue_task_instances` just returns the number of the TIs.
+            queued_tis = self.job_runner._executable_task_instances_to_queued(
+                max_tis=scheduler_job.executor.slots_available, session=session
+            )
+            count += len(queued_tis)
+            iterations += 1
+
+        assert iterations == 1
+        assert count == task_num
+
+        assert queued_tis is not None
+
+        dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+        # Tasks from all 6 dags should have been queued.
+        assert len(dag_counts) == 6
+        assert dag_counts == {
+            "dag_1300_tasks": 4,
+            "dag_1200_tasks": 4,
+            "dag_1100_tasks": 4,
+
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705256990
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
So, probably
```py
query = (
    select(TI)
    .select_from(ranked_query)
    .join(
        TI,
        (TI.dag_id == ranked_query.c.dag_id)
        & (TI.task_id == ranked_query.c.task_id)
        & (TI.run_id == ranked_query.c.run_id)
        & (TI.map_index == ranked_query.c.map_index),
    )
    .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
    .order_by(
        -ranked_query.c.priority_weight_for_ordering,
        ranked_query.c.logical_date_for_ordering,
        ranked_query.c.map_index_for_ordering,
    )
    .options(selectinload(TI.dag_model))
)
```
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247082 ## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ## @@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - .where(~DM.is_paused) .where(TI.state == TaskInstanceState.SCHEDULED) .where(DM.bundle_name.is_not(None)) +.join( +dr_task_concurrency_subquery, +and_( +TI.dag_id == dr_task_concurrency_subquery.c.dag_id, Review Comment: Nice - the outer join with COALESCE handles dag runs with 0 executing tasks cleanly.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705235070
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
Review Comment:
Somewhere here I think you need `.options(selectinload(TI.dag_model))`, otherwise every access to `task_instance.dag_model` later in the code will trigger a separate query.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247082
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
Review Comment:
Nice - the outer join with COALESCE handles dag runs with 0 executing tasks cleanly.
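That pattern (an outer join to a grouped count, with COALESCE defaulting missing rows to zero) can be sketched in plain SQL; the `dag`/`running_ti` tables and their columns below are invented for the illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag (dag_id TEXT, max_active_tasks INT);
CREATE TABLE running_ti (dag_id TEXT, run_id TEXT);
INSERT INTO dag VALUES ('busy', 2), ('idle', 2);
-- 'idle' has no executing tasks at all, so it has no row in the counts.
INSERT INTO running_ti VALUES ('busy', 'r1'), ('busy', 'r1');
""")

# LEFT JOIN keeps dags with zero executing tasks; COALESCE turns their
# missing count into 0 so the < max_active_tasks comparison still works.
rows = conn.execute("""
SELECT d.dag_id
FROM dag d
LEFT JOIN (
    SELECT dag_id, COUNT(*) AS cnt FROM running_ti GROUP BY dag_id
) c ON d.dag_id = c.dag_id
WHERE COALESCE(c.cnt, 0) < d.max_active_tasks
ORDER BY d.dag_id
""").fetchall()

assert rows == [('idle',)]  # 'busy' is at its limit; 'idle' (NULL -> 0) passes
```

With a plain inner join, the 'idle' dag would drop out entirely and could never be scheduled; the outer join plus COALESCE is what keeps zero-activity runs eligible.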
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
return multiprocessing.current_process().name == "MainProcess"
Review Comment:
This queries the same data as `ConcurrencyMap.load()`, which is still called and used for the check at lines 617-634. With the SQL-level filtering now in place, that Python check should always pass (barring race conditions). Worth adding a comment explaining why we keep both - race condition protection between query time and check time?
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
Review Comment:
This new query is missing `.options(selectinload(TI.dag_model))`, which was on the original query above. When we rebuild the query here, we lose the eager loading, so every access to `ti.dag_model` later will trigger a separate query. With 50 TIs that's 50+ extra queries per loop, which partially negates the perf gains from this PR.
Also missing the `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")` - should add both here.
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         try:
             locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
-            task_instances_to_examine: list[TI] = list(ses
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705235070
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(
+                func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+            )
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
Review Comment:
Somewhere here I think you need `.options(selectinload(TI.dag_model))`, otherwise every access to `task_instance.dag_model` later in the code will trigger a separate query.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3767137353 @Nataneljpwd Thank you!
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3745522411 @vatsrahul1001 Thanks! Please let me know if there is something I can do.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
vatsrahul1001 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3745504675 @xBis7 Thanks for the PR. I plan to run some tests on this before we merge.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2644030886
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
Review Comment:
I've run tests and this change doesn't seem to make any difference so far.
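The per-dag-run capping implemented with `row_number()` in the diff above can be sketched in isolation with the stdlib `sqlite3` module. This is a hypothetical mini-table standing in for Airflow's `task_instance` table, not the real schema, and it assumes an SQLite build with window-function support (3.25+):

```python
import sqlite3

# Hypothetical mini-schema standing in for the task_instance table.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ti (dag_id TEXT, run_id TEXT, task_id TEXT, priority_weight INT);
    INSERT INTO ti VALUES
        ('dag1', 'run1', 'a', 100), ('dag1', 'run1', 'b', 90),
        ('dag1', 'run1', 'c', 80),  ('dag2', 'run1', 'x', 95),
        ('dag2', 'run1', 'y', 85);
    """
)

# Rank tasks inside each (dag_id, run_id) partition and keep at most
# max_active_tasks = 2 rows per dag run, mirroring the diff's window function.
rows = conn.execute(
    """
    SELECT dag_id, task_id FROM (
        SELECT dag_id, task_id,
               ROW_NUMBER() OVER (
                   PARTITION BY dag_id, run_id
                   ORDER BY priority_weight DESC
               ) AS row_num
        FROM ti
    ) WHERE row_num <= 2
    ORDER BY dag_id, task_id
    """
).fetchall()
print(rows)  # [('dag1', 'a'), ('dag1', 'b'), ('dag2', 'x'), ('dag2', 'y')]
```

Note how `dag1`'s lowest-priority task `c` is dropped while both of `dag2`'s tasks survive, because each `(dag_id, run_id)` pair keeps its own quota — the same behavior the PR's ranked subquery gives each dag run.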
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2640505174
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
```
Review Comment:
I'm going to give this a try and run some tests. If it alters the scheduler
behavior too much then it might not be worth it for now.
The current patch doesn't fix everything but provides certain performance
improvements and it has been very thoroughly tested.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2639198443
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
```diff
@@ -1180,6 +1226,102 @@ def test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                    weight = int(i / 2)
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+                else:
+                    # No executor specified, runs on default executor
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        task_tis = {}
+
+        tis_list = []
+        for i in range(task_num):
+            task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+            # add
+            tis_list.append(task_tis[f"ti{i}"])
+
+        for ti in tis_list:
+            ti.state = State.SCHEDULED
+            ti.dag_model.max_active_tasks = 4
+
+        session.flush()
+
+        return tis_list
```
Review Comment:
This was a rebase mixup. It has been fixed.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3678912476
I've been running various tests, and while doing so I noticed that the metrics I was getting weren't consistent between consecutive runs. So I started logging the same values that I was sending to OTel for creating the metrics, and I found that in certain cases the final numbers were different from what we would see in the Grafana dashboards. For example, Grafana would show the maximum number of active concurrent dag runs as 5 while the actual number was higher, at 7. The reason is that we export the current value at an interval, not all past and present values: the collector gets samples, not a complete list of the values. In order to capture more values for the diagrams, I decreased the sampling step and the export interval by a lot, which made even instantaneous values visible. The higher the load, the longer the tests take to run, so we end up with more samples for the metrics, which in turn leads to a more reliable and representative diagram.

A very simple test case that I ran was triggering 2 dag runs of the same dag, with and without the patch. In that scenario the results are random: sometimes the patch is faster, sometimes it isn't, but in all cases the difference is negligible. For example, the scheduler might do 4 iterations with the patch and 5 without, or 5 in both cases, or 6 with and 4 without. The timings end up within ±1s.

## Testing different topologies

I used the following dags:

* 5 linear dags with 10 tasks each, all in a sequence
  ```
  0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> 10
  ```
* 2 dags with a single root task and 100 downstream tasks all running in parallel
  ```
  0 -> (1_1, 1_2, 1_3, 1_4, ..., 1_100 -- parallel)
  ```
* 5 branching dags with 1 root task that has 5 child tasks, each child task having 3 children of its own
  ```
  0 -> (1_1, 1_2, 1_3, 1_4, 1_5), each 1_i -> 3 children (2_1 ... 2_15)
  ```

I triggered all of them and also created multiple dag_runs for most of them. There wasn't much difference in the number of scheduler iterations, but with the patch the scheduler consistently queued the number of tasks that it examined, in contrast to examining a very high number of tasks only to queue just a few, which is what happens without the patch. The difference in the time needed to run the dags is noticeable:

* with the patch
  * scheduler iterations: 150
  * total time: 248.28s
* without the patch
  * scheduler iterations: 162
  * total time: 345.64s

(video attachment: https://github.com/user-attachments/assets/89793622-2308-4429-bfeb-a6fd24317663)

## Testing heavy load

This is the original test where I created multiple dags with all tasks running in parallel, without any dependencies between them:

* dag_45_tasks: 45 parallel tasks
* dag_250_tasks: 250 parallel tasks
* dag_470_tasks: 470 parallel tasks
* dag_1000_tasks: 1000 parallel tasks
* dag_1100_tasks: 1100 parallel tasks
* dag_1200_tasks: 1200 parallel tasks

I reran this test because I made a lot of changes to my testing infrastructure and wanted to verify that the new metrics are in line with the old ones I previously shared.

* with the patch
  * scheduler iterations: 402
  * max number of concurrent DRs: 6
  * total time: 675.25s
* without the patch
  * scheduler iterations: 1343
  * max number of concurrent DRs: 3
  * total time: 1024.49s

(video attachment: https://github.com/user-attachments/assets/8f18cfca-3203-4505-aa42-c9769d54d1a3)
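The sampling effect described above — an interval-exported gauge missing short-lived peaks until the sampling step is reduced — can be reproduced with a toy simulation. The numbers below are made up for illustration; they are not the actual test data:

```python
# Toy illustration of why an interval-sampled gauge under-reports short peaks.
# `concurrent_drs` is a hypothetical per-second count of active dag runs.
concurrent_drs = [3, 3, 5, 7, 5, 3, 2, 2, 1, 1, 0, 0]

def sampled_max(series, step):
    """Max as seen by a collector that samples every `step` seconds."""
    return max(series[::step])

# Coarse sampling (every 4s) only sees indices 0, 4, 8 -> values 3, 5, 1.
print(sampled_max(concurrent_drs, 4))  # 5  -- the true peak of 7 falls between samples
# A smaller sampling step catches the real maximum.
print(sampled_max(concurrent_drs, 1))  # 7
```

This mirrors why shrinking the sampling step and export interval made the Grafana diagrams match the logged values: a gauge only reflects what is true at each sample instant.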
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Copilot commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2594698573
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
```diff
@@ -1180,6 +1226,102 @@ def test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                    weight = int(i / 2)
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+                else:
+                    # No executor specified, runs on default executor
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        task_tis = {}
+
+        tis_list = []
+        for i in range(task_num):
+            task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+            # add
+            tis_list.append(task_tis[f"ti{i}"])
+
+        for ti in tis_list:
+            ti.state = State.SCHEDULED
+            ti.dag_model.max_active_tasks = 4
+
+        session.flush()
+
+        return tis_list
```
Review Comment:
The `task_helper` method (lines 1229-1258) is nearly identical to the
module-level `task_maker` function (lines 184-227). The only significant
differences are:
1. `task_helper` has a hardcoded `max_active_tasks = 4` (line 1254) instead
of accepting it as a parameter
2. `task_helper` doesn't support custom `run_id` parameter
Consider removing this duplicate method and using the `task_maker` function
instead, or refactor to reduce code duplication. For example, the test at line
1271 could use `task_maker` directly.
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
```diff
@@ -181,6 +181,52 @@ def _create_dagrun(
     return _create_dagrun
+def task_maker(
+    dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int, run_id: str | None = None
+):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
```
Review Comment:
The comment suggests the condition matches tasks at indices 10, 20, 30, etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30, etc. The comment should be updated to "# 0, 10, 20, 30, 40, ..." to accurately reflect when the condition is true.
```suggestion
            if (i % 10) == 0:  # 0, 10, 20, 30, 40, ...
```
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                and_(
+                    TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                ),
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by dag_id and run_id.
+        # Different dags can have the same run_id but
+        # the dag_id combined with the run_id uniquely identify a run.
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=[TI.dag_id, TI.run_id],
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+
```
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3569749637 @uranusjr I made the changes. Thanks!
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2555020477
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
```diff
@@ -1178,6 +1224,97 @@ def test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                    weight = int(i / 2)
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+                else:
+                    # No executor specified, runs on default executor
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        task_tis = {}
+
+        tis_list = []
+        for i in range(task_num):
+            task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+            # add
+            tis_list.append(task_tis[f"ti{i}"])
+
+        for ti in tis_list:
+            ti.state = State.SCHEDULED
+            ti.dag_model.max_active_tasks = 4
+
+        session.flush()
+
+        return tis_list
+
+    @conf_vars(
+        {
+            ("scheduler", "max_tis_per_query"): "100",
+            ("scheduler", "max_dagruns_to_create_per_loop"): "10",
+            ("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+            ("core", "parallelism"): "100",
+            ("core", "max_active_tasks_per_dag"): "4",
+            ("core", "max_active_runs_per_dag"): "10",
+            ("core", "default_pool_task_slot_count"): "64",
+        }
+    )
+    def test_per_dr_limit_applied_in_task_query(self, dag_maker, mock_executors):
+        scheduler_job = Job()
+        scheduler_job.executor.parallelism = 100
+        scheduler_job.executor.slots_available = 70
+        scheduler_job.max_tis_per_query = 100
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        session = settings.Session()
+
+        # Use the same run_id.
+        task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+        task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+        task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+        task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+        task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+        task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+        count = 0
+        iterations = 0
+
+        from airflow.configuration import conf
+
+        task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+        # 6 dags * 4 = 24.
+        assert task_num == 24
+
+        queued_tis = None
+        while count < task_num:
+            # Use `_executable_task_instances_to_queued` because it returns a list of TIs
+            # while `_critical_section_enqueue_task_instances` just returns the number of the TIs.
+            queued_tis = self.job_runner._executable_task_instances_to_queued(
+                max_tis=scheduler_job.executor.slots_available, session=session
+            )
+            count += len(queued_tis)
+            iterations += 1
+
+        assert iterations == 1
+        assert count == task_num
+
+        assert queued_tis is not None
+
+        dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+        # Tasks from all 6 dags should have been queued.
+        assert len(dag_counts) == 6
+        assert all(count == 4 for count in dag_counts.values()), (
+            "Count for each dag_id should be 4 but it isn't"
+        )
```
Review Comment:
It’d probably be easier for debugging if this just did
```python
assert dag_counts == {
    "dag_1300_tasks": 4,
    "dag_1200_tasks": 4,
    ...
}
```
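The dict-equality assertion suggested above works because `collections.Counter` compares equal to a plain dict with the same keys and counts, and pytest prints the full key/value diff when such an assertion fails. A minimal standalone illustration, using hypothetical dag ids rather than the real fixtures:

```python
from collections import Counter

# Hypothetical queued TIs: (dag_id, task_id) pairs standing in for TaskInstances.
queued = [("dag_a", f"t{i}") for i in range(4)] + [("dag_b", f"t{i}") for i in range(4)]

dag_counts = Counter(dag_id for dag_id, _ in queued)

# Counter == dict holds when keys and counts match, so the suggested
# single dict-equality assertion replaces the len() + all() pair.
assert dag_counts == {"dag_a": 4, "dag_b": 4}
print(dict(dag_counts))  # {'dag_a': 4, 'dag_b': 4}
```

On failure, the dict comparison immediately shows *which* dag got the wrong count, instead of only reporting that "Count for each dag_id should be 4 but it isn't".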
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2555010941
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -515,6 +588,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
         task_instances_to_examine: list[TI] = list(session.scalars(locked_query).all())
```
Review Comment:
```suggestion
        task_instances_to_examine = session.scalars(locked_query).all()
```
Not really related, but let’s fix this potentially wasteful line.
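The waste uranusjr flags above is an extra list copy: in SQLAlchemy, `session.scalars(query).all()` already returns a list, so wrapping it in `list(...)` allocates a second, identical list. A plain-Python analogue (the session and query are not reproduced here):

```python
# Stand-in for the list that session.scalars(locked_query).all() returns.
rows = ["ti1", "ti2", "ti3"]

# The flagged pattern: list(...) over something that is already a list
# builds a brand-new list object with the same contents.
copied = list(rows)

assert copied == rows       # same contents...
assert copied is not rows   # ...but a redundant second allocation
print(copied)  # ['ti1', 'ti2', 'ti3']
```

Dropping the `list(...)` wrapper, as the suggestion does, avoids that copy with no change in behavior.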
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3553535226 Hi @ashb, is there any update on this?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3486498022 I'm still (re-)running some tests, but so far I've found this change is either a net benefit to task throughput, or equal. I'm leaning towards this not being behind a config flag right now.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2487438728
##
airflow-core/src/airflow/config_templates/config.yml:
##
```diff
@@ -121,12 +121,11 @@ core:
       default: "32"
     max_active_tasks_per_dag:
       description: |
-        The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-        the number of tasks that is running concurrently for a DAG, add up the number of running
-        tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+        The maximum number of task instances allowed to run concurrently in each dag run.
+        This is also configurable per-dag with ``max_active_tasks``,
```
Review Comment:
No problem. I reverted the changes and created a separate PR https://github.com/apache/airflow/pull/57770
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2487399540
##
airflow-core/src/airflow/config_templates/config.yml:
##
```diff
@@ -121,12 +121,11 @@ core:
       default: "32"
     max_active_tasks_per_dag:
       description: |
-        The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-        the number of tasks that is running concurrently for a DAG, add up the number of running
-        tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+        The maximum number of task instances allowed to run concurrently in each dag run.
+        This is also configurable per-dag with ``max_active_tasks``,
```
Review Comment:
A bit of a pain, but could you pull that out into a separate PR please?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2485361359
##
airflow-core/src/airflow/config_templates/config.yml:
##
```diff
@@ -121,12 +121,11 @@ core:
       default: "32"
     max_active_tasks_per_dag:
       description: |
-        The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-        the number of tasks that is running concurrently for a DAG, add up the number of running
-        tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+        The maximum number of task instances allowed to run concurrently in each dag run.
+        This is also configurable per-dag with ``max_active_tasks``,
```
Review Comment:
`max_active_tasks_per_dag` was actually changed by a previous patch to be applied on a per-dag-run basis and not per dag. That patch neglected to update the docs as well. When I first made the changes for the current PR, I thought it was per dag until I ran the test suite and realized it's per dag_run. I just updated the description to reflect how it currently works.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2485314962
##
airflow-core/src/airflow/config_templates/config.yml:
##
```diff
@@ -121,12 +121,11 @@ core:
       default: "32"
     max_active_tasks_per_dag:
       description: |
-        The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-        the number of tasks that is running concurrently for a DAG, add up the number of running
-        tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+        The maximum number of task instances allowed to run concurrently in each dag run.
+        This is also configurable per-dag with ``max_active_tasks``,
```
Review Comment:
My understanding is this controls how many tasks _in the entire system_ are running concurrently (for the dag). Is this PR changing this, or just fixing an incorrect description?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2483790152
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
```
Review Comment:
I did some testing and here is what I found. Let's assume that we have 3 dags with the same `run_id` and all the checks use only the `run_id`:

* the query will return `max_active_tasks` TIs from the 1st dag and ignore the other two because they have the same `run_id`
* every consecutive iteration, until the tasks from the 1st dag finish, won't pick any new tasks for examining and queueing
* once the tasks finish, it will pick `max_active_tasks` TIs again, but it will again concentrate on 1 dag

It would group all tasks from all dags as if they all belonged to the same dag_run. But the `(dag_id, run_id)` pair is unique, and now the tasks are properly separated. This was a serious logical bug. I don't know how common it will be for multiple dag_runs to have the same `run_id`, but it's good to have that edge case accounted for already. Thanks @kaxil, great catch! With the suggested changes it works as it should.
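The collision described above — counting active tasks by `run_id` alone versus by the unique `(dag_id, run_id)` pair — can be shown with plain `Counter`s. The data below is hypothetical, standing in for running TIs from dags whose scheduled runs happen to share a `run_id` string:

```python
from collections import Counter

# Hypothetical running tasks: dag1 has 4 running TIs in run
# "scheduled__2025-01-01"; dag2 and dag3 use the same run_id string
# but have no running tasks yet.
running = [("dag1", "scheduled__2025-01-01", t) for t in ("a", "b", "c", "d")]

# Buggy grouping: run_id alone lumps every dag's tasks into one bucket,
# so dag2/dag3 look "at the limit" despite having zero running tasks.
by_run_id = Counter(run_id for _, run_id, _ in running)
print(by_run_id["scheduled__2025-01-01"])  # 4 -- charged against *all* dags

# Correct grouping: the (dag_id, run_id) pair keeps each dag run's own count.
by_pair = Counter((dag_id, run_id) for dag_id, run_id, _ in running)
print(by_pair[("dag2", "scheduled__2025-01-01")])  # 0 -- dag2 still has quota
```

With the buggy grouping, a `count < max_active_tasks` check (with `max_active_tasks = 4`) would wrongly exclude dag2 and dag3 from scheduling, which is exactly the starvation pattern observed in the test above.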
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2480724286
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
```
Review Comment:
You are right. I overlooked that the `run_id` isn't unique across all DAGs. Initially, the query used the `dag_id`, until I realized that the behavior for `max_active_tasks` had been changed and the limit is applied on a per-dag-run basis and not per dag. I'll do some testing.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2480697365
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
```
Review Comment:
Yup, you are likely right. I'm testing now with
```
for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks dag_1000_tasks; do airflowctl dagrun trigger --dag-id $dag --dag-run-id run1; done
for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks dag_1000_tasks; do airflowctl dagrun trigger --dag-id $dag --dag-run-id run2; done
```
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2479689911
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
```
Review Comment:
Hmm, `run_id` is only unique per DAG, but this subquery groups by `run_id`
alone and the join uses just `TI.run_id`. When one DAG reaches
`max_active_tasks`, every other DAG that happens to share the same `run_id`
string (common for scheduled runs) now fails the `< DM.max_active_tasks` check
and never gets considered.
Better to group by `(dag_id, run_id)` and join on both columns instead?
https://github.com/apache/airflow/blob/085b0edfe7c7151517cd727020c13a2834409210/airflow-core/src/airflow/models/dagrun.py#L233
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
```diff
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
             .where(~DM.is_paused)
             .where(TI.state == TaskInstanceState.SCHEDULED)
             .where(DM.bundle_name.is_not(None))
+            .join(
+                dr_task_concurrency_subquery,
+                TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                isouter=True,
+            )
+            .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
             .options(selectinload(TI.dag_model))
             .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
         )
+        # Create a subquery with row numbers partitioned by run_id.
+        #
+        # run_id   | task_id | priority_weight | row_num
+        # ---------|---------|-----------------|--------
+        # dag1_dr1 | task1   | 100             | 1
+        # dag1_dr1 | task22  | 90              | 2
+        # dag1_dr1 | task5   | 80              | 3
+        # dag1_dr1 | task13  | 70              | 4
+        # dag2_dr1 | task3   | 95              | 1
+        # dag2_dr1 | task1   | 85              | 2
+        # dag2_dr1 | task5   | 75              | 3
+        ranked_query = (
+            query.add_columns(
+                func.row_number()
+                .over(
+                    partition_by=TI.run_id,
+                    order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+                )
+                .label("row_num"),
+                DM.max_active_tasks.label("dr_max_active_tasks"),
+                # Create columns for the order_by checks here for sqlite.
+                TI.priority_weight.label("priority_weight_for_ordering"),
+                DR.logical_date.label("logical_date_for_ordering"),
+                TI.map_index.label("map_index_for_ordering"),
+            )
+        ).subquery()
+
+        # Select only rows where row_number <= max_active_tasks.
+        query = (
+            select(TI)
+            .select_from(ranked_query)
+            .join(
+                TI,
+                (TI.dag_id == ranked_query.c.dag_id)
+                & (TI.task_id == ranked_query.c.task_id)
+                & (TI.run_id == ranked_query.c.run_id)
+                & (TI.map_index == ranked_query.c.map_index),
+            )
+            .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+            # Add the order_by columns from the ranked query for sqlite.
+            .order_by(
+                -ranked_query.c.priority_weight_for_ordering,
+                ranked_query.c.logical_date_for_ordering,
+                ranked_query.c.map_index_for_ordering,
+            )
+        )
+
```
Review Comment:
Similar thing here: The window only partitions on `run_id`, which collides
across DAGs. Once one DAG consumes its `max_active_tasks`, every other DAG with
the same `run_id` is filtered out by `row_num <= dr_max_active_tasks`, even if
they have zero active tasks.
Better would be to partition by both `TI.dag_id` and `TI.run_id` (or the dag
run PK) before applying the limit so each DAG run keeps its own quota.
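A quick way to see the suggested fix in isolation (sqlite3 toy data, illustrative names, not the real scheduler query): partitioning the window by `(dag_id, run_id)` restarts `row_number` for each dag run, so one DAG hitting its cap cannot push another DAG past the `row_num <= max_active_tasks` filter:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (dag_id TEXT, run_id TEXT, task_id TEXT, priority_weight INTEGER)"
)
conn.executemany(
    "INSERT INTO ti VALUES (?, ?, ?, ?)",
    [
        ("dag1", "scheduled__1", "a", 100),
        ("dag1", "scheduled__1", "b", 90),
        ("dag1", "scheduled__1", "c", 80),
        # dag2 reuses the same run_id string but is a different dag run.
        ("dag2", "scheduled__1", "d", 95),
    ],
)

# Rank tasks within each (dag_id, run_id) partition by priority.
ranked = {
    (dag_id, task_id): row_num
    for dag_id, task_id, row_num in conn.execute(
        """
        SELECT dag_id, task_id,
               ROW_NUMBER() OVER (
                   PARTITION BY dag_id, run_id
                   ORDER BY priority_weight DESC
               ) AS row_num
        FROM ti
        """
    )
}

# With a per-run cap of 2, dag1 keeps its top two tasks and dag2's
# single task still qualifies, because numbering restarted for dag2.
max_active_tasks = 2
queueable = sorted(k for k, rn in ranked.items() if rn <= max_active_tasks)
print(queueable)
```

Had the partition been `run_id` alone, dag2's task would have received `row_num = 4` and been filtered out despite dag2 having spare capacity.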
## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ##
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3470289106 This is looking super promising. With this enabled it managed to schedule 22k TIs across 24 different dags in 40 min, whereas without it the same TI+dag count took the scheduler 2h 10m (green with, yellow without). https://github.com/user-attachments/assets/984ef69b-e027-4746-a4d2-87da8a79c833 https://github.com/user-attachments/assets/ffd0f53f-b959-4cc7-9c40-5d406c2b831a Right now it's looking super likely that we just always turn this on. I will run more tests with different dag hierarchy/task dependency layout patterns tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3444757988 I'll take a good look at this starting Monday. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3418225617 > The issues is that the algo has to be performant (at least as performant as the current code if not better) Good point, can you do a control-group benchmark for a workload with no limits at all? Similar to what I did here: https://github.com/apache/airflow/pull/55537#issuecomment-3325641701 You've shown above that it's a great improvement for your case, but we have to understand the general case too. From my experience with WFs/lateral joins in the scheduler, they are somewhat slower. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3418600414 @Asquator I ran my benchmarks with no limits applied.
Total iterations
* with the patch: 361
* original code: 344
Time needed to run 4000 tasks
* with the patch: 240 secs (4 mins)
* original code: 255 secs (4 mins and 15 secs)
Everything else was pretty much the same as you can see in the screenshot. https://github.com/user-attachments/assets/5ead6ed2-3304-42f0-86de-6eb1f5d9aaf9 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3417939313 @Asquator This patch improves the existing implementation and increases performance. From all the discussions so far, the way I see it, there is no nice way to extend the current approach to the point of accounting for all the edge cases. So, we have to implement a new selection algo from scratch. We can't do that overnight, so why not accept the improvement for now while looking for a new way? The issue is that the algo has to be performant (at least as performant as the current code, if not better) and simple enough for people to understand it and be able to debug it. This is open source and maintainability comes first. Performance can later be improved, as long as the next guy understands the code and can work on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3415686639 > Static checks still :( Weird, it's due to the rebase. @shahar1 Regarding the config flag, I'm waiting to see what others think before making the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3416696572 The config flag is a debate, and as I understood, it's not desirable because we don't want to fix the API. Anyway, the thing that concerns me the most here is that this implementation doesn't fix other starvation cases, nor does it provide a clear path to fixing all of them in the future. In particular, I don't see how https://github.com/apache/airflow/issues/45636 can be fixed after this one is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
shahar1 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3415286180 Static checks still :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2341843237
## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Query:
Review Comment: I _think_ this return value is a Subquery, not a Query (honestly the return type annotation is pretty useless anyway due to the lack of SQLAlchemy support) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2341840062
## airflow-core/src/airflow/config_templates/config.yml: ##
@@ -121,12 +121,11 @@ core:
 default: "32"
 max_active_tasks_per_dag:
 description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
 which is defaulted as ``[core] max_active_tasks_per_dag``.
-An example scenario when this would be useful is when you want to stop a new dag with an early
+An example scenario when this would be useful is when you want to stop a new dag_run with an early
Review Comment:
```suggestion
An example scenario when this would be useful is when you want to stop a new dag run with an early
```
## airflow-core/src/airflow/config_templates/config.yml: ##
@@ -121,12 +121,11 @@ core:
 default: "32"
 max_active_tasks_per_dag:
 description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
Review Comment:
```suggestion
The maximum number of task instances allowed to run concurrently in each dag run. This is also configurable per-dag with ``max_active_tasks``,
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3315173200 > @shahar1 Thank you for the review! > > > configurable with a flag (opt-in), > > That's fine by me. I had a flag in my initial POC, I can add it back. > > [e940864](https://github.com/apache/airflow/commit/e940864d175c0876c11c6362f6eff1167a6d5982) I think it's a great opportunity to also take this logic out of the scheduler job class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3314974917 @shahar1 Thank you for the review! > configurable with a flag (opt-in), That's fine by me. I had a flag in my initial POC, I can add it back. https://github.com/apache/airflow/pull/54103/commits/e940864d175c0876c11c6362f6eff1167a6d5982 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Asquator commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3304768333 Hey @xBis7, This is definitely a starvation problem that causes the scheduler to queue fewer than `max_tis` tasks in every iteration. The starvation simply comes from the edge cases where you have SO many tasks that this inefficiency turns into a complete hell where tasks are created faster than they can be queued, simply because the scheduler picks 1-2 tasks in every cycle (priorities and similar sorting patterns may be involved). We originally experienced this issue with pools: https://github.com/apache/airflow/issues/45636 It's conceptually the same problem you presented, but one that stems from the pool slots limit rather than `max_active_tasks`. We wanted to open a very similar PR to address our case, but soon realized the problem is a bit wider, and our fix wouldn't have solved your problem, for instance. We started digging deeper and found out that starvation may come from all kinds of limits:
1. pool slots (our case)
2. `max_active_tasks` (your case)
3. `max_active_tis_per_dag`
4. `max_active_tis_per_dagrun`
5. _executor slots_ (slightly different problem, local per scheduler)
So we gave up the idea of solving one specific case with pools simply because it would benefit only part of the users. We wanted a global solution. https://github.com/apache/airflow/pull/53492 was born to do the thing you've done here, but for ALL the limits using window functions/lateral joins. It turned out in the end that applying this logic to multiple limits that are orthogonal to each other is kind of impossible in SQL (for instance, `max_active_tis_per_dagrun` is a sub-limit of `max_active_tis_per_dag` and `max_active_tasks`, but pool slots is a limit orthogonal to all others - meaning there's no logical nesting between them). Won't bring up all the reasoning here, but we tried VERY hard to make it work.
There are still cases where this logic fails, in addition to SQL being incapable of optimizing multiple WFs so it can run fast. This is how https://github.com/apache/airflow/pull/55537 was born, and I'll soon give notice of it on the mailing list (need some time to make it robust and ready for review). It seems to solve cases 1-4, but has some drawbacks that have to be addressed. Could you please kindly share the exact workloads used for the benchmarks? I mean the source code. It would be great to agree on some "generic" workloads so we can test different proposals using the same DAGs. Numbers will be different due to hardware differences, but we only care about the ratios here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
Nataneljpwd commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3300201564 Hello @xBis7, what is the goal of the PR? It seems to solve the issue that someone on the mailing list had with the starvation; however, even here it does not fully do it. If the goal is to increase throughput, why not delegate ALL the work to the sql query? It seems weird to only delegate 1 check to sql, which is used equally as much as `max_active_tis_per_dagrun` and `dag_max_active_tasks`, no? It just looks like you are trying to solve a single issue that someone in the mailing list was experiencing, which happens to increase the scheduler throughput, and it seems like a sub-problem of the greater problem of general starvation in airflow. Why shouldn't we add the other checks? Why did you decide specifically to add the `max_active_tasks` check only? It seems weird to me that you chose one limit. Did you try other limits first? Or did you ever try to do it on multiple limits together? And it seems to increase scheduler throughput, up until the point you have mapped tasks, or a lot of tasks, or tasks which belong to a starved pool, where you will still choose multiple tasks that will be dropped in the loop, and cause more iterations; however, this time the query takes twice the amount of time. An example is as follows:
* Task A goes to pool A, 128 mapped tasks, short running
* Task B goes to pool B, 50 mapped tasks
* Pool A: 80 free slots
* Pool B: 1 free slot
* Same dag and dag run, max_active_tasks set to 10
Here, the tasks will sort in an alternating pattern; let's suppose Task A is first for whatever reason, it does not really matter. We will get 10 tasks, 5 from A, 5 from B, run only one from B, and suppose Task A falls because of the mapped task count limit, so we only schedule 1 task. Next run, another task (or two) from Pool B is freed, the same thing happens again, we schedule only 1 or 2 tasks, and drop the other selected tasks.
This issue can occur for any other concurrency limit such as `max_active_tis_per_dag`, or pool slot count. I just don't think I fully understood the goal of the PR, and so, if you could clarify, it would be appreciated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3301379665 > it seems to solve the issue that someone on the mailing list had with the starvation, however, even here it does not fully do it. > > > It just looks like you are trying to solve a single issue that someone in the mailing list was experiencing, which happens to increase the scheduler throughput, and it seems like a sub problem of the greater problem of general starvation in airflow. Hi @Nataneljpwd, we haven't experienced any noticeable starvation issues and we hadn't been concerned about it until you brought it up on the mailing list. From what I see, the scheduler is designed to select tasks in FIFO order while it prioritizes some tasks that have a higher priority weight. All tasks are running sooner or later. I haven't been trying to change the order by which tasks are selected. > If the goal is to increase throughput, why not delegate ALL the work to the sql query? > > Why shouldn't we add the other checks? > Why did you decide specifically to add the max_active_tasks check only? it seems weird to me that you chose one limit, did you try other limits first? or did you ever try to do it on multiple limits together? Let me give you a bit of context. We noticed that the scheduler and the workers didn't use as much CPU as expected, even when configured properly to do so. After some research, we found a few inefficiencies in the scheduler queuing process and noticed that a lot of iterations aren't doing much work even though there is load on the system. Here is an example, assuming that the scheduler can get 30 tasks from the query, examine them and queue them. There are 3-5 dags with 100 tasks each. We can run at max 10 tasks for each dag_run. The current query will get 30 tasks from the 1st dag, queue 10 and discard the rest. It will do a few more iterations to realize that we have reached max tasks for that dag_run and should examine the next in line. 
But if it takes into consideration the limit along with the current number of active tasks, it will only get the number of tasks that can actually be queued and then move on to the next dag_run based on FIFO and priorities. It will keep doing that until it reaches 30 tasks, which will all be valid for queuing, all in 1 iteration instead of multiple ones.
> And it seems to increase scheduler throughput, up until the point you have mapped tasks, or a lot of tasks, or tasks which belong to a starved pool, where you will still choose multiple tasks that will be dropped in the loop, and cause more iterations, however, this time the query takes twice the amount of time.
If you take a look at the unit tests and the benchmarks, you will see that that's not the case. Yes, the query takes twice the time, but the iterations are far fewer because it actually queues everything the query returns. The more the tasks, the greater the performance improvement. Additionally, we have tested this in production with a huge load of tasks. One of our dags with mapped tasks might end up with 10,000 tasks.
> Task A goes to pool A, 128 mapped tasks, short running
> Task B goes to pool B 50 mapped tasks
I think here you mean `dag A` and `dag B`.
The point is that we already have these limits and we check them after getting the tasks from the db. If we also check them while getting the tasks from the db, we won't spend time looking into tasks that can't be queued and the number of required iterations will decrease. > I just don't think I fully understood the goal of the PR, and so, if you coud clarify, it would be appreciated. I hope all the above make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
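The 30-task example in the comment above can be sketched as a toy loop (purely illustrative, not the scheduler's real code; batch of 30 fetched per query, 3 dags of 100 tasks, a cap of 10 per dag run, and the simplifying assumption that queued tasks finish between iterations):

```python
BATCH = 30        # how many TIs one query fetch returns
MAX_ACTIVE = 10   # max_active_tasks per dag run

def iterations(limit_aware: bool, dags: int = 3, tasks_per_dag: int = 100) -> int:
    """Count toy 'scheduler iterations' needed to drain all tasks."""
    remaining = {d: tasks_per_dag for d in range(dags)}
    iters = 0
    while any(remaining.values()):
        iters += 1
        active = {d: 0 for d in range(dags)}
        # Build the candidate batch in FIFO order over the dags.
        batch = []
        for d in range(dags):
            take = remaining[d]
            if limit_aware:
                # The query already excludes runs at their cap, so only
                # queueable tasks come back.
                take = min(take, MAX_ACTIVE - active[d])
            batch.extend([d] * take)
            if len(batch) >= BATCH:
                batch = batch[:BATCH]
                break
        # Examine the batch; anything over a run's cap is discarded.
        for d in batch:
            if active[d] < MAX_ACTIVE:
                active[d] += 1
                remaining[d] -= 1
        # Simplification: all queued tasks finish before the next iteration.
    return iters

print("limit-aware:", iterations(limit_aware=True))
print("naive:", iterations(limit_aware=False))
```

Under these assumptions the limit-aware fetch fills every batch with queueable tasks and drains the load in roughly a third of the iterations, which is the shape of the improvement the comment describes.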
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284485855 > I guess the scheduler loop duration is longer as each one is now doing more useful work? @ashb Yes, it's doing more work because there is an extra condition on the query, checking the max number of tasks that we can queue. But it's also more efficient, 1 new iteration queues almost as many tasks as 3 old iterations would. If we have reached the max active tasks for a dag run, we stop getting tasks to examine from the db and move on to examine the next dag run. That way, the iteration doesn't concentrate on examining tasks that it can't queue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2343521922
## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Query:
Review Comment: You are right. I fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284396016 @uranusjr Thank you for the review! I addressed your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2343454124
## airflow-core/src/airflow/config_templates/config.yml: ##
@@ -121,12 +121,11 @@ core:
 default: "32"
 max_active_tasks_per_dag:
 description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
 which is defaulted as ``[core] max_active_tasks_per_dag``.
-An example scenario when this would be useful is when you want to stop a new dag with an early
+An example scenario when this would be useful is when you want to stop a new dag_run with an early
Review Comment: Done.
## airflow-core/src/airflow/config_templates/config.yml: ##
@@ -121,12 +121,11 @@ core:
 default: "32"
 max_active_tasks_per_dag:
 description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
Review Comment: Made the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2343372833
## airflow-core/src/airflow/config_templates/config.yml: ##
@@ -121,12 +121,11 @@ core:
 default: "32"
 max_active_tasks_per_dag:
 description: |
-The maximum number of task instances allowed to run concurrently in each DAG. To calculate
-the number of tasks that is running concurrently for a DAG, add up the number of running
-tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
+The maximum number of task instances allowed to run concurrently in each DAG_RUN.
+This is configurable at the DAG level with ``max_active_tasks``,
Review Comment: `This is configurable at the DAG level with ``max_active_tasks``,` is directly from main. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3284201824 I guess the scheduler loop duration is longer as each one is now doing more useful work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3281650048
> That gave me a testing idea. I'm going to load 100 dags with at least 500 tasks each on the db and then re-capture the metrics to see how that affects the loop's performance.
I added 99 dags with 1000 tasks, and each dag had 1 more task than the previous, e.g. 1001, 1002, 1003, ..., 1099. The scheduler had roughly 100,000 tasks to go over from the db. I triggered the same 6 dags as before and the numbers were pretty much the same.
* total number of scheduler iterations
  * with the patch: try 1: 517 iterations, try 2: 550 iterations, try 3: 542 iterations
  * original code: try 1: 1445 iterations, try 2: 1507 iterations, try 3: 1463 iterations
* total time
  * with the patch: try 1: 514.31 s, try 2: 498.51 s, try 3: 500.37 s
  * original code: try 1: 792.8 s, try 2: 811.82 s, try 3: 798.96 s
* average time per iteration
  * with the patch: try 1: 0.99 s, try 2: 0.9 s, try 3: 0.92 s
  * original code: try 1: 0.54 s, try 2: 0.538 s, try 3: 0.546 s
I ran the test 3 times and as you can see there wasn't much deviation in the timings. https://github.com/user-attachments/assets/1cca83a5-39a9-4683-a606-484142f4ab0c -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
uranusjr commented on code in PR #54103: URL: https://github.com/apache/airflow/pull/54103#discussion_r2341843237
## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ##
@@ -152,6 +164,16 @@ def _is_parent_process() -> bool:
 return multiprocessing.current_process().name == "MainProcess"
+def _get_current_dr_task_concurrency(states: Iterable[TaskInstanceState]) -> Query:
Review Comment: I _think_ this return value is a Statement, not a Query (honestly the return type annotation is pretty useless anyway due to the lack of SQLAlchemy support) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2285132956
## airflow-core/tests/unit/jobs/test_scheduler_job.py: ##
@@ -180,6 +180,39 @@ def _create_dagrun(
 return _create_dagrun
+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list
Review Comment:
Yeah that makes sense. I first read it as a test that wasn't testing much
and was confused.
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3199126239 @ashb Thank you for the review!
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2284001691
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
     return _create_dagrun


+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list
Review Comment:
It's similar to the `dag_maker` but for creating tasks. Apart from the
creation, it also sets them all to `SCHEDULED` and assigns a higher priority to
every 10th task.
I used the priority weights to verify that higher priority tasks get
examined and queued first.
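The ordering being verified can be mimicked with a toy model (plain Python, no Airflow; `FakeTI` is made up for illustration): the scheduler examines SCHEDULED TIs highest `priority_weight` first, so the every-10th tasks that `task_maker` weights get picked up before the default-weight ones:

```python
from dataclasses import dataclass

@dataclass
class FakeTI:
    task_id: str
    priority_weight: int = 1  # Airflow's default task weight

# Mirror task_maker: every 10th task gets priority_weight = i // 2.
# (Note dummy0 ends up with weight 0, below the default, so it sorts last.)
tis = [
    FakeTI(f"dummy{i}", priority_weight=i // 2) if i % 10 == 0
    else FakeTI(f"dummy{i}")
    for i in range(40)
]

# Highest weight first, mirroring the scheduler's ORDER BY priority_weight DESC.
examined = sorted(tis, key=lambda ti: -ti.priority_weight)
print([ti.task_id for ti in examined[:3]])
# ['dummy30', 'dummy20', 'dummy10']
```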
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2283387540
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
     return _create_dagrun


+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list
Review Comment:
Ohhh. I read that as `test_maker` 🤦🏻
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2283385792
##
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##
@@ -180,6 +180,39 @@ def _create_dagrun(
     return _create_dagrun


+def task_maker(dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                weight = int(i / 2)
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", priority_weight=weight)
+            else:
+                # No executor specified, runs on default executor
+                dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+    # 'logical_date' is used to create the 'run_id'. Set it to 'now', in order to get distinct run ids.
+    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, logical_date=timezone.utcnow())
+
+    task_tis = {}
+
+    tis_list = []
+    for i in range(task_num):
+        task_tis[f"ti{i}"] = dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+        # Add to the list.
+        tis_list.append(task_tis[f"ti{i}"])
+
+    for ti in tis_list:
+        ti.state = State.SCHEDULED
+        ti.dag_model.max_active_tasks = max_active_tasks
+
+    session.flush()
+
+    return tis_list
Review Comment:
Is this worth keeping?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3191128408 Yes, that was fixed on main, a rebase should fix it
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
ashb commented on PR #54103: URL: https://github.com/apache/airflow/pull/54103#issuecomment-3191129722 @dstandish Do you have context to usefully review this? I think you were looking at this area recently?
Re: [PR] Include the max_active_tasks limit in the query fetching TIs to be queued [airflow]
xBis7 commented on PR #54103:
URL: https://github.com/apache/airflow/pull/54103#issuecomment-3190567184

This failure on the CI

```
FAILED airflow-core/tests/unit/models/test_renderedtifields.py::TestRenderedTaskInstanceFields::test_rtif_deletion_stale_data_error - Failed: Timeout >60.0s
```

seems unrelated. I ran this test locally with breeze for postgres, mysql and sqlite and it passed without errors.
