This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a24e3caa277 add dag_id filters (#47730)
a24e3caa277 is described below

commit a24e3caa277a98d06c30e59a444ce377a68e954b
Author: Kalyan R <[email protected]>
AuthorDate: Fri Mar 14 13:33:42 2025 +0530

    add dag_id filters (#47730)
---
 .../api_fastapi/core_api/routes/public/assets.py   | 34 ++++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 83874ea8ceb..8a0d25c8472 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -54,6 +54,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
 from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import (
+    ReadableDagsFilterDep,
     requires_access_asset,
     requires_access_asset_alias,
     requires_access_dag,
@@ -81,6 +82,7 @@ def _generate_queued_event_where_clause(
     asset_id: int | None = None,
     dag_id: str | None = None,
     before: datetime | str | None = None,
+    permitted_dag_ids: set[str] | None = None,
 ) -> list:
     """Get AssetDagRunQueue where clause."""
     where_clause = []
@@ -90,6 +92,8 @@ def _generate_queued_event_where_clause(
         where_clause.append(AssetDagRunQueue.asset_id == asset_id)
     if before is not None:
         where_clause.append(AssetDagRunQueue.created_at < before)
+    if permitted_dag_ids is not None:
+        where_clause.append(AssetDagRunQueue.target_dag_id.in_(permitted_dag_ids))
     return where_clause
 
 
@@ -322,11 +326,14 @@ def materialize_asset(
 )
 def get_asset_queued_events(
     asset_id: int,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ) -> QueuedEventCollectionResponse:
     """Get queued asset events for an asset."""
-    where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before)
+    where_clause = _generate_queued_event_where_clause(
+        asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
+    )
     query = select(AssetDagRunQueue).where(*where_clause)
 
     dag_asset_queued_events_select, total_entries = paginated_select(statement=query)
@@ -381,11 +388,14 @@ def get_asset(
 )
 def get_dag_asset_queued_events(
     dag_id: str,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ) -> QueuedEventCollectionResponse:
     """Get queued asset events for a DAG."""
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
+    where_clause = _generate_queued_event_where_clause(
+        dag_id=dag_id, before=before, permitted_dag_ids=readable_dags_filter.value
+    )
     query = select(AssetDagRunQueue).where(*where_clause)
 
     dag_asset_queued_events_select, total_entries = paginated_select(statement=query)
@@ -412,11 +422,14 @@ def get_dag_asset_queued_events(
 def get_dag_asset_queued_event(
     dag_id: str,
     asset_id: int,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ) -> QueuedEventResponse:
     """Get a queued asset event for a DAG."""
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before)
+    where_clause = _generate_queued_event_where_clause(
+        dag_id=dag_id, asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
+    )
     query = select(AssetDagRunQueue).where(*where_clause)
     adrq = session.scalar(query)
     if not adrq:
@@ -440,11 +453,14 @@ def get_dag_asset_queued_event(
 )
 def delete_asset_queued_events(
     asset_id: int,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ):
     """Delete queued asset events for an asset."""
-    where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before)
+    where_clause = _generate_queued_event_where_clause(
+        asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
+    )
     delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
     result = session.execute(delete_stmt)
     if result.rowcount == 0:
@@ -471,10 +487,13 @@ def delete_asset_queued_events(
 )
 def delete_dag_asset_queued_events(
     dag_id: str,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ):
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
+    where_clause = _generate_queued_event_where_clause(
+        dag_id=dag_id, before=before, permitted_dag_ids=readable_dags_filter.value
+    )
 
     delete_statement = delete(AssetDagRunQueue).where(*where_clause)
     result = session.execute(delete_statement)
@@ -501,11 +520,14 @@ def delete_dag_asset_queued_events(
 def delete_dag_asset_queued_event(
     dag_id: str,
     asset_id: int,
+    readable_dags_filter: ReadableDagsFilterDep,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ):
     """Delete a queued asset event for a DAG."""
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id)
+    where_clause = _generate_queued_event_where_clause(
+        dag_id=dag_id, before=before, asset_id=asset_id, permitted_dag_ids=readable_dags_filter.value
+    )
     delete_statement = (
         delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
     )

Reply via email to