Copilot commented on code in PR #64025:
URL: https://github.com/apache/airflow/pull/64025#discussion_r3025366367


##########
airflow-core/src/airflow/ti_deps/dep_context.py:
##########
@@ -89,11 +89,15 @@ def ensure_finished_tis(self, dag_run: DagRun, session: Session) -> list[TaskIns
         """
         Ensure finished_tis is populated if it's currently None, which allows running tasks without dag_run.
 
-         :param dag_run: The DagRun for which to find finished tasks
+        :param dag_run: The DagRun for which to find finished tasks
          :return: A list of all the finished tasks of this DAG and logical_date

Review Comment:
   Docstring indentation is inconsistent: the `:return:` line is misaligned 
(extra leading space) compared to `:param dag_run:`. Please align the docstring 
fields so they render correctly in generated docs.
   ```suggestion
        :return: A list of all the finished tasks of this DAG and logical_date
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -800,6 +800,8 @@ def fetch_task_instances(
         task_ids: list[str] | None = None,
         state: Iterable[TaskInstanceState | None] | None = None,
         session: Session = NEW_SESSION,
+        *,
+        refresh_from_db: bool = False,
     ) -> list[TI]:
         """Return the task instances for this dag run."""

Review Comment:
   `DagRun.fetch_task_instances()` / `get_task_instances()` now add a 
`refresh_from_db` keyword-only parameter. Since these methods are widely used 
(and callable by plugins/providers), this is a public-ish API change. Please 
ensure the docstring explicitly documents what “refresh” means (e.g., uses 
`populate_existing` to overwrite identity-map values) and when callers should 
use it—otherwise it’s hard to reason about correctness/perf tradeoffs.
   ```suggestion
        """
        Return the task instances for the given DAG run.

        When ``refresh_from_db`` is ``False`` (the default), SQLAlchemy will return any
        existing ``TaskInstance`` objects already present in the session's identity map
        without overwriting their in-memory state. This is usually what you want for
        short-lived sessions created per request/operation, and it avoids the extra work
        of re-populating already-loaded ORM instances.

        When ``refresh_from_db`` is ``True``, the underlying query is executed with
        ``execution_options(populate_existing=True)``. This instructs SQLAlchemy to
        overwrite the attributes of any ``TaskInstance`` objects that are already present
        in the identity map with the current values from the database. Use this when:

        * You are reusing a long-lived session and may already have ``TaskInstance``
          rows for this DAG run loaded, and you need to ensure they reflect the latest
          committed database state (e.g. after other code has modified TIs in the same
          session or in a different session/process).
        * Correctness (reading the most up-to-date state) is more important than the
          additional overhead of refreshing ORM instances.

        Leave ``refresh_from_db`` at its default of ``False`` when you either know the
        session does not already contain the relevant ``TaskInstance`` objects, or you
        are willing to tolerate potentially stale in-memory values in exchange for
        slightly better performance.
        """
   ```
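
   The identity-map behavior this docstring describes can be demonstrated with a minimal, self-contained SQLAlchemy sketch. The `Task` model here is hypothetical (not Airflow's `TaskInstance`); a file-backed SQLite database is used so the two sessions get independent connections:

   ```python
   import os
   import tempfile

   from sqlalchemy import String, create_engine, select
   from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


   class Base(DeclarativeBase):
       pass


   class Task(Base):
       __tablename__ = "task"
       id: Mapped[int] = mapped_column(primary_key=True)
       state: Mapped[str] = mapped_column(String(20))


   # File-backed SQLite so the two sessions below use independent connections.
   fd, path = tempfile.mkstemp(suffix=".db")
   os.close(fd)
   engine = create_engine(f"sqlite:///{path}")
   Base.metadata.create_all(engine)

   with Session(engine) as setup:
       setup.add(Task(id=1, state="failed"))
       setup.commit()

   with Session(engine) as session:
       task = session.get(Task, 1)  # now held in this session's identity map
       with Session(engine) as other:  # a concurrent writer elsewhere
           other.get(Task, 1).state = "success"
           other.commit()

       # Default: the identity map wins; the in-memory value is returned unchanged.
       stale = session.scalars(select(Task)).one()
       assert stale.state == "failed"

       # populate_existing=True: attributes are overwritten from the database.
       fresh = session.scalars(
           select(Task).execution_options(populate_existing=True)
       ).one()
       assert fresh is task  # same object, refreshed in place
       assert fresh.state == "success"

   os.remove(path)
   ```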



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2254,6 +2254,68 @@ def test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make
     assert refreshed_ti.try_number == 1
 
 
+def test_task_instance_scheduling_decisions_refresh_finished_tis_before_setting_upstream_failed(
+    dag_maker, session
+):
+    with dag_maker(session=session, dag_id="refresh_finished_tis_before_upstream_failed", serialized=True):
+        fail_task = EmptyOperator(task_id="fail_task")
+        t0 = EmptyOperator(task_id="t0")
+        fail_task >> t0
+
+    dag = dag_maker.serialized_dag
+    dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=DagRunState.RUNNING)
+    dr.dag = dag
+
+    tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session, state=State.task_states)}
+    tis["fail_task"].state = TaskInstanceState.FAILED
+    session.commit()
+
+    with create_session() as other_session:
+        dag.set_task_instance_state(
+            task_id="fail_task",
+            run_id=dr.run_id,
+            state=TaskInstanceState.SUCCESS,
+            session=other_session,
+        )
+
+    decision = dr.task_instance_scheduling_decisions(session=session)
+    assert [ti.task_id for ti in decision.schedulable_tis] == ["t0"]
+    assert tis["fail_task"].state == TaskInstanceState.SUCCESS
+    assert tis["t0"].state is None
+
+
+def test_task_instance_scheduling_decisions_refresh_finished_tis_after_api_clear(dag_maker, session):
+    with dag_maker(session=session, dag_id="refresh_finished_tis_after_api_clear", serialized=True):
+        fail_task = EmptyOperator(task_id="fail_task")
+        t0 = EmptyOperator(task_id="t0")
+        t1 = EmptyOperator(task_id="t1")
+        t2 = EmptyOperator(task_id="t2")
+        fail_task >> t0 >> t1 >> t2
+
+    dag = dag_maker.serialized_dag
+    dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=DagRunState.RUNNING)
+    dr.dag = dag
+
+    tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session, state=State.task_states)}
+    tis["fail_task"].state = TaskInstanceState.FAILED
+    tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
+    session.commit()
+
+    with create_session() as other_session:
+        dag.set_task_instance_state(
+            task_id="fail_task",
+            run_id=dr.run_id,
+            state=TaskInstanceState.SUCCESS,
+            session=other_session,
+        )
+
+    decision = dr.task_instance_scheduling_decisions(session=session)
+    assert [ti.task_id for ti in decision.schedulable_tis] == ["t0"]
+    assert tis["fail_task"].state == TaskInstanceState.SUCCESS
+    assert tis["t0"].state is None
+    assert tis["t1"].state is None

Review Comment:
   These new regression tests mutate TI states by direct attribute assignment 
(e.g. `tis[...].state = ...`) rather than using `TaskInstance.set_state()`. The 
test intent is clearer and closer to real scheduler/API behavior if state 
changes go through `set_state()` or the same helper used in the API patch path, 
especially since `set_state()` also maintains timestamps/duration.



##########
airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py:
##########
@@ -214,3 +215,39 @@ def test_unmapped_parent_skip_mapped_downstream(session, dag_maker):
     assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) == 1
     assert not dep.is_met(tis["op2"], session)
     assert tis["op2"].state == State.SKIPPED
+
+
+def test_cleared_skipmixin_parent_does_not_skip_with_stale_finished_tis(session, dag_maker):
+    start_date = pendulum.datetime(2020, 1, 1)
+    with dag_maker(
+        "test_cleared_skipmixin_parent_does_not_skip_with_stale_finished_tis",
+        schedule=None,
+        start_date=start_date,
+        session=session,
+    ):
+        op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3")
+        op2 = EmptyOperator(task_id="op2")
+        op3 = EmptyOperator(task_id="op3")
+        op1 >> [op2, op3]
+
+    dagrun = dag_maker.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING)
+    tis = {ti.task_id: ti for ti in dagrun.task_instances}
+    run_task_instance(tis["op1"], op1)
+    session.commit()
+
+    with create_session() as other_session:
+        other_ti = other_session.scalar(
+            select(TaskInstance).where(
+                TaskInstance.dag_id == dagrun.dag_id,
+                TaskInstance.run_id == dagrun.run_id,
+                TaskInstance.task_id == "op1",
+            )
+        )
+        assert other_ti is not None
+        other_ti.state = State.NONE
+        other_session.flush()

Review Comment:
   This test simulates a cross-session update, but it assigns `State.NONE` to `op1.state` directly, without clearing the related fields (e.g. `end_date`/`duration`) that `TaskInstance.set_state(None, ...)` would normally manage. To better match real "clear" semantics (and avoid inconsistent rows that might affect other deps), consider using the same API used by clear operations (e.g., `ti.set_state(None, ...)` or the relevant clear helper) rather than assigning `state` directly.
   ```suggestion
           other_ti.set_state(None, session=other_session)
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1238,7 +1247,7 @@ def recalculate(self) -> _UnfinishedStates:
 
     @provide_session
     def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) -> TISchedulingDecision:
-        tis = self.get_task_instances(session=session, state=State.task_states)
+        tis = self.get_task_instances(session=session, state=State.task_states, refresh_from_db=True)
         self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))

Review Comment:
   `task_instance_scheduling_decisions()` now always loads TIs with 
`refresh_from_db=True`, which forces SQLAlchemy to overwrite any already-loaded 
instances in the session. This is on a scheduler hot path; please consider 
limiting refresh to the specific dependency-evaluation snapshot that needs it 
(e.g. only finished TIs via `DepContext.ensure_finished_tis`) or add a brief 
comment explaining why always-refreshing the full `State.task_states` set is 
required and acceptable for performance.
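
   The narrower alternative suggested above (refresh only the finished subset rather than every loaded TI) can be sketched in plain SQLAlchemy. This is a generic illustration with a hypothetical `TI` model and made-up state values, not Airflow code: `populate_existing` is applied only to a filtered query, so unfinished rows already in the session keep their in-memory state and skip the refresh cost.

   ```python
   import os
   import tempfile

   from sqlalchemy import String, create_engine, select
   from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


   class Base(DeclarativeBase):
       pass


   class TI(Base):
       __tablename__ = "ti"
       id: Mapped[int] = mapped_column(primary_key=True)
       state: Mapped[str] = mapped_column(String(20))


   # File-backed SQLite so the two sessions get independent connections.
   fd, path = tempfile.mkstemp(suffix=".db")
   os.close(fd)
   engine = create_engine(f"sqlite:///{path}")
   Base.metadata.create_all(engine)

   with Session(engine) as setup:
       setup.add_all([TI(id=1, state="success"), TI(id=2, state="running")])
       setup.commit()

   with Session(engine) as session:
       tis = session.scalars(select(TI).order_by(TI.id)).all()  # both in identity map
       with Session(engine) as other:  # states change in another session/process
           other.get(TI, 1).state = "failed"
           other.get(TI, 2).state = "queued"
           other.commit()

       # Refresh only rows currently in a "finished" state in the database;
       # everything else keeps its in-memory value and pays no refresh cost.
       finished = session.scalars(
           select(TI)
           .where(TI.state.in_(["success", "failed"]))
           .execution_options(populate_existing=True)
       ).all()

       assert tis[0].state == "failed"   # refreshed in place
       assert tis[1].state == "running"  # untouched (stale, but cheap)

   os.remove(path)
   ```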


