This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ed237dff7c9 Fix task-level audit logs missing success/running events
in Airflow 3.1.x (#61932)
ed237dff7c9 is described below
commit ed237dff7c9c6ef6a25be34a243f5645ab0ccf67
Author: Kevin Yang <[email protected]>
AuthorDate: Tue Mar 10 17:25:17 2026 -0400
Fix task-level audit logs missing success/running events in Airflow 3.1.x
(#61932)
* create audit log records in ti_run and ti_update_state
* cleanup and merge test cases
* retrieve from dag and from dag run, and add extra with only host_name
* update ui end-to-end test to be more stable
get latest changes from main to remove sorting test
* clean up
* remove serial config for tests
* revert change made to the UI end-to-end test as sequential trigger/wait
dag runs cause timeouts
---
.../execution_api/routes/task_instances.py | 59 +++++++-
.../versions/head/test_task_instances.py | 153 ++++++++++++++++++++-
2 files changed, 206 insertions(+), 6 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 9273cc8b3d4..60dd868c2e3 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -30,7 +30,7 @@ import structlog
from cadwyn import VersionedAPIRouter
from fastapi import Body, HTTPException, Query, Security, status
from pydantic import JsonValue
-from sqlalchemy import func, or_, tuple_, update
+from sqlalchemy import and_, func, or_, tuple_, update
from sqlalchemy.engine import CursorResult
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from sqlalchemy.orm import joinedload
@@ -64,6 +64,7 @@ from airflow.exceptions import TaskNotFound
from airflow.models.asset import AssetActive
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DR
+from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance as TI,
_stop_remaining_tasks
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
@@ -136,10 +137,14 @@ def ti_run(
# This selects the raw JSON value, bypassing the deserialization
-- we want that to happen on the
# client
column("next_kwargs", JSON),
+ DR.logical_date,
+ DagModel.owners,
)
.select_from(TI)
+ .join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id))
+ .join(DagModel, TI.dag_id == DagModel.dag_id)
.where(TI.id == task_instance_id)
- .with_for_update()
+ .with_for_update(of=TI)
)
try:
ti = session.execute(old).one()
@@ -195,6 +200,19 @@ def ti_run(
)
else:
log.info("Task started", previous_state=previous_state,
hostname=ti_run_payload.hostname)
+ session.add(
+ Log(
+ event=TaskInstanceState.RUNNING.value,
+ task_id=ti.task_id,
+ dag_id=ti.dag_id,
+ run_id=ti.run_id,
+ map_index=ti.map_index,
+ try_number=ti.try_number,
+ logical_date=ti.logical_date,
+ owner=ti.owners,
+ extra=json.dumps({"host_name": ti_run_payload.hostname}) if
ti_run_payload.hostname else None,
+ )
+ )
# Ensure there is no end date set.
query = query.values(
end_date=None,
@@ -297,9 +315,23 @@ def ti_update_state(
log.debug("Updating task instance state", new_state=ti_patch_payload.state)
old = (
- select(TI.state, TI.try_number, TI.max_tries, TI.dag_id)
+ select(
+ TI.state,
+ TI.try_number,
+ TI.max_tries,
+ TI.dag_id,
+ TI.task_id,
+ TI.run_id,
+ TI.map_index,
+ TI.hostname,
+ DR.logical_date,
+ DagModel.owners,
+ )
+ .select_from(TI)
+ .join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id))
+ .join(DagModel, TI.dag_id == DagModel.dag_id)
.where(TI.id == task_instance_id)
- .with_for_update()
+ .with_for_update(of=TI)
)
try:
(
@@ -307,6 +339,12 @@ def ti_update_state(
try_number,
max_tries,
dag_id,
+ task_id,
+ run_id,
+ map_index,
+ hostname,
+ logical_date,
+ owners,
) = session.execute(old).one()
log.debug(
"Retrieved current task instance state",
@@ -373,6 +411,19 @@ def ti_update_state(
new_state=updated_state,
rows_affected=getattr(result, "rowcount", 0),
)
+ session.add(
+ Log(
+ event=updated_state.value,
+ task_id=task_id,
+ dag_id=dag_id,
+ run_id=run_id,
+ map_index=map_index,
+ try_number=try_number,
+ logical_date=logical_date,
+ owner=owners,
+ extra=json.dumps({"host_name": hostname}) if hostname else
None,
+ )
+ )
except SQLAlchemyError as e:
log.error("Error updating Task Instance state", error=str(e))
raise HTTPException(
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index d9ec3916187..f821ba4440a 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -34,6 +34,7 @@ from airflow.api_fastapi.execution_api.app import lifespan
from airflow.exceptions import AirflowSkipException
from airflow.models import RenderedTaskInstanceFields, TaskReschedule, Trigger
from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent,
AssetModel
+from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -44,6 +45,7 @@ from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_dags,
+ clear_db_logs,
clear_db_runs,
clear_db_serialized_dags,
clear_rendered_ti_fields,
@@ -118,11 +120,13 @@ def test_id_matches_sub_claim(client, session,
create_task_instance):
class TestTIRunState:
def setup_method(self):
+ clear_db_logs()
clear_db_runs()
clear_db_serialized_dags()
clear_db_dags()
def teardown_method(self):
+ clear_db_logs()
clear_db_runs()
clear_db_serialized_dags()
clear_db_dags()
@@ -784,14 +788,57 @@ class TestTIRunState:
assert dag_run["run_id"] == "test"
assert dag_run["state"] == "running"
+ def test_ti_run_creates_audit_log(self, client, session,
create_task_instance, time_machine):
+ """Test that transitioning to RUNNING creates an audit log record."""
+ instant_str = "2024-09-30T12:00:00Z"
+ instant = timezone.parse(instant_str)
+ time_machine.move_to(instant, tick=False)
+
+ ti = create_task_instance(
+ task_id="test_ti_run_creates_audit_log",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=instant,
+ dag_id=str(uuid4()),
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/run",
+ json={
+ "state": "running",
+ "hostname": "random-hostname",
+ "unixname": "random-unixname",
+ "pid": 100,
+ "start_date": instant_str,
+ },
+ )
+
+ assert response.status_code == 200
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == TaskInstanceState.RUNNING.value
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+ assert logs[0].run_id == ti.run_id
+ assert logs[0].map_index == ti.map_index
+ assert logs[0].try_number == ti.try_number
+ assert logs[0].logical_date == instant
+ assert logs[0].owner == ti.task.owner
+ assert logs[0].extra == '{"host_name": "random-hostname"}'
+
class TestTIUpdateState:
def setup_method(self):
clear_db_assets()
+ clear_db_logs()
clear_db_runs()
def teardown_method(self):
clear_db_assets()
+ clear_db_logs()
clear_db_runs()
@pytest.mark.parametrize(
@@ -829,6 +876,82 @@ class TestTIUpdateState:
assert ti.state == expected_state
assert ti.end_date == end_date
+ @pytest.mark.parametrize(
+ ("payload", "expected_event"),
+ [
+ pytest.param(
+ {"state": State.SUCCESS, "end_date":
DEFAULT_END_DATE.isoformat()},
+ State.SUCCESS,
+ id="success",
+ ),
+ pytest.param(
+ {"state": State.FAILED, "end_date":
DEFAULT_END_DATE.isoformat()},
+ State.FAILED,
+ id="failed",
+ ),
+ pytest.param(
+ {"state": State.SKIPPED, "end_date":
DEFAULT_END_DATE.isoformat()},
+ State.SKIPPED,
+ id="skipped",
+ ),
+ pytest.param(
+ {"state": State.UP_FOR_RETRY, "end_date":
DEFAULT_END_DATE.isoformat()},
+ TaskInstanceState.UP_FOR_RETRY.value,
+ id="up_for_retry",
+ ),
+ pytest.param(
+ {
+ "state": "deferred",
+ "trigger_kwargs": {"key": "value", "moment":
"2026-02-18T00:00:00Z"},
+ "trigger_timeout": "P1D",
+ "classpath": "my-classpath",
+ "next_method": "execute_callback",
+ },
+ TaskInstanceState.DEFERRED.value,
+ id="deferred",
+ ),
+ pytest.param(
+ {
+ "state": "up_for_reschedule",
+ "reschedule_date": "2026-02-18T11:03:00+00:00",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ },
+ TaskInstanceState.UP_FOR_RESCHEDULE.value,
+ id="up_for_reschedule",
+ ),
+ ],
+ )
+ def test_ti_update_state_creates_audit_log(
+ self, client, session, create_task_instance, payload, expected_event
+ ):
+ """Test that state transition creates an audit log record."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_creates_audit_log",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ hostname="random-hostname",
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json=payload,
+ )
+
+ assert response.status_code == 204
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == expected_event
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+ assert logs[0].run_id == ti.run_id
+ assert logs[0].map_index == ti.map_index
+ assert logs[0].try_number == ti.try_number
+ assert logs[0].logical_date == ti.dag_run.logical_date
+ assert logs[0].owner == ti.task.owner
+ assert logs[0].extra == '{"host_name": "random-hostname"}'
+
@pytest.mark.parametrize(
("state", "end_date", "expected_state", "rendered_map_index"),
[
@@ -1054,8 +1177,34 @@ class TestTIUpdateState:
mock.patch(
"airflow.api_fastapi.common.db.common.Session.execute",
side_effect=[
- mock.Mock(one=lambda: ("running", 1, 0, "dag")), # First
call returns "queued"
- mock.Mock(one=lambda: ("running", 1, 0, "dag")), # Second
call returns "queued"
+ mock.Mock(
+ one=lambda: (
+ "running",
+ 1,
+ 0,
+ "dag",
+ "task",
+ "run",
+ -1,
+ "localhost",
+ timezone.utcnow(),
+ "test_owner",
+ )
+ ), # First call returns "queued"
+ mock.Mock(
+ one=lambda: (
+ "running",
+ 1,
+ 0,
+ "dag",
+ "task",
+ "run",
+ -1,
+ "localhost",
+ timezone.utcnow(),
+ "test_owner",
+ )
+ ), # Second call returns "queued"
SQLAlchemyError("Database error"), # Last call raises an
error
],
),