This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new bc13269940c Correctly pre-allocate `external_executor_id` with 
multiple executors. (#67388) (#67458)
bc13269940c is described below

commit bc13269940c5d30de3ee504e0967e34da202a704
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 25 14:28:43 2026 +0530

    Correctly pre-allocate `external_executor_id` with multiple executors. 
(#67388) (#67458)
    
    (cherry picked from commit 2def8027d7c22f49c206593b71cc8808bd6ad642)
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 21 ++++++++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 +++++++++++++++++-----
 2 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 7b6a781ce89..9e1fb20165c 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,22 @@ from functools import lru_cache, partial
 from itertools import groupby
 from typing import TYPE_CHECKING, Any, cast
 
-from sqlalchemy import CTE, and_, case, delete, exists, func, inspect, or_, 
select, text, tuple_, update
+from sqlalchemy import (
+    CTE,
+    Text,
+    and_,
+    case,
+    cast as sql_cast,
+    delete,
+    exists,
+    func,
+    inspect,
+    or_,
+    select,
+    text,
+    tuple_,
+    update,
+)
 from sqlalchemy.exc import DBAPIError, OperationalError
 from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
 from sqlalchemy.sql import expression
@@ -941,9 +956,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         opt_in_names.add(exc.name.module_path)
                 whens = []
                 if opt_in_names:
-                    whens.append((TI.executor.in_(opt_in_names), 
random_db_uuid()))
+                    whens.append((TI.executor.in_(opt_in_names), 
sql_cast(random_db_uuid(), Text)))
                 if default_opts_in:
-                    whens.append((TI.executor.is_(None), random_db_uuid()))
+                    whens.append((TI.executor.is_(None), 
sql_cast(random_db_uuid(), Text)))
                 if whens:
                     queued_values["external_executor_id"] = case(*whens, 
else_=TI.external_executor_id)
 
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index e19bed5f52d..efe94326696 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2788,31 +2788,51 @@ class TestSchedulerJob:
         dag_id = "SchedulerJobTest.test_executable_sets_external_executor_id"
         session = settings.Session()
         with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, 
session=session):
-            EmptyOperator(task_id="dummy")
+            EmptyOperator(task_id="a_task_pre_assign")
+            EmptyOperator(task_id="b_task_regular")
 
         class PreAssigningExecutor(MockExecutor):
             pre_assigns_external_executor_id = True
+            mock_module_path = "mock.pre_assigning.executor"
+            mock_alias = "pre_assigning_executor"
 
-        scheduler_job = Job()
-        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[PreAssigningExecutor()])
+        regular_exec = MockExecutor()
+        assert regular_exec.pre_assigns_external_executor_id is False, 
"Pre-condition"
+
+        pre_assigning_exec = PreAssigningExecutor()
+
+        self.job_runner = SchedulerJobRunner(job=Job(), 
executors=(regular_exec, pre_assigning_exec))
 
         dr = dag_maker.create_dagrun()
-        ti = dr.get_task_instance("dummy", session)
-        ti.state = State.SCHEDULED
-        session.merge(ti)
+        ti_pre_assign = dr.get_task_instance("a_task_pre_assign", session)
+        ti_regular = dr.get_task_instance("b_task_regular", session)
+
+        ti_regular.state = State.SCHEDULED
+        ti_regular.executor = regular_exec.name.module_path
+        ti_pre_assign.state = State.SCHEDULED
+        ti_pre_assign.executor = pre_assigning_exec.name.module_path
         session.flush()
 
         returned_tis = 
self.job_runner._executable_task_instances_to_queued(max_tis=32, 
session=session)
+        returned_tis.sort(key=lambda ti: ti.task_id)
+
+        assert len(returned_tis) == 2
 
-        assert len(returned_tis) == 1
         # In-memory object (post make_transient) should carry the UUID
+        assert returned_tis[0].id == ti_pre_assign.id
         assert returned_tis[0].external_executor_id is not None
-        UUID(returned_tis[0].external_executor_id)
+        assert UUID(returned_tis[0].external_executor_id), "is valid uuid"
 
         # DB row should also have it (the whole point — survives a crash)
-        db_value = 
session.scalar(select(TaskInstance.external_executor_id).where(TaskInstance.id 
== ti.id))
+        db_value = session.scalar(
+            select(TaskInstance.external_executor_id).where(TaskInstance.id == 
ti_pre_assign.id)
+        )
         assert db_value == returned_tis[0].external_executor_id
 
+        # In mixed-executor mode, only TIs routed to a pre-assigning executor 
get an external_executor_id.
+        assert returned_tis[1].id == ti_regular.id
+        assert returned_tis[1].external_executor_id is None
+
         session.rollback()
 
     @pytest.mark.parametrize("state", [State.FAILED, State.SUCCESS])

Reply via email to