kaxil commented on code in PR #64522:
URL: https://github.com/apache/airflow/pull/64522#discussion_r3018435333
##########
task-sdk/src/airflow/sdk/definitions/decorators/__init__.py:
##########
@@ -25,6 +25,12 @@
from airflow.sdk.definitions.decorators.task_group import task_group
from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
+if TYPE_CHECKING:
+ from collections.abc import Callable
+ from typing import TypeVar
+
+ T = TypeVar("T", bound=Callable)
Review Comment:
This `T` TypeVar is defined but never used anywhere in this file. Looks like
leftover from an earlier iteration. The `Callable` import on line 29 is also
only needed for `T`'s bound, so both can be removed (along with the `TypeVar`
import).
##########
airflow-core/src/airflow/models/xcom.py:
##########
@@ -66,6 +67,7 @@ class XComModel(TaskInstanceDependencies):
task_id: Mapped[str] = mapped_column(String(ID_LEN, **COLLATION_ARGS),
nullable=False, primary_key=True)
map_index: Mapped[int] = mapped_column(Integer, primary_key=True,
nullable=False, server_default="-1")
key: Mapped[str] = mapped_column(String(512, **COLLATION_ARGS),
nullable=False, primary_key=True)
+ dag_result: Mapped[bool | None] = mapped_column(Boolean, nullable=True,
default=False)
Review Comment:
The `default=False` here is a Python-side ORM default, so existing rows and
anything inserted outside the ORM will get `NULL`. Adding
`server_default=sa.false()` (and the same in the migration) would give every
row a consistent value and avoid NULL-vs-False gotchas in future queries.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -529,12 +529,14 @@ def set(
key: str,
value,
map_index: int | None = None,
+ *,
+ dag_result: bool = False,
mapped_length: int | None = None,
) -> OKResponse:
"""Set a XCom value via the API server."""
# TODO: check if we need to use map_index as params in the uri
# ref:
https://github.com/apache/airflow/blob/v2-10-stable/airflow/api_connexion/openapi/v1.yaml#L1785C1-L1785C81
- params = {}
+ params: dict[str, Any] = {"dag_result": dag_result}
if map_index is not None and map_index >= 0:
params = {"map_index": map_index}
Review Comment:
Bug: `params = {"map_index": map_index}` overwrites the dict that was just
initialized with `dag_result` on line 539. For any mapped task with `map_index
>= 0`, the `dag_result` flag is silently dropped.
Should be `params["map_index"] = map_index` instead (same pattern as the
`mapped_length` line below it).
##########
task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -113,6 +113,7 @@ class AbstractOperator(Templater, DAGNode):
_on_failure_fail_dagrun = False
is_setup: bool = False
is_teardown: bool = False
+ returns_dag_result: bool = False
Review Comment:
Consider adding `returns_dag_result` to `HIDE_ATTRS_FROM_UI` below -- it is
an internal scheduling flag, not something operators would want to see in the
Graph view tooltip.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -715,6 +721,7 @@ def _xcom_push(ti: RuntimeTaskInstance, key: str, value:
Any, mapped_length: int
task_id=ti.task_id,
run_id=ti.run_id,
map_index=ti.map_index,
+ dag_result=ti.task.returns_dag_result,
Review Comment:
`ti.task.returns_dag_result` reads this attribute at execution time, but
`returns_dag_result` is not included in
`SerializedBaseOperator.get_serialized_fields()` (in
`airflow-core/src/airflow/serialization/definitions/baseoperator.py`). That
means it gets dropped during DAG serialization and always deserializes back to
`False`.
The test passes because it constructs the task in-process without going
through serialize/deserialize. You would need to add it to both the class
attributes and the `get_serialized_fields()` frozenset, similar to how
`is_setup`/`is_teardown` are handled there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]