jedcunningham commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2010968505


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - 
all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))

Review Comment:
   ```suggestion
               
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
   ```
   
   We probably want a way to signal that we want to reparse the whole bundle vs 
going file by file like this. Especially since I think we should also force 
bundle refresh once per bundle vs per file like this would do.
   
   Relatedly, I'd just pushed up some WIP changes I had to refactor the api 
side of this: #48216
   I haven't done the dag processor side quite yet, but will in the next day or 
so. It might make sense to hold off on further changes until that lands, then 
we can determine how we modify the DagPriorityParsingRequest model and the dag 
processor to enable this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to