Nataneljpwd commented on code in PR #53492:
URL: https://github.com/apache/airflow/pull/53492#discussion_r2250099050
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -373,292 +377,171 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
# If the pools are full, there is no point doing anything!
# If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
pool_slots_free = sum(max(0, pool["open"]) for pool in pools.values())
+ starved_pools = {pool_name for pool_name, stats in pools.items() if
stats["open"] <= 0}
+ starved_tasks: set[tuple[str, str]] = set()
if pool_slots_free == 0:
self.log.debug("All pools are full!")
return []
- max_tis = min(max_tis, pool_slots_free)
-
- starved_pools = {pool_name for pool_name, stats in pools.items() if
stats["open"] <= 0}
-
- # dag_id to # of running tasks and (dag_id, task_id) to # of running
tasks.
- concurrency_map = ConcurrencyMap()
- concurrency_map.load(session=session)
+ priority_order = [-TI.priority_weight, DR.logical_date, TI.map_index]
- # Number of tasks that cannot be scheduled because of no open slot in
pool
- num_starving_tasks_total = 0
-
- # dag and task ids that can't be queued because of concurrency limits
- starved_dags: set[str] = set()
- starved_tasks: set[tuple[str, str]] = set()
- starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] =
set()
-
- pool_num_starving_tasks: dict[str, int] = Counter()
+ query = (
+ select(TI)
+ .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+ .join(TI.dag_run)
+ .where(DR.state == DagRunState.RUNNING)
+ .join(TI.dag_model)
+ .where(~DM.is_paused)
+ .where(TI.state == TaskInstanceState.SCHEDULED)
+ .where(DM.bundle_name.is_not(None))
+ )
- for loop_count in itertools.count(start=1):
- num_starved_pools = len(starved_pools)
- num_starved_dags = len(starved_dags)
- num_starved_tasks = len(starved_tasks)
- num_starved_tasks_task_dagrun_concurrency =
len(starved_tasks_task_dagrun_concurrency)
+ @dataclass
+ class LimitWindowDescriptor:
+ running_now_join: CTE
+ running_now_join_predicates: Collection[str]
+ limit_column: Column
+ window: expression.ColumnElement
+ limit_join_model: Base | None = None
+
+ def running_tasks_group(group_fields: Collection[Column], states:
Collection[TaskInstanceState]=EXECUTION_STATES) -> CTE:
Review Comment:
This is still a draft PR and nothing has been stabilized yet. We do manual
tests first and will follow up with automated tests after we fix all the bugs,
since we first want to measure performance to know whether the change is even
worth the performance hit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]