Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-04-21 Thread via GitHub


rawwar commented on PR #48174:
URL: https://github.com/apache/airflow/pull/48174#issuecomment-2820201002

   @shubham-pyc, can you resolve the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-04-10 Thread via GitHub


pierrejeambrun commented on PR #48174:
URL: https://github.com/apache/airflow/pull/48174#issuecomment-2792093813

   I'll take a look in the next few days; I'm focusing on the AF3 release and 
bug fixing at the moment. This will make it into 3.1.





Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-04-07 Thread via GitHub


shubham-pyc commented on PR #48174:
URL: https://github.com/apache/airflow/pull/48174#issuecomment-2785336348

   @pierrejeambrun Related to this PR, I have already raised another PR for 
the dag_processor:
   https://github.com/apache/airflow/pull/48659
   
   Once that one is resolved, I will finish this one. Please take a look.





Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-27 Thread via GitHub


shubham-pyc commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2017811153


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))

Review Comment:
   @jedcunningham I read through the changes made to pull requests:
   
   https://github.com/apache/airflow/pull/48216
   https://github.com/apache/airflow/pull/48424
   
   Do you think this approach works for signaling the DAG processor to reparse 
all the files?
   
   






Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-24 Thread via GitHub


shubham-pyc commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2011021740


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))

Review Comment:
   @jedcunningham thanks for the response.
   
   I read through the code for https://github.com/apache/airflow/pull/48216. 
One possible change we can make to DagPriorityParsingRequest is to make the 
`relative_fileloc` nullable and add another flag called `parse_whole_bundle`.
   
   In the DAG processor logic, we can then add a condition: if 
`parse_whole_bundle` is `True`, we parse all the files in the bundle location. 
Otherwise, we parse just one file.
   
   This would also take care of the hashing logic for the 
DagPriorityParsingRequest ID column since, for a whole-bundle request, the 
relative file location would be empty. 
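A minimal sketch of the proposal above, assuming a simplified stand-in for `DagPriorityParsingRequest`. The dataclass `ParsingRequestSketch`, its `bundle_name` field, the `request_id` hashing, and the `files_to_parse` helper are all hypothetical illustrations, not Airflow's actual model or DAG processor code. It shows a nullable `relative_fileloc`, the `parse_whole_bundle` flag, and why an empty file location gives whole-bundle requests a stable, distinct ID.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ParsingRequestSketch:
    """Hypothetical stand-in for DagPriorityParsingRequest (not Airflow's model)."""

    bundle_name: str
    relative_fileloc: Optional[str] = None  # None when the whole bundle should be parsed
    parse_whole_bundle: bool = False

    @property
    def request_id(self) -> str:
        # Hash bundle name + relative file location. A whole-bundle request hashes
        # an empty location, so repeated whole-bundle requests share one ID.
        key = f"{self.bundle_name}:{self.relative_fileloc or ''}"
        return hashlib.sha256(key.encode()).hexdigest()


def files_to_parse(
    request: ParsingRequestSketch,
    list_bundle_files: Callable[[str], list[str]],
) -> list[str]:
    """Return the relative paths the DAG processor would parse for this request."""
    if request.parse_whole_bundle:
        # Whole-bundle request: parse every file in the bundle location.
        return list_bundle_files(request.bundle_name)
    if request.relative_fileloc is None:
        raise ValueError("relative_fileloc is required unless parse_whole_bundle is True")
    # Single-file request: parse just that one file.
    return [request.relative_fileloc]
```

For example, with a bundle listing of `{"dags-folder": ["a.py", "b.py"]}`, a whole-bundle request yields both files while a single-file request for `a.py` yields only `a.py`, and the two requests get different IDs.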
   
   







Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-24 Thread via GitHub


jedcunningham commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2010968505


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))

Review Comment:
   ```suggestion
               select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
   ```
   
   We probably want a way to signal that we want to reparse the whole bundle 
vs going file by file like this, especially since I think we should also force 
a bundle refresh once per bundle rather than once per file, as this approach 
would do.
   
   Relatedly, I just pushed up some WIP changes I had to refactor the API side 
of this: #48216
   I haven't done the DAG processor side quite yet, but will in the next day or 
so. It might make sense to hold off on further changes until that lands; then 
we can determine how to modify the DagPriorityParsingRequest model and the DAG 
processor to enable this.
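A minimal sketch of the "refresh once per bundle" idea, assuming pending requests arrive as `(bundle_name, relative_fileloc)` pairs where `None` means "reparse the whole bundle". The function `plan_reparse` and the `refresh_bundle` callback are hypothetical names, not part of Airflow's DAG processor. It groups requests per bundle, lets a whole-bundle request subsume file-level ones, and calls the refresh exactly once per distinct bundle.

```python
from collections import defaultdict
from typing import Callable, Iterable, Optional


def plan_reparse(
    requests: Iterable[tuple[str, Optional[str]]],
    refresh_bundle: Callable[[str], None],
) -> dict[str, Optional[set[str]]]:
    """Group parsing requests by bundle and refresh each bundle exactly once.

    Each request is (bundle_name, relative_fileloc); a relative_fileloc of None
    means "reparse the whole bundle". The returned mapping gives, per bundle,
    either None (whole bundle) or the set of individual files to parse.
    """
    plan: dict[str, Optional[set[str]]] = defaultdict(set)
    for bundle_name, relative_fileloc in requests:
        if relative_fileloc is None:
            plan[bundle_name] = None  # a whole-bundle request subsumes file requests
        elif plan[bundle_name] is not None:
            plan[bundle_name].add(relative_fileloc)
    for bundle_name in plan:
        refresh_bundle(bundle_name)  # one refresh per bundle, not per file
    return dict(plan)
```

With five requests spread over two bundles, the refresh callback runs twice rather than five times, which is the cost difference the comment is pointing at.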






Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-24 Thread via GitHub


shubham-pyc commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2010943846


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",

Review Comment:
   So basically the routes become `PUT /api/v2/parseDagFiles/` for parsing 
multiple files and `PUT /api/v2/parseDagFiles/{file_token}` for a single file.
   






Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-24 Thread via GitHub


pierrejeambrun commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2009982034


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],

Review Comment:
   We are missing the permissions. Please check other endpoints.



##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:

Review Comment:
   There are utilities to fetch query parameters; please check other endpoints. 
You will be able to do validation and provide default values too.
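Whichever Airflow parameter utility ends up being used, the validation it has to express is small. Below is a plain-Python sketch of that logic, mirroring the set-difference check in the diff and defaulting to all bundles when no names are given. `resolve_bundle_names` and `UnknownBundleError` are hypothetical names, not Airflow helpers; in the endpoint, the error would map to an HTTP 400.

```python
from typing import Iterable, Optional


class UnknownBundleError(ValueError):
    """Raised when a requested bundle name is not configured (would map to HTTP 400)."""


def resolve_bundle_names(
    requested: Optional[Iterable[str]],
    known: Iterable[str],
) -> set[str]:
    """Return the bundles to process: all known bundles by default, else the validated subset."""
    known_set = set(known)
    if not requested:
        # Default: no names (None or empty) means "all bundles".
        return known_set
    requested_set = set(requested)
    unknown = requested_set - known_set
    if unknown:
        # Sort for a deterministic, readable error message.
        raise UnknownBundleError(f"Invalid bundle name(s): {sorted(unknown)}")
    return requested_set
```

Sorting the unknown names keeps the error message deterministic, unlike interpolating the raw set as the diff does.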



##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
+        )
+        # Process each bundle
+        parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
+
+        session.add_all(parsing_requests)
+        return ReserializeResponse(
+            message="DAG bundles reserialized successfully", processed_bundles=list(bundles_to_process)
+        )
+    except HTTPException as e:
+        raise e
+
+    except Exception as e:
+        session.rollback()
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Failed to reserialize DAG bundles: {str(e)}",
+        )

Review Comment:
   This is done automatically by the SessionDependency, so it can be removed.
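The general commit-or-rollback pattern behind a session dependency like this can be sketched with `contextlib.contextmanager`. This is a generic illustration assuming a SQLAlchemy-like session; `FakeSession` and `session_scope` are hypothetical and are not Airflow's actual `SessionDep` implementation. FastAPI dependencies written with `yield` can wrap a request in the same kind of try/except/finally, which is why an explicit `session.rollback()` in the endpoint is redundant.

```python
from contextlib import contextmanager
from typing import Iterator


class FakeSession:
    """Minimal stand-in for a SQLAlchemy session that records what happened."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")

    def close(self) -> None:
        self.events.append("close")


@contextmanager
def session_scope(session: FakeSession) -> Iterator[FakeSession]:
    """Commit on success, roll back on any exception, and always close."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise  # re-raise so the framework can still turn the error into a response
    finally:
        session.close()
```

A body that finishes normally records `commit` then `close`; a body that raises records `rollback` then `close`, and the original exception still propagates to the caller.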



##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py:
##
@@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None:
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
+
+
+class ReserializeResponse(BaseModel):
+    """DAG Reserialize serializer for responses."""
+
+    message: st

Re: [PR] Added Fastapi endpoint for reserialize all/specific dags [airflow]

2025-03-23 Thread via GitHub


shubham-pyc commented on PR #48174:
URL: https://github.com/apache/airflow/pull/48174#issuecomment-2746628549

   @jedcunningham I have incorporated your suggestions from the [reparse a file 
feature](https://github.com/apache/airflow/blob/2500dcf20d2782d16da53ee857c0aab21bfdfbf2/airflow/api_fastapi/core_api/routes/public/dag_parsing.py#L47). 
Please take a look.

