sunank200 commented on code in PR #37176:
URL: https://github.com/apache/airflow/pull/37176#discussion_r1480227166


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +129,150 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = 
None) -> DatasetDagRunQueue:
+    """Get DatasetDagRunQueue."""
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == 
uri))
+    dataset_id = dataset.id
+
+    where_clause_conditions = [
+        DatasetDagRunQueue.target_dag_id == dag_id,
+        DatasetDagRunQueue.dataset_id == dataset_id,
+    ]
+    if before is not None:
+        where_clause_conditions.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+
+    return 
session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions))
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",
+        )
+    return dataset_dag_run_queue_schema.dump(ddrq)
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",

Review Comment:
   ```suggestion
               detail=f"Pending event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
   ```



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +129,150 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = 
None) -> DatasetDagRunQueue:
+    """Get DatasetDagRunQueue."""
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == 
uri))
+    dataset_id = dataset.id
+
+    where_clause_conditions = [
+        DatasetDagRunQueue.target_dag_id == dag_id,
+        DatasetDagRunQueue.dataset_id == dataset_id,
+    ]
+    if before is not None:
+        where_clause_conditions.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+
+    return 
session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions))
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",
+        )
+    return dataset_dag_run_queue_schema.dump(ddrq)
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",
+        )
+    session.delete(ddrq)
+    return NoContent, HTTPStatus.NO_CONTENT
+
+
+def _build_get_ddrqs_where_clause(dag_id: str, before: str | None = None):
+    where_clauses = [DatasetDagRunQueue.target_dag_id == dag_id]
+    if before is not None:
+        where_clauses.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clauses
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get pending dataset events for a DAG."""
+    where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    total_entries = get_query_count(query, session=session)
+    ddrqs = session.scalars(query).all()
+    if not ddrqs:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` was not found",
+        )
+    return dataset_dag_run_queue_collection_schema.dump(
+        DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dag_dataset_pending_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete pending dataset events for a DAG."""
+    where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    ddrqs = session.scalars(query).all()

Review Comment:
   Here, all matching records are first fetched into application memory 
with `session.scalars(query).all()` before they are deleted. This could be 
inefficient and unnecessary. Consider using a single DELETE query to remove 
the matching records directly in the database, which would be more efficient, 
especially with a large number of records.



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +129,150 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = 
None) -> DatasetDagRunQueue:
+    """Get DatasetDagRunQueue."""
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == 
uri))
+    dataset_id = dataset.id
+
+    where_clause_conditions = [
+        DatasetDagRunQueue.target_dag_id == dag_id,
+        DatasetDagRunQueue.dataset_id == dataset_id,
+    ]
+    if before is not None:
+        where_clause_conditions.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+
+    return 
session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions))
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",
+        )
+    return dataset_dag_run_queue_schema.dump(ddrq)
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",
+        )
+    session.delete(ddrq)
+    return NoContent, HTTPStatus.NO_CONTENT
+
+
+def _build_get_ddrqs_where_clause(dag_id: str, before: str | None = None):
+    where_clauses = [DatasetDagRunQueue.target_dag_id == dag_id]
+    if before is not None:
+        where_clauses.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clauses
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get pending dataset events for a DAG."""
+    where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    total_entries = get_query_count(query, session=session)
+    ddrqs = session.scalars(query).all()
+    if not ddrqs:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` was not found",
+        )
+    return dataset_dag_run_queue_collection_schema.dump(
+        DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dag_dataset_pending_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete pending dataset events for a DAG."""
+    where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    ddrqs = session.scalars(query).all()
+    if not ddrqs:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` was not found",
+        )
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clauses)
+    session.execute(delete_stmt)
+    return NoContent, HTTPStatus.NO_CONTENT
+
+
+def _build_get_dataset_ddrqs_where_clause(uri: str, session: Session, before: 
str | None = None):
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == 
uri))
+    dataset_id = dataset.id
+
+    where_clauses = [DatasetDagRunQueue.dataset_id == dataset_id]
+    if before is not None:
+        where_clauses.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clauses
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dataset_pending_events(
+    *, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get pending dataset events."""
+    where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, 
session=session, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    total_entries = get_query_count(query, session=session)
+    ddrqs = session.scalars(query).all()
+    if not ddrqs:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dataset uri: `{uri}` was not found",
+        )
+    return dataset_dag_run_queue_collection_schema.dump(
+        DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")
+@security.requires_access_dag("DELETE")
+@provide_session
+def delete_dataset_pending_events(
+    *, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete pending dataset events."""
+    where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, 
session=session, before=before)
+    query = select(DatasetDagRunQueue).where(*where_clauses)
+    ddrqs = session.scalars(query).all()

Review Comment:
   Here, all matching records are first fetched into application memory 
with `session.scalars(query).all()` before they are deleted. This could be 
inefficient and unnecessary. Consider using a single DELETE query to remove 
the matching records directly in the database, which would be more efficient, 
especially with a large number of records.



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +129,150 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = 
None) -> DatasetDagRunQueue:
+    """Get DatasetDagRunQueue."""
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == 
uri))
+    dataset_id = dataset.id
+
+    where_clause_conditions = [
+        DatasetDagRunQueue.target_dag_id == dag_id,
+        DatasetDagRunQueue.dataset_id == dataset_id,
+    ]
+    if before is not None:
+        where_clause_conditions.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+
+    return 
session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions))
+
+
+@security.requires_access_dataset("GET")
+@security.requires_access_dag("GET")
+@provide_session
+def get_dag_dataset_pending_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a pending dataset event for a DAG."""
+    ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before)
+    if ddrq is None:
+        raise NotFound(
+            "Pending event not found",
+            detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: 
`{uri}` was not found",

Review Comment:
   ```suggestion
               detail=f"Pending event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to