ashb commented on code in PR #67592:
URL: https://github.com/apache/airflow/pull/67592#discussion_r3435464094
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -233,6 +237,21 @@ def ti_run(
extra=json.dumps({"host_name": ti_run_payload.hostname}) if
ti_run_payload.hostname else None,
)
)
+ # Emit task.queued_duration only on the genuine first QUEUED ->
RUNNING of a try,
+ # otherwise the timing would be misleading. Two cases are skipped:
+ # - a retry of a previous attempt, identified by an existing
end_date;
Review Comment:
I don't think this is correct.
Each retry is a separate TI row (the old one gets moved/copied into
TIHistory).
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1038,6 +1038,92 @@ def test_ti_run_creates_audit_log(self, client, session,
create_task_instance, t
assert logs[0].owner == ti.task.owner
assert logs[0].extra == '{"host_name": "random-hostname"}'
+ def test_ti_run_emits_queued_duration_metric(self, client, session,
create_task_instance, time_machine):
+ """task.queued_duration is emitted when a TI transitions QUEUED ->
RUNNING."""
+ queued_at = timezone.parse("2024-09-30T12:00:00Z")
+ run_at = queued_at.add(seconds=42)
+
+ ti = create_task_instance(
+ task_id="test_ti_run_emits_queued_duration_metric",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=queued_at,
+ dag_id=str(uuid4()),
+ )
+ ti.queued_dttm = queued_at
+ ti.queue = "default"
+ session.commit()
+
+ time_machine.move_to(run_at, tick=False)
+
+ with
mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats") as
mock_stats:
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/run",
+ json={
+ "state": "running",
+ "hostname": "random-hostname",
+ "unixname": "random-unixname",
+ "pid": 100,
+ "start_date": run_at.isoformat(),
+ },
+ )
+
+ assert response.status_code == 200
+ mock_stats.timing.assert_called_once_with(
+ "task.queued_duration",
+ run_at - queued_at,
+ tags={"task_id": ti.task_id, "dag_id": ti.dag_id, "queue":
"default"},
+ )
+
+ @pytest.mark.parametrize(
+ "skip_reason",
+ ["end_date_set", "deferral_resume", "queued_dttm_missing"],
+ )
+ def test_ti_run_skips_queued_duration_metric(
+ self, client, session, create_task_instance, time_machine, skip_reason
+ ):
+ """task.queued_duration is skipped on a retry (end_date set), on a
resume from
+ deferral (next_method set, end_date still None), and when queued_dttm
was not
+ recorded (rare race / test setups)."""
+ queued_at = timezone.parse("2024-09-30T12:00:00Z")
+ run_at = queued_at.add(seconds=42)
+ time_machine.move_to(run_at, tick=False)
+
+ ti = create_task_instance(
+ task_id=f"test_ti_run_skips_queued_duration_metric_{skip_reason}",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=queued_at,
+ dag_id=str(uuid4()),
+ )
+ if skip_reason == "end_date_set":
+ ti.queued_dttm = queued_at
+ ti.end_date = queued_at.add(seconds=10)
Review Comment:
This test doesn't add much value -- at this point we are essentially just
testing the test code. We are not asserting the invariant (that
`ti.end_date` is set when a task's second attempt is retried), which means this
behaviour could change in the future and this test wouldn't fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]