Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-04-02 Thread via GitHub


potiuk commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4179328038

   @SakshamSinghal20 This PR has been converted to **draft** because it does 
not yet meet our [Pull Request quality 
criteria](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-quality-criteria).
   
   **Issues found:**
   - :x: **Pre-commit / static checks**: Failing: CI image checks / Static 
checks. Run `prek run --from-ref main` locally to find and fix issues. See 
[Pre-commit / static checks 
docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst).
   - :x: **mypy (type checking)**: Failing: CI image checks / MyPy checks 
(mypy-airflow-core). Run `prek --stage manual mypy-airflow-core --all-files` 
locally to reproduce. You need `breeze ci-image build --python 3.10` for 
Docker-based mypy. See [mypy (type checking) 
docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#mypy-checks).
   - :x: **Other failing CI checks**: Failing: Postgres tests: core / 
DB-core:Postgres:14:3.10:API...Serialization, MySQL tests: core / 
DB-core:MySQL:8.0:3.10:API...Serialization, Sqlite tests: core / 
DB-core:Sqlite:3.10:API...Serialization, Non-DB tests: core / 
Non-DB-core::3.10:API...Serialization, Integration and System Tests / 
Integration core kerberos (+3 more). Run `prek run --from-ref main` locally to 
reproduce. See [static checks 
docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst).
   - :warning: **Unresolved review comments**: This PR has 2 unresolved review 
threads from maintainers: **@jscheffl** (MEMBER): 1 unresolved thread; 
**@uranusjr** (MEMBER): 1 unresolved thread. Please review and resolve all 
inline review comments before requesting another review. You can resolve a 
conversation by clicking 'Resolve conversation' on each thread after addressing 
the feedback. See [pull request 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst).
   
   > **Note:** Your branch is **1377 commits behind `main`**. Some check 
failures may be caused by changes in the base branch rather than by your PR. 
Please rebase your branch and push again to get up-to-date CI results.
   
   **What to do next:**
   - This comment lists what you need to do.
   - Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - 
but only after making sure that all the issues are fixed.
   - There is no rush: take your time and work at your own pace. We appreciate 
your contribution and are happy to wait for updates.
   - Maintainers will then proceed with a normal review.
   
   Converting a PR to draft is **not** a rejection; it is an invitation to 
bring the PR up to the project's standards so that maintainer review time is 
spent productively. If you have questions, feel free to ask on the [Airflow 
Slack](https://s.apache.org/airflow-slack).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-03-18 Thread via GitHub


jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4085991381

   Lazy consensus passed. Would it be possible to complete the PR for review 
within the next 24 hours?



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-03-15 Thread via GitHub


jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-4064049082

   > > "Request changes" mainly to block merge as long as devlist discussion 
not finalized
   > 
   > Understood.
   
   I called for LAZY CONSENSUS on the devlist: 
https://lists.apache.org/thread/qvnfj09qlktmbpdpcsgkbdb8pdmfzm77
   
   Would it be OK to prepare the PR accordingly so that it is ready to be 
merged in 3 days?
   As this is quite important for me and our setup, let me know if I should 
help with the changes or fixes.



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-03-03 Thread via GitHub


SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3989824812

   > "Request changes" mainly to block merge as long as devlist discussion not 
finalized
   
   Understood.



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-03-01 Thread via GitHub


jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3980210006

   > @uranusjr I have changed the state to Active. also let me know if i need 
to do anything else before the PR gets merged. Thank you.
   
   Discussion on the devlist is still ongoing. I think its outcome will 
influence the implementation strategy of this PR. Please do not merge before 
it is settled.



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-22 Thread via GitHub


jscheffl commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3940637121

   > Hi @jscheffl and @Nataneljpwd! I'm a new contributor and I am very 
interested in participating in GSoC with this organization this year. I've been 
reading through the contributing guidelines, but I wanted to ask if there are 
any specific parts of the codebase you'd recommend I focus on to make a 
meaningful impact?
   
   I'd take a look at the open "good first issue" list: 
https://github.com/apache/airflow/issues?q=is%3Aissue%20state%3Aopen%20label%3A%22good%20first%20issue%22



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-22 Thread via GitHub


SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3940634969

   Hi @jscheffl and @Nataneljpwd! I'm a new contributor and I am very 
interested in participating in GSoC with this organization this year. I've been 
reading through the contributing guidelines, but I wanted to ask if there are 
any specific parts of the codebase you'd recommend I focus on to make a 
meaningful impact?



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-22 Thread via GitHub


jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837431329


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+        #while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   --> https://lists.apache.org/thread/vvjpo6q6h0j4f80dq36g4oo0mrp5ldbq




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-22 Thread via GitHub


jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837400987


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+        #while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   I'll raise this question on the devlist.




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-21 Thread via GitHub


SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837222900


##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1738,15 +1738,23 @@ def xcom_pull(

     @provide_session
     def get_num_running_task_instances(self, session: Session, same_dagrun: bool = False) -> int:
-        """Return Number of running TIs from the DB."""
+        """
+        Return number of active TIs for this task from the DB.
+
+        Counts task instances in running, queued, or deferred state.
+        Deferred TIs are included because they are still logically in-flight
+        and must count against max_active_tis_per_dag / max_active_tis_per_dagrun.
+        """

Review Comment:
   A deferred task is "running" against the external system, but not against 
Airflow's worker pool. We could potentially add a config like 
`max_active_tasks_include_deferred` in the future for consistency, but treating 
this task-level limit differently is necessary to prevent external resource 
exhaustion.
   
   Let me know if I am wrong, as I am still learning.
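A minimal sketch of the split described above, including the suggested `max_active_tasks_include_deferred` switch. `State`, `dag_run_active_count` and `task_active_count` are illustrative names, not Airflow APIs, and that option does not exist in Airflow today; the sketch only shows which states each limit would count under that assumption.

```python
from enum import Enum


class State(str, Enum):
    """Simplified stand-in for Airflow's TaskInstanceState (assumption)."""

    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"


def dag_run_active_count(states: list[State], include_deferred: bool) -> int:
    """Count one DAG run's TIs toward max_active_tasks.

    include_deferred models the hypothetical max_active_tasks_include_deferred
    option; it is NOT an existing Airflow setting.
    """
    counted = {State.QUEUED, State.RUNNING}
    if include_deferred:
        counted.add(State.DEFERRED)
    return sum(1 for s in states if s in counted)


def task_active_count(states: list[State]) -> int:
    """Count one task's TIs toward max_active_tis_per_dag(run); DEFERRED always counts."""
    return sum(1 for s in states if s in {State.QUEUED, State.RUNNING, State.DEFERRED})


# Assume all four TIs belong to the same task in the same DAG run.
run = [State.RUNNING, State.DEFERRED, State.DEFERRED, State.QUEUED]
print(dag_run_active_count(run, include_deferred=False))  # 2
print(dag_run_active_count(run, include_deferred=True))  # 4
print(task_active_count(run))  # 4
```

With the switch off, the two deferred TIs free their share of `max_active_tasks` but still occupy the task-level limit, which is the behaviour this PR argues for.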




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-21 Thread via GitHub


SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837219667


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+        #while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   As @Nataneljpwd mentioned, `max_active_tasks` is primarily about limiting 
how many executor slots a DAG run can take up. Because a `DEFERRED` task 
releases its worker slot to the Triggerer, excluding it from this limit ensures 
that a DAG doesn't needlessly block other tasks from being scheduled while it 
waits on external events. 
   However, task-level limits like `max_active_tis_per_dag` and 
`max_active_tis_per_dagrun` are typically configured by users to protect 
external resources. Even though a task is `DEFERRED` internally, it is still 
logically "active" against that external system. If we exclude `DEFERRED` tasks 
from these limits, Airflow could queue up hundreds of them simultaneously, 
completely bypassing the user's intended safeguard and overwhelming the 
downstream service.
   
   I am open to advice and changes.
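To illustrate the distinction, here is a toy admission check that evaluates both limits, each against its own count. `Limits` and `can_queue` are hypothetical names, not the scheduler's actual code; the sketch assumes the task-level count already includes `DEFERRED` TIs while the DAG-run count excludes them, as proposed in this PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Limits:
    max_active_tasks: int  # per DAG run in this code path; worker-slot oriented
    max_active_tis_per_dag: int  # per task; protects external systems


def can_queue(task_active: int, dag_run_active: int, limits: Limits) -> bool:
    """Return True if one more TI of this task may be queued.

    task_active is assumed to include DEFERRED TIs; dag_run_active is assumed
    to exclude them, mirroring the split proposed in this PR.
    """
    return (
        task_active < limits.max_active_tis_per_dag
        and dag_run_active < limits.max_active_tasks
    )


limits = Limits(max_active_tasks=16, max_active_tis_per_dag=3)
# Three TIs of this task are DEFERRED: they hold no worker slot, yet the task is at its limit.
print(can_queue(task_active=3, dag_run_active=0, limits=limits))  # False
# A different task in the same run is not blocked by those deferred TIs.
print(can_queue(task_active=0, dag_run_active=0, limits=limits))  # True
```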




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-21 Thread via GitHub


Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837208217


##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1738,15 +1738,23 @@ def xcom_pull(

     @provide_session
     def get_num_running_task_instances(self, session: Session, same_dagrun: bool = False) -> int:
-        """Return Number of running TIs from the DB."""
+        """
+        Return number of active TIs for this task from the DB.
+
+        Counts task instances in running, queued, or deferred state.
+        Deferred TIs are included because they are still logically in-flight
+        and must count against max_active_tis_per_dag / max_active_tis_per_dagrun.
+        """

Review Comment:
   If we consider deferred tasks as running, isn't it logical that the 
`max_active_tasks` also takes the deferred tasks into consideration?




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-21 Thread via GitHub


Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2837206151


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+        #while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   From what I understand, it is done this way because `max_active_tasks` 
limits running tasks across all runs of the DAG, and it only concerns tasks 
taking up executor slots (running state).
   
   Yet I do share your question of why this is different from the other 2 
limits: all of them seem to talk about running tasks, so if in some places we 
consider deferred as running, why don't we do it everywhere?




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-21 Thread via GitHub


jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2836491409


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -183,15 +183,22 @@ def load(self, session: Session) -> None:
         self.dag_run_active_tasks_map.clear()
         self.task_concurrency_map.clear()
         self.task_dagrun_concurrency_map.clear()
+        # Use one grouped query on TASK_CONCURRENCY_EXECUTION_STATES to exclude DEFERRED from dag_run_active_tasks_map
+        #while still counting it for task-level limits.
         query = session.execute(
-            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
-            .where(TI.state.in_(EXECUTION_STATES))
-            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+            select(TI.dag_id, TI.task_id, TI.run_id, TI.state, func.count("*"))
+            .where(TI.state.in_(TASK_CONCURRENCY_EXECUTION_STATES))
+            .group_by(TI.dag_id, TI.task_id, TI.run_id, TI.state)
         )
-        for dag_id, task_id, run_id, c in query:
-            self.dag_run_active_tasks_map[dag_id, run_id] += c
-            self.task_concurrency_map[(dag_id, task_id)] += c
-            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+        for dag_id, task_id, run_id, state, count in query:
+            # Always count towards task-level concurrency (max_active_tis_per_dag /
+            # max_active_tis_per_dagrun), including DEFERRED.
+            self.task_concurrency_map[(dag_id, task_id)] += count
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += count
+            # Only count non-deferred states towards DAG-run active tasks
+            # (max_active_tasks / worker slot accounting).
+            if state != TaskInstanceState.DEFERRED:
+                self.dag_run_active_tasks_map[dag_id, run_id] += count

Review Comment:
   Why would you count deferred tasks differently for the different concurrency 
limits? Isn't deferred similar to running in all cases?




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-19 Thread via GitHub


SakshamSinghal20 commented on PR #61769:
URL: https://github.com/apache/airflow/pull/61769#issuecomment-3928425891

Please let me know if everything looks right or if I need to change anything 
else.



Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-19 Thread via GitHub


SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827279692


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.
+        deferred_query = session.execute(
+            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+            .where(TI.state == TaskInstanceState.DEFERRED)
+            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+        )
+        for dag_id, task_id, run_id, c in deferred_query:
+            self.task_concurrency_map[(dag_id, task_id)] += c
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

Review Comment:
   Okay, I will update it.




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-19 Thread via GitHub


SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827280820


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.
+        deferred_query = session.execute(
+            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+            .where(TI.state == TaskInstanceState.DEFERRED)
+            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+        )

Review Comment:
   Sure, I will fix it.




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-19 Thread via GitHub


SakshamSinghal20 commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2827277968


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.

Review Comment:
   Sure, I will update this ASAP.




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-18 Thread via GitHub


jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2821164333


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.
+        deferred_query = session.execute(
+            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+            .where(TI.state == TaskInstanceState.DEFERRED)
+            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+        )

Review Comment:
   Instead of firing a second query, can you please just add deferred to the 
condition in line 188? An additional query would be expensive, and the 
separation does not make it better.




Re: [PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-17 Thread via GitHub


Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2799094569


##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.

Review Comment:
   I think this comment can be shortened to 1 sentence.



##
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
 self.task_concurrency_map[(dag_id, task_id)] += c
 self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
 
+# Additionally count DEFERRED TIs for task-level concurrency limits.
+# Deferred TIs are still in-flight and must count against
+# max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+# count toward dag_run_active_tasks (max_active_tasks) because deferred
+# tasks don't consume worker slots.
+deferred_query = session.execute(
+select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+.where(TI.state == TaskInstanceState.DEFERRED)
+.group_by(TI.task_id, TI.run_id, TI.dag_id)
+)
+for dag_id, task_id, run_id, c in deferred_query:
+self.task_concurrency_map[(dag_id, task_id)] += c
+self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

Review Comment:
   I think this can be added to the query above instead of issuing another query



[PR] Fix max_active_tis_per_dag for deferred task instances [airflow]

2026-02-11 Thread via GitHub


SakshamSinghal20 opened a new pull request, #61769:
URL: https://github.com/apache/airflow/pull/61769

   issue #61700
   ### Description
   
   Deferred tasks were previously excluded from `max_active_tis_per_dag` and 
`max_active_tis_per_dagrun` concurrency checks. This allowed an unlimited 
number of tasks to enter the `DEFERRED` state, bypassing the configured limits.
   
   This PR enforces these limits for `DEFERRED` task instances while ensuring 
that `max_active_tasks` (which limits worker slot usage) remains unaffected.
   
   **Changes:**
   
   *   **airflow/ti_deps/dependencies_states.py**: Introduced 
`TASK_CONCURRENCY_EXECUTION_STATES` to include `DEFERRED`, `RUNNING`, and 
`QUEUED`.
   *   **airflow/jobs/scheduler_job_runner.py**: Updated `ConcurrencyMap` to 
count deferred TIs for task-level limits, but explicitly exclude them from 
`dag_run_active_tasks_map`.
   *   **airflow/models/taskinstance.py**: Updated 
`get_num_running_task_instances` to count deferred TIs for pre-execution 
dependency checks.
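The three changes listed above can be illustrated with a small, self-contained model. It is a sketch under several assumptions. `TaskInstanceState` is a simplified stand-in for Airflow's enum, and `EXECUTION_STATES` is assumed to be the existing queued/running set. `count_in_states` and `may_schedule_another` are hypothetical helpers and do not come from the PR. The model shows why two deferred TIs previously looked like zero in-flight instances. It also shows how the new set blocks a third TI under `max_active_tis_per_dag=2` while leaving the `max_active_tasks` count unchanged.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    # Simplified stand-in for Airflow's TaskInstanceState enum.
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"
    SUCCESS = "success"


# Assumed existing set: states that occupy a worker slot (max_active_tasks).
EXECUTION_STATES = frozenset({TaskInstanceState.RUNNING, TaskInstanceState.QUEUED})

# New set from the PR description: states counted against per-task limits.
TASK_CONCURRENCY_EXECUTION_STATES = EXECUTION_STATES | {TaskInstanceState.DEFERRED}


def count_in_states(states, allowed):
    """Count how many of the given TI states fall inside `allowed`."""
    return sum(1 for state in states if state in allowed)


def may_schedule_another(task_states, dag_run_states, max_active_tis_per_dag, max_active_tasks):
    """Hypothetical check: may one more TI of this task be queued?

    task_states: states of this task's TIs across the DAG.
    dag_run_states: states of all TIs in the current DAG run.
    """
    task_in_flight = count_in_states(task_states, TASK_CONCURRENCY_EXECUTION_STATES)
    run_holding_slots = count_in_states(dag_run_states, EXECUTION_STATES)
    return task_in_flight < max_active_tis_per_dag and run_holding_slots < max_active_tasks


deferred_pair = [TaskInstanceState.DEFERRED, TaskInstanceState.DEFERRED]
# Counting only EXECUTION_STATES (the pre-fix behavior) sees nothing in flight.
print(count_in_states(deferred_pair, EXECUTION_STATES))  # 0
# With the new set, a third TI is blocked by max_active_tis_per_dag=2 ...
print(may_schedule_another(deferred_pair, deferred_pair, max_active_tis_per_dag=2, max_active_tasks=16))  # False
# ... while deferred TIs still do not consume max_active_tasks.
print(may_schedule_another([TaskInstanceState.DEFERRED], [TaskInstanceState.DEFERRED], max_active_tis_per_dag=2, max_active_tasks=1))  # True
```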
   
   **Tests:**
   
   Added unit tests covering:
   *   Deferred tasks blocking new scheduling attempts.
   *   Mixed state scenarios (Running + Deferred).
   *   Regression checks ensuring `max_active_tasks` is not impacted.
   
   ---
   
   # Was generative AI tooling used to co-author this PR?
   
   - [x] Yes  (used for guidance and file locating)
   
   
   ---
   
   * closes: #61700

