sunank200 commented on code in PR #37176: URL: https://github.com/apache/airflow/pull/37176#discussion_r1480227166
########## airflow/api_connexion/endpoints/dataset_endpoint.py: ########## @@ -124,3 +129,150 @@ def get_dataset_events( return dataset_event_collection_schema.dump( DatasetEventCollection(dataset_events=events, total_entries=total_entries) ) + + +def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = None) -> DatasetDagRunQueue: + """Get DatasetDagRunQueue.""" + dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri)) + dataset_id = dataset.id + + where_clause_conditions = [ + DatasetDagRunQueue.target_dag_id == dag_id, + DatasetDagRunQueue.dataset_id == dataset_id, + ] + if before is not None: + where_clause_conditions.append(DatasetDagRunQueue.created_at < format_datetime(before)) + + return session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions)) + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", + ) + return dataset_dag_run_queue_schema.dump(ddrq) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", Review Comment: ```suggestion detail=f"Pending event with dag_id: 
`{dag_id}` and dataset uri: `{uri}` was not found", ``` ########## airflow/api_connexion/endpoints/dataset_endpoint.py: ########## @@ -124,3 +129,150 @@ def get_dataset_events( return dataset_event_collection_schema.dump( DatasetEventCollection(dataset_events=events, total_entries=total_entries) ) + + +def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = None) -> DatasetDagRunQueue: + """Get DatasetDagRunQueue.""" + dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri)) + dataset_id = dataset.id + + where_clause_conditions = [ + DatasetDagRunQueue.target_dag_id == dag_id, + DatasetDagRunQueue.dataset_id == dataset_id, + ] + if before is not None: + where_clause_conditions.append(DatasetDagRunQueue.created_at < format_datetime(before)) + + return session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions)) + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", + ) + return dataset_dag_run_queue_schema.dump(ddrq) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", + ) + 
session.delete(ddrq) + return NoContent, HTTPStatus.NO_CONTENT + + +def _build_get_ddrqs_where_clause(dag_id: str, before: str | None = None): + where_clauses = [DatasetDagRunQueue.target_dag_id == dag_id] + if before is not None: + where_clauses.append(DatasetDagRunQueue.created_at < format_datetime(before)) + return where_clauses + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_events( + *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get pending dataset events for a DAG.""" + where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + total_entries = get_query_count(query, session=session) + ddrqs = session.scalars(query).all() + if not ddrqs: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` was not found", + ) + return dataset_dag_run_queue_collection_schema.dump( + DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, total_entries=total_entries) + ) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dag_dataset_pending_events( + *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete pending dataset events for a DAG.""" + where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + ddrqs = session.scalars(query).all() Review Comment: Here, all matching records are first loaded into application memory with `session.scalars(query).all()`, only to check whether any exist before they are deleted. This is unnecessary and can be inefficient. Consider issuing a single DELETE statement that removes the matching rows directly in the database, and use the `rowcount` of its result to decide whether to raise `NotFound`. This is more efficient, especially when many records match.
########## airflow/api_connexion/endpoints/dataset_endpoint.py: ########## @@ -124,3 +129,150 @@ def get_dataset_events( return dataset_event_collection_schema.dump( DatasetEventCollection(dataset_events=events, total_entries=total_entries) ) + + +def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = None) -> DatasetDagRunQueue: + """Get DatasetDagRunQueue.""" + dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri)) + dataset_id = dataset.id + + where_clause_conditions = [ + DatasetDagRunQueue.target_dag_id == dag_id, + DatasetDagRunQueue.dataset_id == dataset_id, + ] + if before is not None: + where_clause_conditions.append(DatasetDagRunQueue.created_at < format_datetime(before)) + + return session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions)) + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", + ) + return dataset_dag_run_queue_schema.dump(ddrq) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", + ) + session.delete(ddrq) + return NoContent, HTTPStatus.NO_CONTENT + 
+ +def _build_get_ddrqs_where_clause(dag_id: str, before: str | None = None): + where_clauses = [DatasetDagRunQueue.target_dag_id == dag_id] + if before is not None: + where_clauses.append(DatasetDagRunQueue.created_at < format_datetime(before)) + return where_clauses + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_events( + *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get pending dataset events for a DAG.""" + where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + total_entries = get_query_count(query, session=session) + ddrqs = session.scalars(query).all() + if not ddrqs: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` was not found", + ) + return dataset_dag_run_queue_collection_schema.dump( + DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, total_entries=total_entries) + ) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dag_dataset_pending_events( + *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete pending dataset events for a DAG.""" + where_clauses = _build_get_ddrqs_where_clause(dag_id=dag_id, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + ddrqs = session.scalars(query).all() + if not ddrqs: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` was not found", + ) + delete_stmt = delete(DatasetDagRunQueue).where(*where_clauses) + session.execute(delete_stmt) + return NoContent, HTTPStatus.NO_CONTENT + + +def _build_get_dataset_ddrqs_where_clause(uri: str, session: Session, before: str | None = None): + dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri)) + dataset_id = 
dataset.id + + where_clauses = [DatasetDagRunQueue.dataset_id == dataset_id] + if before is not None: + where_clauses.append(DatasetDagRunQueue.created_at < format_datetime(before)) + return where_clauses + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dataset_pending_events( + *, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get pending dataset events.""" + where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, session=session, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + total_entries = get_query_count(query, session=session) + ddrqs = session.scalars(query).all() + if not ddrqs: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dataset uri: `{uri}` was not found", + ) + return dataset_dag_run_queue_collection_schema.dump( + DatasetDagRunQueueCollection(dataset_dag_run_queues=ddrqs, total_entries=total_entries) + ) + + +@security.requires_access_dataset("DELETE") +@security.requires_access_dag("DELETE") +@provide_session +def delete_dataset_pending_events( + *, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Delete pending dataset events.""" + where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, session=session, before=before) + query = select(DatasetDagRunQueue).where(*where_clauses) + ddrqs = session.scalars(query).all() Review Comment: Here, all matching records are first loaded into application memory with `session.scalars(query).all()`, only to check whether any exist before they are deleted. This is unnecessary and can be inefficient. Consider issuing a single DELETE statement that removes the matching rows directly in the database, and use the `rowcount` of its result to decide whether to raise `NotFound`. This is more efficient, especially when many records match.
########## airflow/api_connexion/endpoints/dataset_endpoint.py: ########## @@ -124,3 +129,150 @@ def get_dataset_events( return dataset_event_collection_schema.dump( DatasetEventCollection(dataset_events=events, total_entries=total_entries) ) + + +def _get_ddrq(dag_id: str, uri: str, session: Session, before: str | None = None) -> DatasetDagRunQueue: + """Get DatasetDagRunQueue.""" + dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri)) + dataset_id = dataset.id + + where_clause_conditions = [ + DatasetDagRunQueue.target_dag_id == dag_id, + DatasetDagRunQueue.dataset_id == dataset_id, + ] + if before is not None: + where_clause_conditions.append(DatasetDagRunQueue.created_at < format_datetime(before)) + + return session.scalar(select(DatasetDagRunQueue).where(*where_clause_conditions)) + + +@security.requires_access_dataset("GET") +@security.requires_access_dag("GET") +@provide_session +def get_dag_dataset_pending_event( + *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION +) -> APIResponse: + """Get a pending dataset event for a DAG.""" + ddrq = _get_ddrq(dag_id=dag_id, uri=uri, session=session, before=before) + if ddrq is None: + raise NotFound( + "Pending event not found", + detail=f"Pending event with dag_id: `{dag_id}` and daatset uri: `{uri}` was not found", Review Comment: ```suggestion detail=f"Pending event with dag_id: `{dag_id}` and dataset uri: `{uri}` was not found", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org