This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9400cc40c7 Fix: set lastUpdate to None if the event is not in the 
queue (#54652)
f9400cc40c7 is described below

commit f9400cc40c7b802509a7ef1093ef8ea9470723d3
Author: vikrantkumar-max <[email protected]>
AuthorDate: Fri Sep 19 19:49:38 2025 +0530

    Fix: set lastUpdate to None if the event is not in the queue (#54652)
    
    closes: #53817
---
 .../api_fastapi/core_api/routes/ui/assets.py       | 12 +++-
 .../api_fastapi/core_api/routes/ui/test_assets.py  | 83 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index ce476d94ffc..a112bec9639 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, func, select
+from sqlalchemy import and_, case, func, select
 
 from airflow.api_fastapi.common.dagbag import DagBagDep
 from airflow.api_fastapi.common.db.common import SessionDep
@@ -53,6 +53,12 @@ def next_run_assets(
                 AssetModel.uri,
                 AssetModel.name,
                 func.max(AssetEvent.timestamp).label("lastUpdate"),
+                func.max(
+                    case(
+                        (AssetDagRunQueue.asset_id.is_not(None), 1),
+                        else_=0,
+                    )
+                ).label("queued"),
             )
             .join(DagScheduleAssetReference, 
DagScheduleAssetReference.asset_id == AssetModel.id)
             .join(
@@ -81,5 +87,9 @@ def next_run_assets(
         )
     ]
 
+    for event in events:
+        if not event.pop("queued", None):
+            event["lastUpdate"] = None
+
     data = {"asset_expression": dag_model.asset_expression, "events": events}
     return data
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
index a75cadbd6c0..91b70967891 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
@@ -18,8 +18,10 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pendulum
 import pytest
 
+from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
 
@@ -74,3 +76,84 @@ class TestNextRunAssets:
     def test_should_respond_403(self, unauthorized_test_client):
         response = unauthorized_test_client.get("/next_run_assets/upstream")
         assert response.status_code == 403
+
+    def test_should_set_last_update_only_for_queued_and_hide_flag(self, 
test_client, dag_maker, session):
+        with dag_maker(
+            dag_id="two_assets_equal",
+            schedule=[
+                Asset(uri="s3://bucket/A", name="A"),
+                Asset(uri="s3://bucket/B", name="B"),
+            ],
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        assets = {
+            a.uri: a
+            for a in 
session.query(AssetModel).filter(AssetModel.uri.in_(["s3://bucket/A", 
"s3://bucket/B"]))
+        }
+        # Queue and add an event only for A
+        session.add(AssetDagRunQueue(asset_id=assets["s3://bucket/A"].id, 
target_dag_id="two_assets_equal"))
+        session.add(
+            AssetEvent(asset_id=assets["s3://bucket/A"].id, 
timestamp=dr.logical_date or pendulum.now())
+        )
+        session.commit()
+
+        response = test_client.get("/next_run_assets/two_assets_equal")
+        assert response.status_code == 200
+        assert response.json() == {
+            "asset_expression": {
+                "all": [
+                    {
+                        "asset": {
+                            "uri": "s3://bucket/A",
+                            "name": "A",
+                            "group": "asset",
+                            "id": mock.ANY,
+                        }
+                    },
+                    {
+                        "asset": {
+                            "uri": "s3://bucket/B",
+                            "name": "B",
+                            "group": "asset",
+                            "id": mock.ANY,
+                        }
+                    },
+                ]
+            },
+            # events are ordered by uri
+            "events": [
+                {"id": mock.ANY, "uri": "s3://bucket/A", "name": "A", 
"lastUpdate": mock.ANY},
+                {"id": mock.ANY, "uri": "s3://bucket/B", "name": "B", 
"lastUpdate": None},
+            ],
+        }
+
+    def test_last_update_respects_latest_run_filter(self, test_client, 
dag_maker, session):
+        with dag_maker(
+            dag_id="filter_run",
+            schedule=[Asset(uri="s3://bucket/F", name="F")],
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        asset = session.query(AssetModel).filter(AssetModel.uri == 
"s3://bucket/F").one()
+        session.add(AssetDagRunQueue(asset_id=asset.id, 
target_dag_id="filter_run"))
+        # event before latest_run should be ignored
+        ts_base = dr.logical_date or pendulum.now()
+        session.add(AssetEvent(asset_id=asset.id, 
timestamp=ts_base.subtract(minutes=10)))
+        # event after latest_run counts
+        session.add(AssetEvent(asset_id=asset.id, 
timestamp=ts_base.add(minutes=10)))
+        session.commit()
+
+        resp = test_client.get("/next_run_assets/filter_run")
+        assert resp.status_code == 200
+        ev = resp.json()["events"][0]
+        assert ev["lastUpdate"] is not None
+        assert "queued" not in ev

Reply via email to