amoghrajesh commented on code in PR #46613:
URL: https://github.com/apache/airflow/pull/46613#discussion_r1949376999
##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -136,6 +140,81 @@ class AbstractOperator(Templater, DAGNode):
)
)
+ def unmap(self, resolve) -> BaseOperator:
+ """
+ Get the "normal" operator from current abstract operator.
+
+ MappedOperator uses this to unmap itself based on the map index. A non-
+ mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+ :meta private:
+ """
+ raise NotImplementedError()
+
+ @cached_property
+ def global_operator_extra_link_dict(self) -> dict[str, Any]:
+ """Returns dictionary of all global extra links."""
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_extra_operators_links_plugins()
+ if plugins_manager.global_operator_extra_links is None:
+ raise AirflowException("Can't load operators")
+ return {link.name: link for link in
plugins_manager.global_operator_extra_links}
+
+ @cached_property
+ def extra_links(self) -> list[str]:
+ return
sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+
+ @cached_property
+ def operator_extra_link_dict(self) -> dict[str, Any]:
+ """Returns dictionary of all extra links for the operator."""
+ op_extra_links_from_plugin: dict[str, Any] = {}
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_extra_operators_links_plugins()
+ if plugins_manager.operator_extra_links is None:
+ raise AirflowException("Can't load operators")
+ for ope in plugins_manager.operator_extra_links:
+ if ope.operators and self.operator_class in ope.operators:
+ op_extra_links_from_plugin.update({ope.name: ope})
+
+ operator_extra_links_all = {link.name: link for link in
self.operator_extra_links}
+ # Extra links defined in Plugins overrides operator links defined in
operator
+ operator_extra_links_all.update(op_extra_links_from_plugin)
+
+ return operator_extra_links_all
+
+ def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
+ """
+ For an operator, gets the URLs that the ``extra_links`` entry points
to.
+
+ :meta private:
+
+ :raise ValueError: The error message of a ValueError will be passed on
through to
+ the fronted to show up as a tooltip on the disabled link.
+ :param ti: The TaskInstance for the URL being searched for.
+ :param link_name: The name of the link we're looking for the URL for.
Should be
+ one of the options specified in ``extra_links``.
+ """
+ link = self.operator_extra_link_dict.get(link_name)
+ if not link:
+ link = self.global_operator_extra_link_dict.get(link_name)
+ if not link:
+ return None
+
+ # TODO: replace with isinstance checks when operator links ported to
sdk
+ try:
+ el = link.get_link(ti=ti)
+ # Temporary workaround till
https://github.com/apache/airflow/issues/46513 is handled.
+ return el.strip('"')
Review Comment:
The return of the actual operator link class is pushed to xcom during task
runtime and when its pushed to XCOM, it gets those extra string quotes. For
example, this link:
```
class FooBarLink(BaseOperatorLink):
name = "foo-bar"
def get_link(self, operator, *, ti_key):
return f"http://www.example.com"
```
Pushes xcom which looks like in the table:
```
"\"\\\"http://www.example.com\\\"\""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]