This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1193e5e29d3 Fix scheduler MySQL task instance index hint (#66785)
1193e5e29d3 is described below
commit 1193e5e29d371e66b16938aaab401847d9f67f68
Author: SilverGun <[email protected]>
AuthorDate: Mon May 18 13:45:39 2026 +0900
Fix scheduler MySQL task instance index hint (#66785)
Co-authored-by: nanaones <[email protected]>
---
.../src/airflow/jobs/scheduler_job_runner.py | 1 -
airflow-core/tests/unit/jobs/test_scheduler_job.py | 29 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9a650b110c9..18e5dd3fd22 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -632,7 +632,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Select only rows where row_number <= max_active_tasks.
query = (
select(TI)
- .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.select_from(ranked_query)
.join(
TI,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ff1222c1222..5bb780fdbac 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@ import psutil
import pytest
import time_machine
from sqlalchemy import delete, func, select, update
+from sqlalchemy.dialects import mysql
from sqlalchemy.orm import joinedload
from airflow import settings
@@ -1248,6 +1249,34 @@ class TestSchedulerJob:
assert {x.key for x in queued_tis} == {ti_non_backfill.key, ti_backfill.key}
session.rollback()
+ def test_find_executable_task_instances_mysql_hint_only_applies_to_inner_query(self, dag_maker, session):
+ dag_id = "SchedulerJobTest.test_find_executable_task_instances_mysql_hint_only_applies_to_inner_query"
+ task_id = "dummy"
+ with dag_maker(dag_id=dag_id, max_active_tasks=16):
+ task = EmptyOperator(task_id=task_id)
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+ ti = dag_run.get_task_instance(task.task_id)
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ captured_queries = []
+
+ def capture_locked_query(query, **kwargs):
+ captured_queries.append(query)
+ return query
+
+ with mock.patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=capture_locked_query):
+ queued_tis = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
+
+ assert {queued_ti.key for queued_ti in queued_tis} == {ti.key}
+ compiled_query = str(captured_queries[0].compile(dialect=mysql.dialect()))
+ assert compiled_query.count("USE INDEX (ti_state)") == 1
+
def test_find_executable_task_instances_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_pool"
task_id_1 = "dummy"