Copilot commented on code in PR #64600:
URL: https://github.com/apache/airflow/pull/64600#discussion_r3025331391
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -225,28 +235,33 @@ def get_template_context(self) -> Context:
# Cache the context object, which ensures that all calls to get_template_context
# are operating on the same context object.
- self._cached_template_context: Context = self._cached_template_context or {
- # From the Task Execution interface
- "dag": self.task.dag,
- "inlets": self.task.inlets,
- "map_index_template": self.task.map_index_template,
- "outlets": self.task.outlets,
- "run_id": self.run_id,
- "task": self.task,
- "task_instance": self,
- "ti": self,
- "outlet_events": OutletEventAccessors(),
- "inlet_events": InletEventsAccessors(self.task.inlets),
- "macros": MacrosAccessor(),
- "params": validated_params,
- # TODO: Make this go through Public API longer term.
- # "test_mode": task_instance.test_mode,
- "var": {
- "json": VariableAccessor(deserialize_json=True),
- "value": VariableAccessor(deserialize_json=False),
- },
- "conn": ConnectionAccessor(),
- }
+ if self._cached_template_context is None:
+ self._cached_template_context = {
+ # From the Task Execution interface
+ "dag": self.task.dag,
+ "inlets": self.task.inlets,
+ "map_index_template": self.task.map_index_template,
+ "outlets": self.task.outlets,
+ "run_id": self.run_id,
+ "task": self.task,
+ "task_instance": self,
+ "ti": self,
+ "outlet_events": OutletEventAccessors(),
+ "inlet_events": InletEventsAccessors(self.task.inlets),
+ "macros": MacrosAccessor(),
+ "params": validated_params,
+ # TODO: Make this go through Public API longer term.
+ # "test_mode": task_instance.test_mode,
+ "var": {
+ "json": VariableAccessor(deserialize_json=True),
+ "value": VariableAccessor(deserialize_json=False),
+ },
+ "conn": ConnectionAccessor(),
+ }
+
+ # After initialization, _cached_template_context is definitely not None
+ context = self._cached_template_context
+
Review Comment:
`context = self._cached_template_context` is typed as `Context | None`, but
it is used immediately as a non-optional mapping (`context.update(...)` and
returned). Static type checkers (mypy) generally do not narrow instance
attributes after a `None` check, so this can introduce typing errors (and makes
a potential `None` dereference easier to miss if `_cached_template_context` is
mutated elsewhere).
Suggested fix: after initializing the cache, use an explicit cast to
`Context` (or assign to a local `Context` variable at initialization time) so
`context` is non-optional for the remainder of the method.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4741,3 +4741,70 @@ def test_dag_add_result(create_runtime_ti, mock_supervisor_comms):
dag_result=True,
)
)
+
+
+class TestRuntimeTaskInstanceDuration:
+ """Tests for the duration property on RuntimeTaskInstance."""
+
+ def test_duration_with_both_dates(self, create_runtime_ti):
+ """Test duration is computed correctly when both start_date and end_date are set."""
+ task = BaseOperator(task_id="test_duration")
+ ti = create_runtime_ti(task=task)
+ ti.start_date = timezone.parse("2024-12-01T01:00:00Z")
+ ti.end_date = timezone.parse("2024-12-01T01:05:30Z")
+ assert ti.duration == 330.0 # 5 minutes and 30 seconds
+
+ def test_duration_without_end_date(self, create_runtime_ti):
+ """Test duration is None when end_date is not set (task still running)."""
+ task = BaseOperator(task_id="test_duration_no_end")
+ ti = create_runtime_ti(task=task)
+ ti.start_date = timezone.parse("2024-12-01T01:00:00Z")
+ ti.end_date = None
+ assert ti.duration is None
+
+ def test_duration_zero(self, create_runtime_ti):
+ """Test duration is zero when start_date equals end_date."""
+ task = BaseOperator(task_id="test_duration_zero")
+ ti = create_runtime_ti(task=task)
+ same_time = timezone.parse("2024-12-01T01:00:00Z")
+ ti.start_date = same_time
+ ti.end_date = same_time
+ assert ti.duration == 0.0
+
+
+class TestRuntimeTaskInstanceQueuedDttm:
+ """Tests for the queued_dttm field on RuntimeTaskInstance."""
+
+ def test_queued_dttm_from_ti_context(self, mocked_parse, make_ti_context):
+ """Test that queued_dttm flows from TIRunContext to RuntimeTaskInstance via parse."""
+ queued_time = timezone.parse("2024-12-01T00:30:00Z")
+ ti_context = make_ti_context()
+ ti_context.queued_dttm = queued_time
+
+ task = BaseOperator(task_id="test_queued")
+ task.dag = DAG(dag_id="test_dag", start_date=timezone.datetime(2024, 12, 3))
+
+ startup = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="test_queued",
+ dag_id="test_dag",
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ ),
+ dag_rel_path="",
+ bundle_info=BundleInfo(name="anything", version="any"),
+ ti_context=ti_context,
+ start_date=timezone.utcnow(),
+ sentry_integration="",
+ )
+
+ ti = mocked_parse(startup, "test_dag", task)
+ assert ti.queued_dttm == queued_time
Review Comment:
This test uses the `mocked_parse` fixture, which constructs a
`RuntimeTaskInstance` directly (via `RuntimeTaskInstance.model_construct(...)`)
rather than calling the real `parse()` implementation. The fixture currently
does not propagate `what.ti_context.queued_dttm` into the constructed
`RuntimeTaskInstance`, so `ti.queued_dttm` will remain `None` and this
assertion will fail.
Suggested fix: either update the `mocked_parse` fixture to include
`queued_dttm=what.ti_context.queued_dttm` when constructing the
`RuntimeTaskInstance`, or adjust this test to exercise the real `parse()` path
so it verifies the intended data flow.
##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -384,6 +384,9 @@ class TIRunContext(BaseModel):
should_retry: bool = False
"""If the ti encounters an error, whether it should enter retry or failed state."""
+ queued_dttm: UtcDateTime | None = None
+ """When the task instance was queued."""
+
Review Comment:
Adding `queued_dttm` to `TIRunContext` changes the Execution API response
schema for `ti_run`. Since this API is Cadwyn-versioned, this should be
introduced behind a new version (date) and a corresponding `VersionChange` so
that clients requesting older versions (e.g. `2026-04-06`) do not receive an
unexpected `queued_dttm` field when it is set.
Suggested fix: add a new
`airflow-core/src/airflow/api_fastapi/execution_api/versions/vYYYY_MM_DD.py`
`VersionChange` that introduces `queued_dttm` on `schema(TIRunContext)` and (if
needed) removes it when converting responses to previous versions; then update
`execution_api/versions/__init__.py` to include the new version in the bundle
(and regenerate the Task SDK models against the new OpenAPI).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]