This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9400cc40c7 Fix: added None to lastUpdate date if event is not in
queue (#54652)
f9400cc40c7 is described below
commit f9400cc40c7b802509a7ef1093ef8ea9470723d3
Author: vikrantkumar-max <[email protected]>
AuthorDate: Fri Sep 19 19:49:38 2025 +0530
Fix: added None to lastUpdate date if event is not in queue (#54652)
closes: #53817
---
.../api_fastapi/core_api/routes/ui/assets.py | 12 +++-
.../api_fastapi/core_api/routes/ui/test_assets.py | 83 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index ce476d94ffc..a112bec9639 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -18,7 +18,7 @@
from __future__ import annotations
from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, func, select
+from sqlalchemy import and_, case, func, select
from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
@@ -53,6 +53,12 @@ def next_run_assets(
AssetModel.uri,
AssetModel.name,
func.max(AssetEvent.timestamp).label("lastUpdate"),
+ func.max(
+ case(
+ (AssetDagRunQueue.asset_id.is_not(None), 1),
+ else_=0,
+ )
+ ).label("queued"),
)
.join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == AssetModel.id)
.join(
@@ -81,5 +87,9 @@ def next_run_assets(
)
]
+ for event in events:
+ if not event.pop("queued", None):
+ event["lastUpdate"] = None
+
data = {"asset_expression": dag_model.asset_expression, "events": events}
return data
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
index a75cadbd6c0..91b70967891 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
@@ -18,8 +18,10 @@ from __future__ import annotations
from unittest import mock
+import pendulum
import pytest
+from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
@@ -74,3 +76,84 @@ class TestNextRunAssets:
def test_should_respond_403(self, unauthorized_test_client):
response = unauthorized_test_client.get("/next_run_assets/upstream")
assert response.status_code == 403
+
+ def test_should_set_last_update_only_for_queued_and_hide_flag(self, test_client, dag_maker, session):
+ with dag_maker(
+ dag_id="two_assets_equal",
+ schedule=[
+ Asset(uri="s3://bucket/A", name="A"),
+ Asset(uri="s3://bucket/B", name="B"),
+ ],
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ assets = {
+ a.uri: a
+ for a in session.query(AssetModel).filter(AssetModel.uri.in_(["s3://bucket/A", "s3://bucket/B"]))
+ }
+ # Queue and add an event only for A
+ session.add(AssetDagRunQueue(asset_id=assets["s3://bucket/A"].id, target_dag_id="two_assets_equal"))
+ session.add(
+ AssetEvent(asset_id=assets["s3://bucket/A"].id, timestamp=dr.logical_date or pendulum.now())
+ )
+ session.commit()
+
+ response = test_client.get("/next_run_assets/two_assets_equal")
+ assert response.status_code == 200
+ assert response.json() == {
+ "asset_expression": {
+ "all": [
+ {
+ "asset": {
+ "uri": "s3://bucket/A",
+ "name": "A",
+ "group": "asset",
+ "id": mock.ANY,
+ }
+ },
+ {
+ "asset": {
+ "uri": "s3://bucket/B",
+ "name": "B",
+ "group": "asset",
+ "id": mock.ANY,
+ }
+ },
+ ]
+ },
+ # events are ordered by uri
+ "events": [
+ {"id": mock.ANY, "uri": "s3://bucket/A", "name": "A", "lastUpdate": mock.ANY},
+ {"id": mock.ANY, "uri": "s3://bucket/B", "name": "B", "lastUpdate": None},
+ ],
+ }
+
+ def test_last_update_respects_latest_run_filter(self, test_client, dag_maker, session):
+ with dag_maker(
+ dag_id="filter_run",
+ schedule=[Asset(uri="s3://bucket/F", name="F")],
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ asset = session.query(AssetModel).filter(AssetModel.uri == "s3://bucket/F").one()
+ session.add(AssetDagRunQueue(asset_id=asset.id, target_dag_id="filter_run"))
+ # event before latest_run should be ignored
+ ts_base = dr.logical_date or pendulum.now()
+ session.add(AssetEvent(asset_id=asset.id, timestamp=ts_base.subtract(minutes=10)))
+ # event after latest_run counts
+ session.add(AssetEvent(asset_id=asset.id, timestamp=ts_base.add(minutes=10)))
+ session.commit()
+
+ resp = test_client.get("/next_run_assets/filter_run")
+ assert resp.status_code == 200
+ ev = resp.json()["events"][0]
+ assert ev["lastUpdate"] is not None
+ assert "queued" not in ev