Copilot commented on code in PR #64577:
URL: https://github.com/apache/airflow/pull/64577#discussion_r3025322072


##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
             return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
 
     def _serialize_xcoms(self) -> dict[str, Any]:
-        xcom_query = XComModel.get_many(
-            run_id=self.run_id,
-            key=XCOM_RETURN_KEY,
-            task_ids=self.result_task_ids,
-            dag_ids=self.dag_id,
-        )
+        if self.result_task_ids is None:
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                dag_ids=self.dag_id,
+            )
+            xcom_query = xcom_query.where(XComModel.dag_result == true())
+        else:
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                key=XCOM_RETURN_KEY,
+                task_ids=self.result_task_ids,
+                dag_ids=self.dag_id,
+            )
         xcom_results: ScalarResult[tuple[XComModel]] = self.session.scalars(
             xcom_query.order_by(XComModel.task_id, XComModel.map_index)

Review Comment:
   XComModel.get_many() already applies an ORDER BY (DagRun.logical_date desc, 
XCom timestamp desc). Calling order_by() again here appends additional criteria 
instead of replacing them, so the result rows may not be grouped contiguously 
by task_id; itertools.groupby can then split a single task into multiple groups 
and produce incorrect "results" output. Clear the existing ordering before 
ordering by (task_id, map_index) (or avoid adding ordering in get_many for this 
use case).
   ```suggestion
               xcom_query.order_by(None).order_by(XComModel.task_id, XComModel.map_index)
   ```
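
   The grouping hazard described above can be reproduced without a database. 
The following is a minimal sketch, assuming hypothetical (task_id, map_index, 
value) tuples in place of XComModel rows; it only illustrates that 
itertools.groupby groups consecutive runs and is not the code in dag_run.py.

```python
from itertools import groupby

# Hypothetical rows as (task_id, map_index, value), in the order a
# timestamp-first sort might return them: task "a" is not contiguous.
rows_by_timestamp = [("a", 0, 1), ("b", 0, 2), ("a", 1, 3)]


def group_by_task(rows):
    """Group rows by task_id; groupby only merges *consecutive* equal keys."""
    return [
        (task_id, [value for _, _, value in group])
        for task_id, group in groupby(rows, key=lambda row: row[0])
    ]


# Without a task_id-first ordering, task "a" is split into two groups.
split_groups = group_by_task(rows_by_timestamp)

# Sorting by (task_id, map_index) first makes each task one contiguous run.
sorted_groups = group_by_task(
    sorted(rows_by_timestamp, key=lambda row: (row[0], row[1]))
)
```

   If the split groups are later collected into a dict keyed by task_id, the 
second "a" group would overwrite the first, which is one way the "results" 
output can end up incorrect.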



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
             return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
 
     def _serialize_xcoms(self) -> dict[str, Any]:
-        xcom_query = XComModel.get_many(
-            run_id=self.run_id,
-            key=XCOM_RETURN_KEY,
-            task_ids=self.result_task_ids,
-            dag_ids=self.dag_id,
-        )
+        if self.result_task_ids is None:
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                dag_ids=self.dag_id,
+            )
+            xcom_query = xcom_query.where(XComModel.dag_result == true())
+        else:

Review Comment:
   This change alters the public behavior of the /dagRuns/{run_id}/wait 
endpoint: when the result parameter is omitted, it now defaults to returning 
dag_result XComs. The existing tests, however, still assert that omitting the 
result parameter returns only {"state": ...}. Add or update unit tests to 
cover: (1) default dag_result results are included when present, (2) results 
are omitted when no dag_result XCom exists, and (3) authorization behavior for 
reading results.



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
             return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
 
     def _serialize_xcoms(self) -> dict[str, Any]:
-        xcom_query = XComModel.get_many(
-            run_id=self.run_id,
-            key=XCOM_RETURN_KEY,
-            task_ids=self.result_task_ids,
-            dag_ids=self.dag_id,
-        )
+        if self.result_task_ids is None:
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                dag_ids=self.dag_id,
+            )

Review Comment:
   If the intended behavior is to return dag_result XComs even when the client 
does not pass result_task_ids, make sure the call path enforces XCom read 
authorization for that default case too. Currently, the route only performs the 
XCom permission check when the "result" query parameter is provided, so 
returning results by default could expose XCom data without the expected 
authorization gate.
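
   One way to close this gap is to key the permission check on whether results 
could be returned rather than on whether the "result" parameter was supplied. 
The sketch below is an assumption-laden illustration: Forbidden, 
authorize_results, and can_read_xcom are hypothetical names, not Airflow APIs, 
and the real check would live in the route's existing authorization 
dependencies.

```python
class Forbidden(Exception):
    """Hypothetical stand-in for an HTTP 403 response."""


def authorize_results(result_task_ids, can_read_xcom):
    """Return True if results may be included; raise Forbidden if denied.

    Assumption: None means "use the dag_result default", a non-empty list
    means explicit task ids, and an empty list means no results requested.
    """
    may_return_results = result_task_ids is None or bool(result_task_ids)
    if may_return_results and not can_read_xcom:
        raise Forbidden("XCom read permission is required to return results")
    return may_return_results
```

   An alternative design is to omit results silently in the default case when 
the caller lacks XCom read permission, which avoids rejecting existing clients 
that never asked for results; which of the two is appropriate is a decision for 
the PR author.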



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
             return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
 
     def _serialize_xcoms(self) -> dict[str, Any]:
-        xcom_query = XComModel.get_many(
-            run_id=self.run_id,
-            key=XCOM_RETURN_KEY,
-            task_ids=self.result_task_ids,
-            dag_ids=self.dag_id,
-        )
+        if self.result_task_ids is None:
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                dag_ids=self.dag_id,
+            )

Review Comment:
   The output of the new default branch (when result_task_ids is None) never 
reaches clients: _serialize_response only adds the "results" field when 
self.result_task_ids is truthy, so the dag_result query path introduced here is 
effectively dead code. Consider treating None as "use dag_result defaults" (and 
including results only if the query returns any rows) so that the behavior 
matches the PR description.
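
   A minimal sketch of that suggestion, assuming a hypothetical free function 
serialize_response and a fetch_results callable standing in for 
_serialize_xcoms; the real method signatures in dag_run.py may differ.

```python
def serialize_response(state, result_task_ids, fetch_results):
    """Build the response dict, treating None as "use dag_result defaults".

    fetch_results is a hypothetical callable that takes the task ids (or
    None) and returns a dict of results, possibly empty.
    """
    response = {"state": state}
    # None selects the default dag_result path; a non-empty list selects
    # explicit task ids; an empty list requests no results at all.
    if result_task_ids is None or result_task_ids:
        results = fetch_results(result_task_ids)
        # Only include the field when the query returned rows.
        if results:
            response["results"] = results
    return response
```

   With this shape, a run that has no dag_result XComs still yields only 
{"state": ...}, so clients that omit the parameter see a change only when 
default results actually exist.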



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
