amoghrajesh commented on code in PR #59874:
URL: https://github.com/apache/airflow/pull/59874#discussion_r2986360296


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -473,6 +475,8 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: 
FilteringBoundLogger, r
             self.client.variables.set(msg.key, msg.value, msg.description)
         elif isinstance(msg, DeleteXCom):
             self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, 
msg.key, msg.map_index)
+        elif isinstance(msg, BulkDeleteXCom):
+            self.client.xcoms.delete_all(msg.dag_id, msg.run_id, msg.task_id, 
msg.key, msg.map_index)
         elif isinstance(msg, GetXCom):

Review Comment:
   Is there a reason not to support this in dag processor too if we do it in 
triggerer?



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -437,3 +437,37 @@ def delete_xcom(
     session.execute(query)
     session.commit()
     return {"message": f"XCom with key: {key} successfully deleted."}
+
+
[email protected](
+    "/{dag_id}/{run_id}",
+    responses={status.HTTP_404_NOT_FOUND: {"description": "XComs not found"}},

Review Comment:
   We should be raising this somewhere right?



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -437,3 +437,37 @@ def delete_xcom(
     session.execute(query)
     session.commit()
     return {"message": f"XCom with key: {key} successfully deleted."}
+
+
[email protected](
+    "/{dag_id}/{run_id}",
+    responses={status.HTTP_404_NOT_FOUND: {"description": "XComs not found"}},

Review Comment:
   Or remove it from here if never raised



##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py:
##########
@@ -95,3 +95,13 @@ def ensure_start_date_in_dag_run(response: ResponseInfo) -> 
None:  # type: ignor
         """Ensure start_date is never None in direct DagRun responses for 
previous API versions."""
         if response.body.get("start_date") is None:
             response.body["start_date"] = response.body.get("run_after")
+
+
+class AddXcomBulkDeleteEndpoint(VersionChange):
+    """Add XCom bulk delete endpoint."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (
+        endpoint("xcoms/{dag_id}/{run_id}", ["DELETE"]).didnt_exist,

Review Comment:
   Looks ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to