Copilot commented on code in PR #64025:
URL: https://github.com/apache/airflow/pull/64025#discussion_r3025366367
##########
airflow-core/src/airflow/ti_deps/dep_context.py:
##########
@@ -89,11 +89,15 @@ def ensure_finished_tis(self, dag_run: DagRun, session: Session) -> list[TaskIns
"""
Ensure finished_tis is populated if it's currently None, which allows running tasks without dag_run.
- :param dag_run: The DagRun for which to find finished tasks
+ :param dag_run: The DagRun for which to find finished tasks
:return: A list of all the finished tasks of this DAG and logical_date
Review Comment:
Docstring indentation is inconsistent: the `:return:` line is misaligned
(extra leading space) compared to `:param dag_run:`. Please align the docstring
fields so they render correctly in generated docs.
```suggestion
:return: A list of all the finished tasks of this DAG and logical_date
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -800,6 +800,8 @@ def fetch_task_instances(
task_ids: list[str] | None = None,
state: Iterable[TaskInstanceState | None] | None = None,
session: Session = NEW_SESSION,
+ *,
+ refresh_from_db: bool = False,
) -> list[TI]:
"""Return the task instances for this dag run."""
Review Comment:
`DagRun.fetch_task_instances()` / `get_task_instances()` now add a
`refresh_from_db` keyword-only parameter. Since these methods are widely used
(and callable by plugins/providers), this is a public-ish API change. Please
ensure the docstring explicitly documents what “refresh” means (e.g., uses
`populate_existing` to overwrite identity-map values) and when callers should
use it—otherwise it’s hard to reason about correctness/perf tradeoffs.
```suggestion
"""
Return the task instances for the given DAG run.
When ``refresh_from_db`` is ``False`` (the default), SQLAlchemy will
return any
existing ``TaskInstance`` objects already present in the session's
identity map
without overwriting their in-memory state. This is usually what you
want for
short‑lived sessions created per request/operation, and it avoids
the extra work
of re-populating already-loaded ORM instances.
When ``refresh_from_db`` is ``True``, the underlying query is
executed with
``execution_options(populate_existing=True)``. This instructs
SQLAlchemy to
overwrite the attributes of any ``TaskInstance`` objects that are
already present
in the identity map with the current values from the database. Use
this when:
* You are reusing a long‑lived session and may already have
``TaskInstance``
rows for this DAG run loaded, and you need to ensure they reflect
the latest
committed database state (e.g. after other code has modified TIs
in the same
session or in a different session/process).
* Correctness (reading the most up‑to‑date state) is more important
than the
additional overhead of refreshing ORM instances.
Leave ``refresh_from_db`` at its default of ``False`` when you
either know the
session does not already contain the relevant ``TaskInstance``
objects, or you
are willing to tolerate potentially stale in‑memory values in
exchange for
slightly better performance.
"""
```
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2254,6 +2254,68 @@ def test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make
assert refreshed_ti.try_number == 1
+def test_task_instance_scheduling_decisions_refresh_finished_tis_before_setting_upstream_failed(
+    dag_maker, session
+):
+    with dag_maker(session=session, dag_id="refresh_finished_tis_before_upstream_failed", serialized=True):
+        fail_task = EmptyOperator(task_id="fail_task")
+        t0 = EmptyOperator(task_id="t0")
+        fail_task >> t0
+
+    dag = dag_maker.serialized_dag
+    dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=DagRunState.RUNNING)
+    dr.dag = dag
+
+    tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session, state=State.task_states)}
+    tis["fail_task"].state = TaskInstanceState.FAILED
+    session.commit()
+
+    with create_session() as other_session:
+        dag.set_task_instance_state(
+            task_id="fail_task",
+            run_id=dr.run_id,
+            state=TaskInstanceState.SUCCESS,
+            session=other_session,
+        )
+
+    decision = dr.task_instance_scheduling_decisions(session=session)
+    assert [ti.task_id for ti in decision.schedulable_tis] == ["t0"]
+    assert tis["fail_task"].state == TaskInstanceState.SUCCESS
+    assert tis["t0"].state is None
+
+
+def test_task_instance_scheduling_decisions_refresh_finished_tis_after_api_clear(dag_maker, session):
+    with dag_maker(session=session, dag_id="refresh_finished_tis_after_api_clear", serialized=True):
+        fail_task = EmptyOperator(task_id="fail_task")
+        t0 = EmptyOperator(task_id="t0")
+        t1 = EmptyOperator(task_id="t1")
+        t2 = EmptyOperator(task_id="t2")
+        fail_task >> t0 >> t1 >> t2
+
+    dag = dag_maker.serialized_dag
+    dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=DagRunState.RUNNING)
+    dr.dag = dag
+
+    tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session, state=State.task_states)}
+    tis["fail_task"].state = TaskInstanceState.FAILED
+    tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
+    session.commit()
+
+    with create_session() as other_session:
+        dag.set_task_instance_state(
+            task_id="fail_task",
+            run_id=dr.run_id,
+            state=TaskInstanceState.SUCCESS,
+            session=other_session,
+        )
+
+    decision = dr.task_instance_scheduling_decisions(session=session)
+    assert [ti.task_id for ti in decision.schedulable_tis] == ["t0"]
+    assert tis["fail_task"].state == TaskInstanceState.SUCCESS
+    assert tis["t0"].state is None
+    assert tis["t1"].state is None
Review Comment:
These new regression tests mutate TI states by direct attribute assignment
(e.g. `tis[...].state = ...`) rather than using `TaskInstance.set_state()`. The
tests' intent would be clearer, and closer to real scheduler/API behavior, if
state changes went through `set_state()` or the same helper used in the API
patch path, especially since `set_state()` also maintains timestamps/duration.
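To illustrate the point (with a toy class, not Airflow's real `TaskInstance`; the fields and state names here are hypothetical), a `set_state()`-style helper keeps dependent bookkeeping fields consistent, while bare attribute assignment leaves them stale:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

TERMINAL_STATES = {"success", "failed"}  # illustrative subset


@dataclass
class ToyTI:
    state: Optional[str] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    duration: Optional[float] = None

    def set_state(self, state: Optional[str]) -> None:
        # The helper updates state *and* the fields that depend on it.
        self.state = state
        if state in TERMINAL_STATES:
            self.end_date = datetime.now(timezone.utc)
            if self.start_date is not None:
                self.duration = (self.end_date - self.start_date).total_seconds()
        elif state is None:  # "cleared": reset run bookkeeping too
            self.end_date = None
            self.duration = None


start = datetime.now(timezone.utc) - timedelta(seconds=5)

direct = ToyTI(start_date=start)
direct.state = "failed"  # bare assignment: bookkeeping fields stay unset
assert direct.end_date is None and direct.duration is None

helper = ToyTI(start_date=start)
helper.set_state("failed")  # helper fills in end_date and duration
assert helper.end_date is not None and helper.duration > 0
```

A test that assigns `state` directly produces rows that real scheduler code never writes, which is exactly the inconsistency the review comment is flagging.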
##########
airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py:
##########
@@ -214,3 +215,39 @@ def test_unmapped_parent_skip_mapped_downstream(session, dag_maker):
     assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) == 1
     assert not dep.is_met(tis["op2"], session)
     assert tis["op2"].state == State.SKIPPED
+
+
+def test_cleared_skipmixin_parent_does_not_skip_with_stale_finished_tis(session, dag_maker):
+    start_date = pendulum.datetime(2020, 1, 1)
+    with dag_maker(
+        "test_cleared_skipmixin_parent_does_not_skip_with_stale_finished_tis",
+        schedule=None,
+        start_date=start_date,
+        session=session,
+    ):
+        op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3")
+        op2 = EmptyOperator(task_id="op2")
+        op3 = EmptyOperator(task_id="op3")
+        op1 >> [op2, op3]
+
+    dagrun = dag_maker.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING)
+    tis = {ti.task_id: ti for ti in dagrun.task_instances}
+    run_task_instance(tis["op1"], op1)
+    session.commit()
+
+    with create_session() as other_session:
+        other_ti = other_session.scalar(
+            select(TaskInstance).where(
+                TaskInstance.dag_id == dagrun.dag_id,
+                TaskInstance.run_id == dagrun.run_id,
+                TaskInstance.task_id == "op1",
+            )
+        )
+        assert other_ti is not None
+        other_ti.state = State.NONE
+        other_session.flush()
Review Comment:
This test simulates a cross-session update, but it assigns `op1.state =
State.NONE` directly, without clearing related fields like
`end_date`/`duration` that `TaskInstance.set_state(None, ...)` would normally
manage. To better match real "clear" semantics (and avoid inconsistent rows
that might affect other deps), consider using the same API used by clear
operations (e.g., `ti.set_state(None, ...)` or the relevant clear helper)
rather than assigning `state` directly.
```suggestion
other_ti.set_state(None, session=other_session)
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1238,7 +1247,7 @@ def recalculate(self) -> _UnfinishedStates:
@provide_session
     def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) -> TISchedulingDecision:
-        tis = self.get_task_instances(session=session, state=State.task_states)
+        tis = self.get_task_instances(session=session, state=State.task_states, refresh_from_db=True)
         self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
Review Comment:
`task_instance_scheduling_decisions()` now always loads TIs with
`refresh_from_db=True`, which forces SQLAlchemy to overwrite any already-loaded
instances in the session. This is on a scheduler hot path; please consider
limiting refresh to the specific dependency-evaluation snapshot that needs it
(e.g. only finished TIs via `DepContext.ensure_finished_tis`) or add a brief
comment explaining why always-refreshing the full `State.task_states` set is
required and acceptable for performance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]