uranusjr commented on code in PR #37176:
URL: https://github.com/apache/airflow/pull/37176#discussion_r1494039476


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +131,167 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queued_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+    permitted_dag_ids: set[str] | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    if permitted_dag_ids is not None:
+        
where_clause.append(DatasetDagRunQueue.target_dag_id.in_(permitted_dag_ids))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_queued_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queued_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": 
uri}
+    return queued_event_schema.dump(queued_event)
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("GET")
+@provide_session
+def delete_dag_dataset_queued_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)

Review Comment:
   I wonder if we can do this in one query (using a `JOIN` between `DatasetModel` 
and `DatasetDagRunQueue`) and use the return value of the `delete` to decide the response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to