This is an automated email from the ASF dual-hosted git repository.

ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8395c29ffd5 Fix scheduler orphaned task reset logging crash (#67822)
8395c29ffd5 is described below

commit 8395c29ffd518e2daf7f23094d0c76e95adb6256
Author: Revanth <[email protected]>
AuthorDate: Mon Jun 1 07:13:48 2026 -0500

    Fix scheduler orphaned task reset logging crash (#67822)
    
    The scheduler orphaned-task adoption path loaded task instances with a
    narrow load_only(...) set and then called repr(ti) while building the
    reset log message. TaskInstance.__repr__ reads map_index and state, so
    detached instances with deferred columns could raise
    DetachedInstanceError and crash the scheduler on restart.
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 12 ++++++-
 airflow-core/src/airflow/models/taskinstance.py    | 21 +++++++++---
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 36 ++++++++++++++++++-
 .../tests/unit/models/test_taskinstance.py         | 40 +++++++++++++++++++++-
 4 files changed, 102 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index f61e20840ed..ff9e5921d44 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2851,7 +2851,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         .where(Job.state.is_distinct_from(JobState.RUNNING))
                         .join(TI.dag_run)
                         .where(DagRun.state == DagRunState.RUNNING)
-                        .options(load_only(TI.dag_id, TI.task_id, TI.run_id, TI.external_executor_id))
+                        .options(
+                            load_only(
+                                TI.id,
+                                TI.dag_id,
+                                TI.task_id,
+                                TI.run_id,
+                                TI.map_index,
+                                TI.state,
+                                TI.external_executor_id,
+                            )
+                        )
                     )
 
                     # Lock these rows, so that another scheduler can't try and adopt these too
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index f9d498c1ab1..3469cf4acb8 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -66,6 +66,7 @@ from sqlalchemy.ext.hybrid import hybrid_property
 from sqlalchemy.ext.mutable import MutableDict
 from sqlalchemy.orm import Mapped, lazyload, mapped_column, reconstructor, relationship
 from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
+from sqlalchemy.orm.exc import DetachedInstanceError, ObjectDeletedError
 
 from airflow import settings
 from airflow._shared.observability.metrics import stats
@@ -1158,10 +1159,22 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                     yield dep_status
 
     def __repr__(self) -> str:
-        prefix = f"<TaskInstance: {self.dag_id}.{self.task_id} {self.run_id} "
-        if self.map_index != -1:
-            prefix += f"map_index={self.map_index} "
-        return prefix + f"[{self.state}] ti_id={self.id}>"
+        # ``__repr__`` is used in logging and must never raise. Real values are printed
+        # whenever they can be read (including a normal lazy-load on an *attached* instance);
+        # we only fall back to a placeholder when SQLAlchemy cannot produce the value at all:
+        # a deferred column on a *detached* instance (DetachedInstanceError), or a row deleted
+        # out from under an expired instance (ObjectDeletedError).
+        def field(name: str) -> Any:
+            try:
+                return getattr(self, name)
+            except (DetachedInstanceError, ObjectDeletedError):
+                return "<deferred>"
+
+        prefix = f"<TaskInstance: {field('dag_id')}.{field('task_id')} {field('run_id')} "
+        map_index = field("map_index")
+        if map_index != -1:
+            prefix += f"map_index={map_index} "
+        return prefix + f"[{field('state')}] ti_id={field('id')}>"
 
     def next_retry_datetime(self):
         """
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 3b576271610..fc267802252 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -36,7 +36,7 @@ import pendulum
 import psutil
 import pytest
 import time_machine
-from sqlalchemy import delete, func, select, update
+from sqlalchemy import delete, func, inspect, select, update
 from sqlalchemy.dialects import mysql
 from sqlalchemy.orm import joinedload
 
@@ -3297,6 +3297,40 @@ class TestSchedulerJob:
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
         assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be treated the same"
 
+    def test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging(
+        self, dag_maker, session, mock_executor
+    ):
+        with dag_maker("test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging", session=session):
+            op1 = EmptyOperator(task_id="op1")
+
+        scheduler_job = Job()
+        session.add(scheduler_job)
+        session.flush()
+
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = scheduler_job.id
+        session.commit()
+        session.expunge_all()
+
+        def refuse_adoption(tis):
+            assert len(tis) == 1
+            # ``repr(ti)`` in the reset path reads both ``state`` and ``map_index``; the query
+            # must load both so the reset log stays accurate (and never lazy-loads on detach).
+            unloaded = inspect(tis[0]).unloaded
+            assert "state" not in unloaded
+            assert "map_index" not in unloaded
+            # repr must render the real state, not the ``<deferred>`` fallback.
+            assert "queued" in repr(tis[0])
+            return tis
+
+        mock_executor.try_adopt_task_instances.side_effect = refuse_adoption
+
+        self.job_runner = SchedulerJobRunner(job=Job(), num_runs=0)
+
+        assert self.job_runner.adopt_or_reset_orphaned_tasks(session=session) == 1
+
     def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_executors):
         """
         Test that with multiple executors configured tasks are sorted correctly and handed off to the
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index dae7f419ff4..99cdd38f4f3 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -33,8 +33,9 @@ import uuid6
 from opentelemetry import trace
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
-from sqlalchemy import delete, func, select
+from sqlalchemy import delete, func, inspect as sa_inspect, select
 from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import load_only
 from sqlalchemy.orm.attributes import set_committed_value
 
 from airflow import settings
@@ -3897,3 +3898,40 @@ def test_clear_task_instances_preserves_detail_level(dag_maker, session):
     new_ctx = TraceContextTextMapPropagator().extract(dag_run.context_carrier)
     span = trace.get_current_span(new_ctx)
     assert get_task_span_detail_level(span) == 2
+
+
+@pytest.mark.db_test
+def test_task_instance_repr_does_not_raise_for_deferred_columns(dag_maker, session):
+    """``TaskInstance.__repr__`` must survive *any* deferred column it reads.
+
+    Regression test for issue #67813: the scheduler's orphaned-task adoption loaded
+    TaskInstances via ``load_only`` and then called ``repr(ti)`` on detached instances.
+    ``__repr__`` reads ``map_index`` and ``state`` (among others); on a detached instance
+    a column that was not loaded raises ``DetachedInstanceError``. ``__repr__`` is used in
+    logging and must degrade gracefully — printing ``<deferred>`` — instead of crashing.
+    """
+    with dag_maker("test_repr_deferred_columns", session=session):
+        EmptyOperator(task_id="op1")
+    dr = dag_maker.create_dagrun()
+    ti = dr.get_task_instance(task_id="op1", session=session)
+    ti.state = State.QUEUED
+    session.commit()
+    ti_id = ti.id
+
+    # Reload the row with ``map_index`` and ``state`` left as deferred columns, then detach
+    # the instance so that touching them would otherwise require a (now-impossible) DB load.
+    session.expunge_all()
+    reloaded = session.scalar(
+        select(TaskInstance)
+        .where(TaskInstance.id == ti_id)
+        .options(load_only(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.run_id))
+    )
+    session.expunge(reloaded)
+    unloaded = sa_inspect(reloaded).unloaded
+    assert "map_index" in unloaded
+    assert "state" in unloaded
+
+    result = repr(reloaded)  # would raise DetachedInstanceError without the guard
+
+    assert "<deferred>" in result
+    assert "[queued]" not in result

Reply via email to