This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new bc13269940c Correctly pre-allocate `external_executor_id` with
multiple executors. (#67388) (#67458)
bc13269940c is described below
commit bc13269940c5d30de3ee504e0967e34da202a704
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 25 14:28:43 2026 +0530
Correctly pre-allocate `external_executor_id` with multiple executors.
(#67388) (#67458)
(cherry picked from commit 2def8027d7c22f49c206593b71cc8808bd6ad642)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
.../src/airflow/jobs/scheduler_job_runner.py | 21 ++++++++++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 +++++++++++++++++-----
2 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 7b6a781ce89..9e1fb20165c 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,22 @@ from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any, cast
-from sqlalchemy import CTE, and_, case, delete, exists, func, inspect, or_,
select, text, tuple_, update
+from sqlalchemy import (
+ CTE,
+ Text,
+ and_,
+ case,
+ cast as sql_cast,
+ delete,
+ exists,
+ func,
+ inspect,
+ or_,
+ select,
+ text,
+ tuple_,
+ update,
+)
from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
from sqlalchemy.sql import expression
@@ -941,9 +956,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
opt_in_names.add(exc.name.module_path)
whens = []
if opt_in_names:
- whens.append((TI.executor.in_(opt_in_names),
random_db_uuid()))
+ whens.append((TI.executor.in_(opt_in_names),
sql_cast(random_db_uuid(), Text)))
if default_opts_in:
- whens.append((TI.executor.is_(None), random_db_uuid()))
+ whens.append((TI.executor.is_(None),
sql_cast(random_db_uuid(), Text)))
if whens:
queued_values["external_executor_id"] = case(*whens,
else_=TI.external_executor_id)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index e19bed5f52d..efe94326696 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2788,31 +2788,51 @@ class TestSchedulerJob:
dag_id = "SchedulerJobTest.test_executable_sets_external_executor_id"
session = settings.Session()
with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE,
session=session):
- EmptyOperator(task_id="dummy")
+ EmptyOperator(task_id="a_task_pre_assign")
+ EmptyOperator(task_id="b_task_regular")
class PreAssigningExecutor(MockExecutor):
pre_assigns_external_executor_id = True
+ mock_module_path = "mock.pre_assigning.executor"
+ mock_alias = "pre_assigning_executor"
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[PreAssigningExecutor()])
+ regular_exec = MockExecutor()
+ assert regular_exec.pre_assigns_external_executor_id is False,
"Pre-condition"
+
+ pre_assigning_exec = PreAssigningExecutor()
+
+ self.job_runner = SchedulerJobRunner(job=Job(),
executors=(regular_exec, pre_assigning_exec))
dr = dag_maker.create_dagrun()
- ti = dr.get_task_instance("dummy", session)
- ti.state = State.SCHEDULED
- session.merge(ti)
+ ti_pre_assign = dr.get_task_instance("a_task_pre_assign", session)
+ ti_regular = dr.get_task_instance("b_task_regular", session)
+
+ ti_regular.state = State.SCHEDULED
+ ti_regular.executor = regular_exec.name.module_path
+ ti_pre_assign.state = State.SCHEDULED
+ ti_pre_assign.executor = pre_assigning_exec.name.module_path
session.flush()
returned_tis =
self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
+ returned_tis.sort(key=lambda ti: ti.task_id)
+
+ assert len(returned_tis) == 2
- assert len(returned_tis) == 1
# In-memory object (post make_transient) should carry the UUID
+ assert returned_tis[0].id == ti_pre_assign.id
assert returned_tis[0].external_executor_id is not None
- UUID(returned_tis[0].external_executor_id)
+ assert UUID(returned_tis[0].external_executor_id), "is valid uuid"
# DB row should also have it (the whole point — survives a crash)
- db_value =
session.scalar(select(TaskInstance.external_executor_id).where(TaskInstance.id
== ti.id))
+ db_value = session.scalar(
+ select(TaskInstance.external_executor_id).where(TaskInstance.id ==
ti_pre_assign.id)
+ )
assert db_value == returned_tis[0].external_executor_id
+ # In mixed-executor mode, only TIs routed to a pre-assigning executor
get an external_executor_id.
+ assert returned_tis[1].id == ti_regular.id
+ assert returned_tis[1].external_executor_id is None
+
session.rollback()
@pytest.mark.parametrize("state", [State.FAILED, State.SUCCESS])