kaxil commented on issue #56763:
URL: https://github.com/apache/airflow/issues/56763#issuecomment-3415698520
The following should most likely fix it:
```diff
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index ddab7dd971..4546a07921 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -110,6 +110,31 @@ TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
 """:meta private:"""
+def _eager_load_dag_run_for_validation():
+    """
+    Eager-load DagRun relations required for execution API datamodel validation.
+
+    When building TaskCallbackRequest with DRDataModel.model_validate(ti.dag_run),
+    the consumed_asset_events collection and nested asset/source_aliases must be
+    preloaded to avoid DetachedInstanceError after the session closes.
+
+    Returns a tuple of two load options:
+    - Asset loader: TI.dag_run → consumed_asset_events → asset
+    - Alias loader: TI.dag_run → consumed_asset_events → source_aliases
+
+    Example usage::
+
+        asset_loader, alias_loader = _eager_load_dag_run_for_validation()
+        query = select(TI).options(asset_loader).options(alias_loader)
+    """
+    # Traverse TI → dag_run → consumed_asset_events once, then branch to asset/aliases
+    base = selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events)
+    return (
+        base.selectinload(AssetEvent.asset),
+        base.selectinload(AssetEvent.source_aliases),
+    )
+
+
 def _get_current_dag(dag_id: str, session: Session) -> SerializedDAG | None:
     serdag = SerializedDagModel.get(dag_id=dag_id, session=session)  # grabs the latest version
     if not serdag:
@@ -806,11 +831,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # Check state of finished tasks
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
+        asset_loader, alias_loader = _eager_load_dag_run_for_validation()
         query = (
             select(TI)
             .where(filter_for_tis)
             .options(selectinload(TI.dag_model))
-            .options(joinedload(TI.dag_run).selectinload(DagRun.consumed_asset_events))
+            .options(asset_loader)
+            .options(alias_loader)
             .options(joinedload(TI.dag_run).selectinload(DagRun.created_dag_version))
             .options(joinedload(TI.dag_version))
         )
@@ -2375,10 +2402,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[TI]:
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs)
+        asset_loader, alias_loader = _eager_load_dag_run_for_validation()
         task_instances_without_heartbeats = session.scalars(
             select(TI)
             .options(selectinload(TI.dag_model))
-            .options(selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events))
+            .options(asset_loader)
+            .options(alias_loader)
             .options(selectinload(TI.dag_version))
             .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
             .join(DM, TI.dag_id == DM.dag_id)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 7c03dd4fa1..ea47b62601 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -608,6 +608,66 @@ class TestSchedulerJob:
         scheduler_job.executor.callback_sink.send.assert_not_called()
         mock_stats_incr.assert_not_called()
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+    def test_process_executor_events_with_asset_events(self, mock_stats_incr, session, dag_maker):
+        """
+        Test that _process_executor_events handles asset events without DetachedInstanceError.
+
+        Regression test for scheduler crashes when task callbacks are built with
+        consumed_asset_events that weren't eager-loaded.
+        """
+        asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group")
+        asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+        session.add(asset_model)
+        session.flush()
+
+        with dag_maker(dag_id="test_executor_events_with_assets", schedule=[asset1], fileloc="/test_path1/"):
+            EmptyOperator(task_id="dummy_task", on_failure_callback=lambda ctx: None)
+
+        dag = dag_maker.dag
+        scheduler_dag = sync_dag_to_db(dag)
+        dag_v = DagVersion.get_latest_version(dag.dag_id)
+
+        dr = dag_maker.create_dagrun()
+
+        # Create asset event and attach to dag run
+        asset_event = AssetEvent(
+            asset_id=asset_model.id,
+            source_task_id="upstream_task",
+            source_dag_id="upstream_dag",
+            source_run_id="upstream_run",
+            source_map_index=-1,
+        )
+        session.add(asset_event)
+        session.flush()
+        dr.consumed_asset_events.append(asset_event)
+        session.flush()
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job(executor=executor)
+        self.job_runner = SchedulerJobRunner(scheduler_job)
+
+        ti1 = dr.get_task_instance("dummy_task")
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.FAILED, None
+
+        # This should not raise DetachedInstanceError
+        self.job_runner._process_executor_events(executor=executor, session=session)
+
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.FAILED
+
+        # Verify callback was created with asset event data
+        scheduler_job.executor.callback_sink.send.assert_called_once()
+        callback_request = scheduler_job.executor.callback_sink.send.call_args.args[0]
+        assert callback_request.context_from_server is not None
+        assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1
+        assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri
+
     def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
         dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute"
         task_id_1 = "dummy_task"
@@ -6186,6 +6246,98 @@ class TestSchedulerJob:
             "External Executor Id": "abcdefg",
         }
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_find_and_purge_task_instances_without_heartbeats_with_asset_events(
+        self, session, dag_maker, create_dagrun
+    ):
+        """
+        Test that heartbeat purge succeeds when DagRun has consumed_asset_events.
+
+        Regression test for DetachedInstanceError when building TaskCallbackRequest
+        with asset event data after session expunge.
+        """
+        asset1 = Asset(uri="test://asset1", name="test_asset", group="test_group")
+        asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+        session.add(asset_model)
+        session.flush()
+
+        with dag_maker(dag_id="test_heartbeat_with_assets", schedule=[asset1]):
+            EmptyOperator(task_id="dummy_task")
+
+        dag = dag_maker.dag
+        scheduler_dag = sync_dag_to_db(dag)
+        dag_v = DagVersion.get_latest_version(dag.dag_id)
+
+        data_interval = infer_automated_data_interval(scheduler_dag.timetable, DEFAULT_LOGICAL_DATE)
+        dag_run = create_dagrun(
+            scheduler_dag,
+            logical_date=DEFAULT_DATE,
+            run_type=DagRunType.SCHEDULED,
+            data_interval=data_interval,
+        )
+
+        # Create asset alias and event with full relationships
+        asset_alias = AssetAliasModel(name="test_alias", group="test_group")
+        session.add(asset_alias)
+        session.flush()
+
+        asset_event = AssetEvent(
+            asset_id=asset_model.id,
+            source_task_id="upstream_task",
+            source_dag_id="upstream_dag",
+            source_run_id="upstream_run",
+            source_map_index=-1,
+        )
+        session.add(asset_event)
+        session.flush()
+
+        # Attach alias to event and event to dag run
+        asset_event.source_aliases.append(asset_alias)
+        dag_run.consumed_asset_events.append(asset_event)
+        session.flush()
+
+        executor = MockExecutor()
+        scheduler_job = Job(executor=executor)
+        with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
+            loader_mock.return_value = executor
+            self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        ti = dag_run.get_task_instance("dummy_task")
+        assert ti is not None  # sanity check: dag_maker.create_dagrun created the TI
+
+        ti.state = State.RUNNING
+        ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
+        ti.start_date = timezone.utcnow() - timedelta(minutes=10)
+        ti.queued_by_job_id = scheduler_job.id
+        ti.dag_version = dag_v
+        session.merge(ti)
+        session.flush()
+
+        executor.running.add(ti.key)
+
+        tis_without_heartbeats = self.job_runner._find_task_instances_without_heartbeats(session=session)
+        assert len(tis_without_heartbeats) == 1
+        ti_from_query = tis_without_heartbeats[0]
+        ti_key = ti_from_query.key
+
+        # Detach all ORM objects to mirror scheduler behaviour after session closes
+        session.expunge_all()
+
+        # This should not raise DetachedInstanceError now that eager loads are in place
+        self.job_runner._purge_task_instances_without_heartbeats(
+            tis_without_heartbeats, session=session
+        )
+        assert ti_key not in executor.running
+
+        executor.callback_sink.send.assert_called_once()
+        callback_request = executor.callback_sink.send.call_args.args[0]
+        assert callback_request.context_from_server is not None
+        assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1
+        consumed_event = callback_request.context_from_server.dag_run.consumed_asset_events[0]
+        assert consumed_event.asset.uri == asset1.uri
+        assert len(consumed_event.source_aliases) == 1
+        assert consumed_event.source_aliases[0].name == "test_alias"
+
@mock.patch.object(settings, "USE_JOB_SCHEDULE", False)
def run_scheduler_until_dagrun_terminal(self):
"""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]