Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
potiuk commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4179328038
@SakshamSinghal20 This PR has been converted to **draft** because it does not yet meet our [Pull Request quality criteria](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-quality-criteria).
**Issues found:**
- :x: **Pre-commit / static checks**: Failing: CI image checks / Static checks. Run `prek run --from-ref main` locally to find and fix issues. See [Pre-commit / static checks docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst).
- :x: **mypy (type checking)**: Failing: CI image checks / MyPy checks (mypy-airflow-core). Run `prek --stage manual mypy-airflow-core --all-files` locally to reproduce. You need `breeze ci-image build --python 3.10` for Docker-based mypy. See [mypy (type checking) docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#mypy-checks).
- :x: **Other failing CI checks**: Failing: Postgres tests: core / DB-core:Postgres:14:3.10:API...Serialization, MySQL tests: core / DB-core:MySQL:8.0:3.10:API...Serialization, Sqlite tests: core / DB-core:Sqlite:3.10:API...Serialization, Non-DB tests: core / Non-DB-core::3.10:API...Serialization, Integration and System Tests / Integration core kerberos (+3 more). Run `prek run --from-ref main` locally to reproduce. See [static checks docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst).
- :warning: **Unresolved review comments**: This PR has 2 unresolved review threads from maintainers: **@jscheffl** (MEMBER): 1 unresolved thread; **@uranusjr** (MEMBER): 1 unresolved thread. Please review and resolve all inline review comments before requesting another review. You can resolve a conversation by clicking 'Resolve conversation' on each thread after addressing the feedback.
See [pull request guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst).
> **Note:** Your branch is **1377 commits behind `main`**. Some check failures may be caused by changes in the base branch rather than by your PR. Please rebase your branch and push again to get up-to-date CI results.
**What to do next:**
- The comment informs you what you need to do.
- Fix each issue, then mark the PR as "Ready for review" in the GitHub UI, but only after making sure that all the issues are fixed.
- There is no rush; take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates.
- Maintainers will then proceed with a normal review.
Converting a PR to draft is **not** a rejection; it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively.
If you have questions, feel free to ask on the [Airflow Slack](https://s.apache.org/airflow-slack).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4085991381
Lazy consensus passed. Would it be possible to complete the PR for review within the next 24h?
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4064049082
> > "Request changes" mainly to block merge as long as devlist discussion not finalized
>
> Understood.

Called for LAZY CONSENSUS on the devlist in https://lists.apache.org/thread/qvnfj09qlktmbpdpcsgkbdb8pdmfzm77
Would it be OK to prepare the PR accordingly so that it is ready to be merged in 3 days? As this is quite important for me and our setup, let me know if I should help with making the changes or fixes.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3989824812
> "Request changes" mainly to block merge as long as devlist discussion not finalized

Understood.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3980210006
> @uranusjr I have changed the state to Active. also let me know if i need to do anything else before the PR gets merged. Thank you.

The discussion on the devlist is still ongoing. I think its outcome will influence the implementation strategy of this PR. Please do not merge before it is settled.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3940637121
> Hi @jscheffl and @Nataneljpwd! I'm a new contributor and I am very interested in participating in GSoC with this organization this year. I've been reading through the contributing guidelines, but I wanted to ask if there are any specific parts of the codebase you'd recommend I focus on to make a meaningful impact?

I'd take a look at https://github.com/apache/airflow/issues?q=is%3Aissue%20state%3Aopen%20label%3A%22good%20first%20issue%22
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3940634969
Hi @jscheffl and @Nataneljpwd! I'm a new contributor and I am very interested in participating in GSoC with this organization this year. I've been reading through the contributing guidelines, but I wanted to ask if there are any specific parts of the codebase you'd recommend I focus on to make a meaningful impact?
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837431329
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+# Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+#while still counting it for task-level limits.
query = session.execute(
-select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-.where(TI.state.in_(EXECUTION_STATES))
-.group_by(TI.task_id, TI.run_id, TI.dag_id)
+select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+.where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+.group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
-for dag_id, task_id, run_id, c in query:
-self.dag_run_active_tasks_map[dag_id, run_id] += c
-self.task_concurrency_map[(dag_id, task_id)] += c
-self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+for dag_id, task_id, run_id, state, count in query:
+# Always count towards task-level concurrency (max_active_tis_per_dag /
+# max_active_tis_per_dagrun), including DEFERRED.
+self.task_concurrency_map[(dag_id, task_id)] += count
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+# Only count non-deferred states towards DAG-run active tasks
+# (max_active_tasks / worker slot accounting).
+if state != TaskInstanceState.DEFERRED:
+self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
--> https://lists.apache.org/thread/vvjpo6q6h0j4f80dq36g4oo0mrp5ldbq
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837400987
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+# Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+#while still counting it for task-level limits.
query = session.execute(
-select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-.where(TI.state.in_(EXECUTION_STATES))
-.group_by(TI.task_id, TI.run_id, TI.dag_id)
+select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+.where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+.group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
-for dag_id, task_id, run_id, c in query:
-self.dag_run_active_tasks_map[dag_id, run_id] += c
-self.task_concurrency_map[(dag_id, task_id)] += c
-self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+for dag_id, task_id, run_id, state, count in query:
+# Always count towards task-level concurrency (max_active_tis_per_dag /
+# max_active_tis_per_dagrun), including DEFERRED.
+self.task_concurrency_map[(dag_id, task_id)] += count
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+# Only count non-deferred states towards DAG-run active tasks
+# (max_active_tasks / worker slot accounting).
+if state != TaskInstanceState.DEFERRED:
+self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
I'll drop a question to devlist
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837222900
##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1738,15 +1738,23 @@ def xcom_pull(
@provide_session
def get_num_running_task_instances(self, session: Session, same_dagrun: bool = False) -> int:
-"""Return Number of running TIs from the DB."""
+"""
+Return number of active TIs for this task from the DB.
+
+Counts task instances in running, queued, or deferred state.
+Deferred TIs are included because they are still logically in-flight
+and must count against max_active_tis_per_dag / max_active_tis_per_dagrun.
+"""
Review Comment:
A deferred task is "running" against the external system, but not against Airflow's worker pool. We could potentially add a config like `max_active_tasks_include_deferred` in the future for consistency, but treating this task-level limit differently is necessary to prevent external resource exhaustion. Let me know if I am wrong, as I am still learning.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837219667
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+# Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+#while still counting it for task-level limits.
query = session.execute(
-select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-.where(TI.state.in_(EXECUTION_STATES))
-.group_by(TI.task_id, TI.run_id, TI.dag_id)
+select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+.where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+.group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
-for dag_id, task_id, run_id, c in query:
-self.dag_run_active_tasks_map[dag_id, run_id] += c
-self.task_concurrency_map[(dag_id, task_id)] += c
-self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+for dag_id, task_id, run_id, state, count in query:
+# Always count towards task-level concurrency (max_active_tis_per_dag /
+# max_active_tis_per_dagrun), including DEFERRED.
+self.task_concurrency_map[(dag_id, task_id)] += count
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+# Only count non-deferred states towards DAG-run active tasks
+# (max_active_tasks / worker slot accounting).
+if state != TaskInstanceState.DEFERRED:
+self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
As @Nataneljpwd mentioned, `max_active_tasks` is primarily about limiting
how many executor slots a DAG run can take up. Because a `DEFERRED` task
releases its worker slot to the Triggerer, excluding it from this limit ensures
that a DAG doesn't needlessly block other tasks from being scheduled while it
waits on external events.
However, task-level limits like `max_active_tis_per_dag` and
`max_active_tis_per_dagrun` are typically configured by users to protect
external resources. Even though a task is `DEFERRED` internally, it is still
logically "active" against that external system. If we exclude `DEFERRED` tasks
from these limits, Airflow could queue up hundreds of them simultaneously,
completely bypassing the user's intended safeguard and overwhelming the
downstream service.
Open to advice and changes.
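As a rough illustration of this split, here is a minimal sketch in plain Python. It is not the scheduler's code: the `load_counts` helper, the string state names, and the sample rows are assumptions made for the example; only the three map names are taken from the diff above.

```python
from collections import Counter

# Hypothetical stand-in for TaskInstanceState.DEFERRED (assumption for this sketch).
DEFERRED = "deferred"


def load_counts(rows):
    """Build the three counters from (dag_id, task_id, run_id, state, count) rows."""
    dag_run_active_tasks_map = Counter()
    task_concurrency_map = Counter()
    task_dagrun_concurrency_map = Counter()
    for dag_id, task_id, run_id, state, count in rows:
        # Task-level limits see every in-flight state, including DEFERRED.
        task_concurrency_map[(dag_id, task_id)] += count
        task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
        # The DAG-run counter only tracks states that hold an executor slot.
        if state != DEFERRED:
            dag_run_active_tasks_map[(dag_id, run_id)] += count
    return dag_run_active_tasks_map, task_concurrency_map, task_dagrun_concurrency_map


# Made-up sample: one running and three deferred instances of the same task.
rows = [
    ("dag_a", "call_api", "run_1", "running", 1),
    ("dag_a", "call_api", "run_1", DEFERRED, 3),
]
active, per_task, per_task_run = load_counts(rows)
print(active[("dag_a", "run_1")])       # 1
print(per_task[("dag_a", "call_api")])  # 4
```

With these sample rows, a task-level limit of 4 would already be exhausted, while the DAG run would still show only one occupied slot.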
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837208217
##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1738,15 +1738,23 @@ def xcom_pull(
@provide_session
def get_num_running_task_instances(self, session: Session, same_dagrun: bool = False) -> int:
-"""Return Number of running TIs from the DB."""
+"""
+Return number of active TIs for this task from the DB.
+
+Counts task instances in running, queued, or deferred state.
+Deferred TIs are included because they are still logically in-flight
+and must count against max_active_tis_per_dag / max_active_tis_per_dagrun.
+"""
Review Comment:
If we consider deferred tasks as running, isn't it logical that `max_active_tasks` also takes the deferred tasks into consideration?
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837206151
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+# Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+#while still counting it for task-level limits.
query = session.execute(
-select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-.where(TI.state.in_(EXECUTION_STATES))
-.group_by(TI.task_id, TI.run_id, TI.dag_id)
+select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+.where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+.group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
-for dag_id, task_id, run_id, c in query:
-self.dag_run_active_tasks_map[dag_id, run_id] += c
-self.task_concurrency_map[(dag_id, task_id)] += c
-self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+for dag_id, task_id, run_id, state, count in query:
+# Always count towards task-level concurrency (max_active_tis_per_dag /
+# max_active_tis_per_dagrun), including DEFERRED.
+self.task_concurrency_map[(dag_id, task_id)] += count
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+# Only count non-deferred states towards DAG-run active tasks
+# (max_active_tasks / worker slot accounting).
+if state != TaskInstanceState.DEFERRED:
+self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
From what I understand, it is done because `max_active_tasks` limits
running tasks across all runs of the dag, which concerns only tasks taking
up executor slots (running state).
Yet I do agree with your question of why this is different from the other 2
limits, as it seems like all limits are about running tasks, and if in some
places we consider deferred as running, why don't we do it everywhere?
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2836491409
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
self.dag_run_active_tasks_map.clear()
self.task_concurrency_map.clear()
self.task_dagrun_concurrency_map.clear()
+# Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+#while still counting it for task-level limits.
query = session.execute(
-select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-.where(TI.state.in_(EXECUTION_STATES))
-.group_by(TI.task_id, TI.run_id, TI.dag_id)
+select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+.where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+.group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
)
-for dag_id, task_id, run_id, c in query:
-self.dag_run_active_tasks_map[dag_id, run_id] += c
-self.task_concurrency_map[(dag_id, task_id)] += c
-self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+for dag_id, task_id, run_id, state, count in query:
+# Always count towards task-level concurrency (max_active_tis_per_dag /
+# max_active_tis_per_dagrun), including DEFERRED.
+self.task_concurrency_map[(dag_id, task_id)] += count
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+# Only count non-deferred states towards DAG-run active tasks
+# (max_active_tasks / worker slot accounting).
+if state != TaskInstanceState.DEFERRED:
+self.dag_run_active_tasks_map[dag_id, run_id] += count
Review Comment:
Why would you count deferred tasks differently for the different concurrency
limits? Isn't deferred similar to running in all of them?
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3928425891
Just let me know whether everything is right or whether I need to change anything else.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827279692
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
+deferred_query = session.execute(
+select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+.where(TI.state == TaskInstanceState.DEFERRED)
+.group_by(TI.task_id, TI.run_id, TI.dag_id)
+)
+for dag_id, task_id, run_id, c in deferred_query:
+self.task_concurrency_map[(dag_id, task_id)] += c
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
Review Comment:
Okay I will update it
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827280820
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
+deferred_query = session.execute(
+select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+.where(TI.state == TaskInstanceState.DEFERRED)
+.group_by(TI.task_id, TI.run_id, TI.dag_id)
+)
Review Comment:
Sure I will fix it
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827277968
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
Review Comment:
Sure, I will update this ASAP.
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2821164333
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
+deferred_query = session.execute(
+select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+.where(TI.state == TaskInstanceState.DEFERRED)
+.group_by(TI.task_id, TI.run_id, TI.dag_id)
+)
Review Comment:
Instead of firing a second query, can you please just add deferred to the
condition in line 188? An additional query would be expensive, and the
separation does not make it better.
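To illustrate the single-query idea, here is a minimal sketch using Python's built-in `sqlite3` rather than Airflow's SQLAlchemy models. The table layout, the state strings, and the sample rows are assumptions for the example, and grouping by `state` is just one way to keep deferred rows distinguishable after widening the condition.

```python
import sqlite3

# In-memory toy table; the real task_instance table has many more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("dag_a", "t1", "run_1", "running"),
        ("dag_a", "t1", "run_1", "deferred"),
        ("dag_a", "t1", "run_1", "deferred"),
        ("dag_a", "t1", "run_1", "success"),
    ],
)

# One round trip: widen the IN (...) condition and group by state as well,
# so the caller can still tell deferred rows apart from the others.
grouped = conn.execute(
    """
    SELECT dag_id, task_id, run_id, state, COUNT(*)
    FROM task_instance
    WHERE state IN ('running', 'queued', 'deferred')
    GROUP BY dag_id, task_id, run_id, state
    ORDER BY state
    """
).fetchall()
print(grouped)
```

The finished (`success`) row is filtered out, and the deferred and running rows come back as separate groups from the same query.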
Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2799094569
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
Review Comment:
I think this comment can be shortened to 1 sentence
##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
+deferred_query = session.execute(
+select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+.where(TI.state == TaskInstanceState.DEFERRED)
+.group_by(TI.task_id, TI.run_id, TI.dag_id)
+)
+for dag_id, task_id, run_id, c in deferred_query:
+self.task_concurrency_map[(dag_id, task_id)] += c
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
Review Comment:
I think this can be added to the query above instead of issuing another query
[PR] Fix max_active_tis_per_dag for deferred task instances [airflow]
SakshamSinghal20 opened a new pull request, #61769:
URL: https://github.com/apache/airflow/pull/61769
issue #61700
### Description
Deferred tasks were previously excluded from `max_active_tis_per_dag` and `max_active_tis_per_dagrun` concurrency checks. This allowed an unlimited number of tasks to enter the `DEFERRED` state, bypassing configured limits.
This PR enforces these limits for `DEFERRED` task instances while ensuring that `max_active_tasks` (which limits worker slot usage) remains unaffected.
**Changes:**
* **airflow/ti_deps/dependencies_states.py**: Introduced `TASK_CONCURRENCY_EXECUTION_STATES` to include `DEFERRED`, `RUNNING`, and `QUEUED`.
* **airflow/jobs/scheduler_job_runner.py**: Updated `ConcurrencyMap` to count deferred TIs for task-level limits, but explicitly exclude them from `dag_run_active_tasks_map`.
* **airflow/models/taskinstance.py**: Updated `get_num_running_task_instances` to count deferred TIs for pre-execution dependency checks.
**Tests:**
Added unit tests covering:
* Deferred tasks blocking new scheduling attempts.
* Mixed state scenarios (Running + Deferred).
* Regression checks ensuring `max_active_tasks` is not impacted.
---
# Was generative AI tooling used to co-author this PR?
- [x] Yes (used for guidance and file locating)
---
* closes: #61700
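The intended behaviour of the two state sets can be sketched as follows. This is only an illustration under assumptions: states are plain strings instead of `TaskInstanceState` members, and `count_in_states`, `task_limit_reached`, and `dag_limit_reached` are hypothetical helpers, not functions from the Airflow codebase.

```python
# Hypothetical string stand-ins for TaskInstanceState members (assumption).
EXECUTION_STATES = frozenset({"running", "queued"})
TASK_CONCURRENCY_EXECUTION_STATES = EXECUTION_STATES | {"deferred"}


def count_in_states(states, allowed):
    """Count how many task instance states fall inside the allowed set."""
    return sum(1 for state in states if state in allowed)


def task_limit_reached(states, max_active_tis_per_dag):
    """Task-level check: deferred instances count against the limit."""
    return count_in_states(states, TASK_CONCURRENCY_EXECUTION_STATES) >= max_active_tis_per_dag


def dag_limit_reached(states, max_active_tasks):
    """DAG-level check: only states that occupy a worker slot count."""
    return count_in_states(states, EXECUTION_STATES) >= max_active_tasks


# Mixed scenario: one running and one deferred instance of the same task.
states = ["running", "deferred"]
print(task_limit_reached(states, max_active_tis_per_dag=2))  # True
print(dag_limit_reached(states, max_active_tasks=2))         # False
```

The mixed scenario mirrors the test cases listed in the description: the deferred instance blocks further scheduling at the task level while leaving the `max_active_tasks` accounting untouched.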
