Copilot commented on code in PR #62703:
URL: https://github.com/apache/airflow/pull/62703#discussion_r2872289014
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2152,6 +2152,84 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+
+ class _FakeSelectResult:
+ def all(self):
+ return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+ def execute_with_mismatch(statement, *args, **kwargs):
+ if getattr(statement, "is_select", False):
+ return _FakeSelectResult()
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
Review Comment:
These new tests call `DagRun.schedule_tis()` without explicitly setting
`ti.task`, even though `schedule_tis()` assumes each TI has its `task`
attribute populated. This currently works only if the TI instance happens to
already have `task` set in the session identity map; please set
`ti.task = dr.dag.get_task("task_1")` (or similar) in the test to avoid
brittleness and better reflect the method contract.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2152,6 +2152,84 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+
+ class _FakeSelectResult:
+ def all(self):
+ return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+ def execute_with_mismatch(statement, *args, **kwargs):
+ if getattr(statement, "is_select", False):
+ return _FakeSelectResult()
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert any(
+ "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session:
Session):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
Review Comment:
This test calls `DagRun.schedule_tis()` without explicitly setting
`ti.task`. `schedule_tis()` relies on `ti.is_schedulable`, which assumes
`ti.task` is populated; please assign `ti.task` from the DAG (e.g.
`dr.dag.get_task("task_1")`) to avoid relying on session identity-map side
effects.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2152,6 +2152,84 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+
+ class _FakeSelectResult:
+ def all(self):
+ return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+ def execute_with_mismatch(statement, *args, **kwargs):
+ if getattr(statement, "is_select", False):
+ return _FakeSelectResult()
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert any(
+ "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session:
Session):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert all(
+ "schedule_tis: try_number mismatch after scheduling" not in
call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+ select_calls = 0
+
+ def execute_with_counter(statement, *args, **kwargs):
+ nonlocal select_calls
+ if getattr(statement, "is_select", False):
+ select_calls += 1
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_counter)
+
+ with mock.patch.object(dr.log, "isEnabledFor", return_value=False):
+ dr.schedule_tis((ti,), session=session)
+
Review Comment:
This test calls `DagRun.schedule_tis()` without ensuring `ti.task` is set.
Since `schedule_tis()` assumes each TI has a populated `task`, the test can
become flaky if the TI is reloaded or the session identity map changes;
explicitly set `ti.task` from `dr.dag` before invoking `schedule_tis()`.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1133,6 +1148,16 @@ def process_executor_events(
# Report execution - handle both task and callback events
for key, (state, _) in event_buffer.items():
if isinstance(key, TaskInstanceKey):
+ existing_try =
ti_primary_key_to_try_number_map.get(key.primary)
+ if existing_try is not None and existing_try != key.try_number:
+ cls.logger().warning(
+ "Multiple executor events for same TI with different
try_numbers! "
+ "primary_key=%s existing_try_number=%d
new_try_number=%d new_state=%s. ",
+ key.primary,
+ existing_try,
+ key.try_number,
+ state,
+ )
Review Comment:
The warning message for multiple try_numbers has a trailing space in the
format string (ends with `new_state=%s. `). This creates inconsistent log
output and makes log matching harder; please remove the trailing whitespace
(and consider dropping the extra period) from the message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]