HartigFries opened a new issue, #67891:
URL: https://github.com/apache/airflow/issues/67891
### Under which category would you file this issue?
Providers
### Apache Airflow version
3.2.1, 3.1.8, 3.1.0
### What happened and how to reproduce it?
**Issue Description**
With KubernetesExecutor, the scheduler crashes with
sqlalchemy.orm.exc.DetachedInstanceError inside adopt_or_reset_orphaned_tasks()
and then enters a crash loop until the metadata DB is cleaned by hand.
```
Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  ...
  File ".../airflow/jobs/scheduler_job_runner.py", line 2288, in adopt_or_reset_orphaned_tasks
    reset_tis_message.append(repr(ti))
  File ".../airflow/models/taskinstance.py", line 940, in __repr__
    return prefix + f"[{self.state}]>"
  ...
  File ".../sqlalchemy/orm/strategies.py", line 536, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x...> is not bound to a Session; deferred load operation of attribute 'state' cannot proceed
```
Root cause is a nested scoped session opened inside the orphan-adoption
transaction:
1. adopt_or_reset_orphaned_tasks() runs under @provide_session, which opens
a thread-local scoped session (airflow.settings.Session). It loads the orphan
TaskInstances with load_only(dag_id, task_id, run_id), so state / map_index /
executor are deferred.
2. It calls executor.try_adopt_task_instances(tis). For the
KubernetesExecutor this reaches _adopt_completed_pods() →
_alive_other_scheduler_job_ids() (added in 10.17.1 by #66400), which does:
```python
with create_session() as session:  # scoped=True by default
    ...
```
3. Because Session is a thread-local scoped_session, this inner session is
the same object as the outer one. When the with block exits, Session.close()
runs expunge_all() and detaches the orphan TaskInstance rows still owned by the
outer scope.
4. Back in the outer scope, line 2288 builds a log message via repr(ti),
which reads the deferred self.state. The object is now detached →
DetachedInstanceError → unhandled → scheduler exits.
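Steps 1 to 4 can be reproduced without Airflow or a database. The sketch below is a stdlib-only toy model. It assumes that only two behaviors matter: a thread-local registry hands out one shared session per thread, and `close()` detaches every object that session owns, as `expunge_all()` does. `ToyTI`, `ToySession`, `ToyScopedSession` and `DetachedError` are hypothetical stand-ins, not Airflow or SQLAlchemy APIs.

```python
import threading
from contextlib import contextmanager


class DetachedError(Exception):
    """Toy stand-in for sqlalchemy.orm.exc.DetachedInstanceError (hypothetical)."""


class ToyTI:
    """Row whose `state` column is deferred, as with load_only(dag_id, task_id, run_id)."""

    def __init__(self, session, state):
        self.session = session   # owning session; None once detached
        self._db_state = state   # what a deferred load would fetch

    @property
    def state(self):
        # Like a deferred column load, this needs a live owning session.
        if self.session is None:
            raise DetachedError("not bound to a session; cannot load 'state'")
        return self._db_state


class ToySession:
    def __init__(self):
        self.owned = []

    def load(self, state):
        ti = ToyTI(self, state)
        self.owned.append(ti)
        return ti

    def close(self):
        # Toy version of Session.close() -> expunge_all(): detach every owned object.
        for ti in self.owned:
            ti.session = None
        self.owned.clear()


class ToyScopedSession:
    """Thread-local registry: every call from one thread returns the same session."""

    def __init__(self):
        self._local = threading.local()

    def __call__(self):
        if not hasattr(self._local, "session"):
            self._local.session = ToySession()
        return self._local.session


Session = ToyScopedSession()  # plays the role of airflow.settings.Session


@contextmanager
def create_session():
    session = Session()  # same object the outer caller already holds
    try:
        yield session
    finally:
        session.close()


outer = Session()                # step 1: the @provide_session scope
ti = outer.load("queued")
with create_session() as inner:  # step 2: the helper's nested read
    pass

try:
    _ = ti.state                 # step 4: repr(ti) reads the deferred column
    detached = False
except DetachedError:
    detached = True

# A different thread gets its own session from the same registry.
seen = []
worker = threading.Thread(target=lambda: seen.append(Session()))
worker.start()
worker.join()

print("inner is outer:", inner is outer)              # True
print("ti detached:", detached)                       # True
print("other thread shares it:", seen[0] is outer)    # False
```

The per-thread check shows why the collision only happens when the helper runs on the scheduler's own thread: the registry is keyed by thread, not by caller.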
The triggering condition (adoptable orphan TIs whose queued_by_job is no
longer running) stays in the DB, so the restarted scheduler hits the same path
and crashes again, causing a self-sustaining loop under continuous
KubernetesExecutor load.
This is a regression from #66396 / #66400 (the multi-scheduler completed-pod
thrash fix), which introduced _alive_other_scheduler_job_ids(). The older
_adopt_completed_pods() used only the kube client and opened no DB session, so
this never happened.
**How to Reproduce?**
Live, read-only repro inside a scheduler pod (SELECT-only; changes no data):
```python
from airflow.settings import Session
from airflow.utils.session import create_session
from airflow.models.taskinstance import TaskInstance as TI
from sqlalchemy import select
from sqlalchemy.orm import load_only


def load_one(sess):
    # Load one TI with only the key columns, so `state` stays deferred,
    # mirroring the query in adopt_or_reset_orphaned_tasks().
    ti = sess.scalars(
        select(TI).options(load_only(TI.dag_id, TI.task_id, TI.run_id)).limit(1)
    ).first()
    # Without a row, ti.state would raise AttributeError and look like a repro.
    assert ti is not None, "need at least one TaskInstance row to run this check"
    return ti


# ---- TEST 1: buggy path (nested scoped session) ----
s = Session()
ti = load_one(s)
with create_session() as s2:
    pass
print("inner session IS outer scoped session?", s2 is s)
try:
    print("ti.state ->", ti.state, " => NO crash (bug did NOT reproduce)")
except Exception as e:
    print("ti.state -> RAISED", type(e).__name__, " => bug reproduced")
Session.remove()  # reset thread-local scoped session before next test

# ---- TEST 2: proposed fix (nested NON-scoped session) ----
s = Session()
ti2 = load_one(s)
with create_session(scoped=False) as s3:
    pass
print("inner session IS outer scoped session?", s3 is s)
try:
    print("ti2.state ->", ti2.state, " => OK, fix keeps it attached")
except Exception as e:
    print("ti2.state -> RAISED", type(e).__name__)
Session.remove()  # clean up the thread-local scoped session
```
### What you think should happen instead?
adopt_or_reset_orphaned_tasks() should reset/adopt orphaned task instances
and log them without the scheduler dying, and _alive_other_scheduler_job_ids()
should not tear down the caller's session.
The minimal fix is to open the helper's session as non-scoped, so its
close() cannot expunge the scheduler's scoped session. In
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
(line 724 in 10.17.1; locate by context on main):
```diff
@@ -721,7 +721,7 @@
timeout = conf.getint("scheduler",
"scheduler_health_check_threshold")
cutoff = timezone.utcnow() - timedelta(seconds=timeout)
- with create_session() as session:
+ with create_session(scoped=False) as session:
```
create_session(scoped=False) uses settings.NonScopedSession (an independent
session with expire_on_commit=False), so it gives this isolated read its own
session while leaving the scheduler's scoped session and its in-flight
TaskInstances untouched. Single-scheduler behavior is unchanged. I'm willing to
open a PR with this change plus a regression test. A larger alternative,
threading the caller's session down through try_adopt_task_instances so the
read reuses it, would change the BaseExecutor interface across executors and is
out of scope here.
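To compare both values of the `scoped` flag without a live scheduler, here is a stdlib-only toy model. It assumes `create_session(scoped=...)` either returns the thread's shared session or builds a fresh, independent one, as described above. It mirrors only the session-ownership rule, not SQLAlchemy's real identity map or transaction handling. All names (`scoped_session`, `remove_scoped_session`, `caller_survives`, `ToySession`, `ToyTI`, `DetachedError`) are hypothetical.

```python
import threading
from contextlib import contextmanager


class DetachedError(Exception):
    """Toy stand-in for sqlalchemy.orm.exc.DetachedInstanceError (hypothetical)."""


class ToyTI:
    def __init__(self, session):
        self.session = session  # None once detached

    @property
    def state(self):
        # A deferred column can only be loaded through a live owning session.
        if self.session is None:
            raise DetachedError("deferred load of 'state' cannot proceed")
        return "queued"


class ToySession:
    def __init__(self):
        self.owned = []

    def load(self):
        ti = ToyTI(self)
        self.owned.append(ti)
        return ti

    def close(self):
        # Toy version of Session.close() -> expunge_all(): detach what this session owns.
        for ti in self.owned:
            ti.session = None
        self.owned.clear()


_local = threading.local()


def scoped_session():
    """Analogue of settings.Session: one shared session per thread."""
    if not hasattr(_local, "session"):
        _local.session = ToySession()
    return _local.session


def remove_scoped_session():
    """Analogue of Session.remove(): drop this thread's shared session."""
    if hasattr(_local, "session"):
        del _local.session


@contextmanager
def create_session(scoped=True):
    # scoped=False plays the role of NonScopedSession: a fresh, independent session.
    session = scoped_session() if scoped else ToySession()
    try:
        yield session
    finally:
        session.close()


def caller_survives(scoped):
    """Load a TI in the outer scope, run the nested helper, then read ti.state."""
    remove_scoped_session()        # fresh outer scope for each run
    ti = scoped_session().load()   # outer @provide_session scope
    with create_session(scoped=scoped):
        pass                       # the helper's read would go here
    try:
        return ti.state == "queued"
    except DetachedError:
        return False


print("scoped=True  keeps caller's TI attached:", caller_survives(True))   # False
print("scoped=False keeps caller's TI attached:", caller_survives(False))  # True
```

The result lines up with TEST 1 and TEST 2 of the live repro: only closing the shared scoped session reaches the caller's objects. A non-scoped session owns nothing the caller loaded, so its `close()` is harmless.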
### Operating System
Linux
### Deployment
Other 3rd-party Helm chart
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==10.17.1
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
```Helm
- name: AIRFLOW__CORE__EXECUTOR
value: "KubernetesExecutor,CeleryExecutor"
```
### Docker Image customizations
Not Applicable
### Anything else?
Text formatted using Claude AI
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)