uranusjr commented on code in PR #37176:
URL: https://github.com/apache/airflow/pull/37176#discussion_r1494074992


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +131,167 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+def _generate_queued_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+    permitted_dag_ids: set[str] | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < format_datetime(before))
+    if permitted_dag_ids is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id.in_(permitted_dag_ids))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_queued_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` was not found",
+        )
+    queued_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queued_event_schema.dump(queued_event)
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("GET")
+@provide_session
+def delete_dag_dataset_queued_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = session.scalars(select(DatasetModel.id).where(DatasetModel.uri == uri)).one_or_none()
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)

Review Comment:
   I _think_ it should work, in the sense that I can’t think of a reason why it
wouldn’t. If it does not work out, though, let’s merge this first and work on
minor improvements later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to