This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a24e3caa277 add dag_id filters (#47730)
a24e3caa277 is described below
commit a24e3caa277a98d06c30e59a444ce377a68e954b
Author: Kalyan R <[email protected]>
AuthorDate: Fri Mar 14 13:33:42 2025 +0530
add dag_id filters (#47730)
---
.../api_fastapi/core_api/routes/public/assets.py | 34 ++++++++++++++++++----
1 file changed, 28 insertions(+), 6 deletions(-)
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 83874ea8ceb..8a0d25c8472 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -54,6 +54,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import (
+ ReadableDagsFilterDep,
requires_access_asset,
requires_access_asset_alias,
requires_access_dag,
@@ -81,6 +82,7 @@ def _generate_queued_event_where_clause(
asset_id: int | None = None,
dag_id: str | None = None,
before: datetime | str | None = None,
+ permitted_dag_ids: set[str] | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
@@ -90,6 +92,8 @@ def _generate_queued_event_where_clause(
where_clause.append(AssetDagRunQueue.asset_id == asset_id)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < before)
+ if permitted_dag_ids is not None:
+
where_clause.append(AssetDagRunQueue.target_dag_id.in_(permitted_dag_ids))
return where_clause
@@ -322,11 +326,14 @@ def materialize_asset(
)
def get_asset_queued_events(
asset_id: int,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for an asset."""
- where_clause = _generate_queued_event_where_clause(asset_id=asset_id,
before=before)
+ where_clause = _generate_queued_event_where_clause(
+ asset_id=asset_id, before=before,
permitted_dag_ids=readable_dags_filter.value
+ )
query = select(AssetDagRunQueue).where(*where_clause)
dag_asset_queued_events_select, total_entries =
paginated_select(statement=query)
@@ -381,11 +388,14 @@ def get_asset(
)
def get_dag_asset_queued_events(
dag_id: str,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for a DAG."""
- where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before)
+ where_clause = _generate_queued_event_where_clause(
+ dag_id=dag_id, before=before,
permitted_dag_ids=readable_dags_filter.value
+ )
query = select(AssetDagRunQueue).where(*where_clause)
dag_asset_queued_events_select, total_entries =
paginated_select(statement=query)
@@ -412,11 +422,14 @@ def get_dag_asset_queued_events(
def get_dag_asset_queued_event(
dag_id: str,
asset_id: int,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
) -> QueuedEventResponse:
"""Get a queued asset event for a DAG."""
- where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
asset_id=asset_id, before=before)
+ where_clause = _generate_queued_event_where_clause(
+ dag_id=dag_id, asset_id=asset_id, before=before,
permitted_dag_ids=readable_dags_filter.value
+ )
query = select(AssetDagRunQueue).where(*where_clause)
adrq = session.scalar(query)
if not adrq:
@@ -440,11 +453,14 @@ def get_dag_asset_queued_event(
)
def delete_asset_queued_events(
asset_id: int,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
):
"""Delete queued asset events for an asset."""
- where_clause = _generate_queued_event_where_clause(asset_id=asset_id,
before=before)
+ where_clause = _generate_queued_event_where_clause(
+ asset_id=asset_id, before=before,
permitted_dag_ids=readable_dags_filter.value
+ )
delete_stmt =
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
result = session.execute(delete_stmt)
if result.rowcount == 0:
@@ -471,10 +487,13 @@ def delete_asset_queued_events(
)
def delete_dag_asset_queued_events(
dag_id: str,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
):
- where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before)
+ where_clause = _generate_queued_event_where_clause(
+ dag_id=dag_id, before=before,
permitted_dag_ids=readable_dags_filter.value
+ )
delete_statement = delete(AssetDagRunQueue).where(*where_clause)
result = session.execute(delete_statement)
@@ -501,11 +520,14 @@ def delete_dag_asset_queued_events(
def delete_dag_asset_queued_event(
dag_id: str,
asset_id: int,
+ readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
before: OptionalDateTimeQuery = None,
):
"""Delete a queued asset event for a DAG."""
- where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before, asset_id=asset_id)
+ where_clause = _generate_queued_event_where_clause(
+ dag_id=dag_id, before=before, asset_id=asset_id,
permitted_dag_ids=readable_dags_filter.value
+ )
delete_statement = (
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
)