This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 41ce931d55f04fddf5ff6d6e4a377897ecd9e995 Author: Rahul Vats <[email protected]> AuthorDate: Mon May 25 14:28:43 2026 +0530 Correctly pre-allocate `external_executor_id` with multiple executors. (#67388) (#67458) (cherry picked from commit 2def8027d7c22f49c206593b71cc8808bd6ad642) Co-authored-by: Ash Berlin-Taylor <[email protected]> --- .../src/airflow/jobs/scheduler_job_runner.py | 21 ++++++++++-- airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 +++++++++++++++++----- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 7b6a781ce89..9e1fb20165c 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -33,7 +33,22 @@ from functools import lru_cache, partial from itertools import groupby from typing import TYPE_CHECKING, Any, cast -from sqlalchemy import CTE, and_, case, delete, exists, func, inspect, or_, select, text, tuple_, update +from sqlalchemy import ( + CTE, + Text, + and_, + case, + cast as sql_cast, + delete, + exists, + func, + inspect, + or_, + select, + text, + tuple_, + update, +) from sqlalchemy.exc import DBAPIError, OperationalError from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -941,9 +956,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): opt_in_names.add(exc.name.module_path) whens = [] if opt_in_names: - whens.append((TI.executor.in_(opt_in_names), random_db_uuid())) + whens.append((TI.executor.in_(opt_in_names), sql_cast(random_db_uuid(), Text))) if default_opts_in: - whens.append((TI.executor.is_(None), random_db_uuid())) + whens.append((TI.executor.is_(None), sql_cast(random_db_uuid(), Text))) if whens: queued_values["external_executor_id"] = case(*whens, else_=TI.external_executor_id) diff --git 
a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index e19bed5f52d..efe94326696 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -2788,31 +2788,51 @@ class TestSchedulerJob: dag_id = "SchedulerJobTest.test_executable_sets_external_executor_id" session = settings.Session() with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, session=session): - EmptyOperator(task_id="dummy") + EmptyOperator(task_id="a_task_pre_assign") + EmptyOperator(task_id="b_task_regular") class PreAssigningExecutor(MockExecutor): pre_assigns_external_executor_id = True + mock_module_path = "mock.pre_assigning.executor" + mock_alias = "pre_assigning_executor" - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[PreAssigningExecutor()]) + regular_exec = MockExecutor() + assert regular_exec.pre_assigns_external_executor_id is False, "Pre-condition" + + pre_assigning_exec = PreAssigningExecutor() + + self.job_runner = SchedulerJobRunner(job=Job(), executors=(regular_exec, pre_assigning_exec)) dr = dag_maker.create_dagrun() - ti = dr.get_task_instance("dummy", session) - ti.state = State.SCHEDULED - session.merge(ti) + ti_pre_assign = dr.get_task_instance("a_task_pre_assign", session) + ti_regular = dr.get_task_instance("b_task_regular", session) + + ti_regular.state = State.SCHEDULED + ti_regular.executor = regular_exec.name.module_path + ti_pre_assign.state = State.SCHEDULED + ti_pre_assign.executor = pre_assigning_exec.name.module_path session.flush() returned_tis = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session) + returned_tis.sort(key=lambda ti: ti.task_id) + + assert len(returned_tis) == 2 - assert len(returned_tis) == 1 # In-memory object (post make_transient) should carry the UUID + assert returned_tis[0].id == ti_pre_assign.id assert returned_tis[0].external_executor_id is not None - 
UUID(returned_tis[0].external_executor_id) + assert UUID(returned_tis[0].external_executor_id), "is valid uuid" # DB row should also have it (the whole point — survives a crash) - db_value = session.scalar(select(TaskInstance.external_executor_id).where(TaskInstance.id == ti.id)) + db_value = session.scalar( + select(TaskInstance.external_executor_id).where(TaskInstance.id == ti_pre_assign.id) + ) assert db_value == returned_tis[0].external_executor_id + # In mixed-executor mode, only TIs routed to a pre-assigning executor get an external_executor_id. + assert returned_tis[1].id == ti_regular.id + assert returned_tis[1].external_executor_id is None + session.rollback() @pytest.mark.parametrize("state", [State.FAILED, State.SUCCESS])
