Copilot commented on code in PR #62079:
URL: https://github.com/apache/airflow/pull/62079#discussion_r3066490428
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -341,6 +342,89 @@ def _delete_variable(key: str) -> None:
SecretCache.invalidate_variable(key)
+class ExtraLinksAccessor(MutableMapping[str, str]):
+ """Wrapper over ``TaskFlowExtraLink`` instances that pushes URLs via
supervisor comms."""
+
+ def __init__(
+ self,
+ links: Iterable[TaskFlowExtraLink] | None = None,
+ *,
+ dag_id: str = "",
+ task_id: str = "",
+ run_id: str = "",
+ map_index: int | None = -1,
+ ) -> None:
+ self._links: dict[str, TaskFlowExtraLink] = {}
+ for link in links or ():
+ self._links[link.name] = link
+ self._dag_id = dag_id
+ self._task_id = task_id
+ self._run_id = run_id
+ self._map_index = map_index
+
+ @classmethod
+ def from_operator_links(
+ cls,
+ operator_links: Iterable[BaseOperatorLink] | None,
+ *,
+ dag_id: str = "",
+ task_id: str = "",
+ run_id: str = "",
+ map_index: int | None = -1,
+ ) -> ExtraLinksAccessor:
+ from airflow.sdk.bases.operatorlink import TaskFlowExtraLink
+
+ return cls(
Review Comment:
`ExtraLinksAccessor.from_operator_links()` does a local import of
`TaskFlowExtraLink`. This doesn’t appear necessary for circular-import
avoidance (the class is already referenced in this module), and moving it to
module scope improves clarity and follows the repository’s import conventions.
##########
task-sdk/src/airflow/sdk/bases/decorator.py:
##########
@@ -506,13 +528,22 @@ def __call__(self, *args: FParams.args, **kwargs:
FParams.kwargs) -> XComArg:
raise ValueError("Trigger rule not configurable for teardown
tasks.")
self.kwargs.update(trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
on_failure_fail_dagrun = self.kwargs.pop("on_failure_fail_dagrun",
self.on_failure_fail_dagrun)
+
+ extra_links = self.kwargs.pop("extra_links", [])
+
op = self.operator_class(
python_callable=self.function,
op_args=args,
op_kwargs=kwargs,
multiple_outputs=self.multiple_outputs,
**self.kwargs,
)
Review Comment:
`_TaskDecorator.__call__` mutates `self.kwargs` by `pop`-ing `extra_links`
(and `on_failure_fail_dagrun`). If the same decorated function is invoked more
than once (e.g. reused across multiple DAGs or called twice with overrides),
subsequent calls will silently lose the `extra_links` configuration. Use a
local copy of `self.kwargs` (as `_expand()` already does) and pop from that,
leaving `self.kwargs` unchanged for future calls.
##########
task-sdk/src/airflow/sdk/bases/decorator.py:
##########
@@ -252,6 +254,26 @@ def determine_kwargs(
return KeywordParameters.determine(func, args, kwargs).unpacking()
+def _build_extra_links(
+ extra_links: list[str] | None,
+ existing_links: Collection[BaseOperatorLink] | None,
+) -> tuple[BaseOperatorLink, ...]:
+ from airflow.sdk.bases.operatorlink import TaskFlowExtraLink
+
+ links_by_name: dict[str, BaseOperatorLink] = {link.name: link for link in
existing_links or ()}
Review Comment:
`_build_extra_links()` imports `TaskFlowExtraLink` inside the function. This
adds runtime overhead and violates the project guideline to keep imports at
module scope unless needed for circular-import avoidance/lazy loading. Since
`TaskFlowExtraLink` is lightweight and doesn’t appear to introduce a cycle
here, import it at the top of the module instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]