Copilot commented on code in PR #64577:
URL: https://github.com/apache/airflow/pull/64577#discussion_r3025322072
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None:
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ dag_ids=self.dag_id,
+ )
+ xcom_query = xcom_query.where(XComModel.dag_result == true())
+ else:
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ key=XCOM_RETURN_KEY,
+ task_ids=self.result_task_ids,
+ dag_ids=self.dag_id,
+ )
xcom_results: ScalarResult[tuple[XComModel]] = self.session.scalars(
xcom_query.order_by(XComModel.task_id, XComModel.map_index)
Review Comment:
XComModel.get_many() already applies an ORDER BY (DagRun.logical_date desc,
XCom timestamp desc). Calling order_by() again here appends additional criteria
instead of replacing them, so the result rows may not be grouped contiguously
by task_id; itertools.groupby can then split a single task into multiple groups
and produce incorrect "results" output. Clear the existing ordering before
ordering by (task_id, map_index) (or avoid adding ordering in get_many for this
use case).
```suggestion
xcom_query.order_by(None).order_by(XComModel.task_id, XComModel.map_index)
```
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None:
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ dag_ids=self.dag_id,
+ )
+ xcom_query = xcom_query.where(XComModel.dag_result == true())
+ else:
Review Comment:
This change introduces/changes public behavior for the
/dagRuns/{run_id}/wait endpoint (defaulting to returning dag_result XComs when
the result parameter is omitted), but the existing tests currently assert that
omitting the result parameter returns only {"state": ...}. Add/update unit
tests to cover: (1) default dag_result results included when present, (2)
results omitted when no dag_result XCom exists, and (3) authorization behavior
for reading results.
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None:
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ dag_ids=self.dag_id,
+ )
Review Comment:
If the intended behavior is to return dag_result XComs even when the client
does not pass result_task_ids, make sure the call path enforces XCom read
authorization for that default case too. Currently, the route only performs the
XCom permission check when the "result" query parameter is provided, so
returning results by default could expose XCom data without the expected
authorization gate.
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -53,12 +53,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None:
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ dag_ids=self.dag_id,
+ )
Review Comment:
The rows selected by the new default branch (when result_task_ids is None) are
never actually returned to clients: _serialize_response only adds the "results"
field when self.result_task_ids is truthy, so the dag_result query path
introduced here is effectively dead code. Consider treating None as "use
dag_result defaults" (and include results only if the query returns any rows)
so the behavior matches the PR description.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]