ashb commented on code in PR #46613:
URL: https://github.com/apache/airflow/pull/46613#discussion_r1948907944


##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, 
encoded_op_links: list) -> dict[str,
             raise AirflowException("Can't load plugins")
         op_predefined_extra_links = {}
 
-        for _operator_links_source in encoded_op_links:
-            # Get the key, value pair as Tuple where key is OperatorLink 
ClassName
-            # and value is the dictionary containing the arguments passed to 
the OperatorLink
+        for item in encoded_op_links.items():
+            # Get the name and xcom_key of the encoded operator and use it to 
create a GenericOperatorLink object
+            # during deserialization.
             #
-            # Example of a single iteration:
-            #
-            #   _operator_links_source =
-            #   {
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
 {
-            #           'index': 0
-            #       }
-            #   },
-            #
-            #   list(_operator_links_source.items()) =
-            #   [
-            #       (
-            #           
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #           {'index': 0}
-            #       )
-            #   ]
-            #
-            #   list(_operator_links_source.items())[0] =
-            #   (
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #       {
-            #           'index': 0
-            #       }
-            #   )
-
-            _operator_link_class_path, data = 
next(iter(_operator_links_source.items()))
-            if _operator_link_class_path in get_operator_extra_links():
-                single_op_link_class = import_string(_operator_link_class_path)
-            elif _operator_link_class_path in 
plugins_manager.registered_operator_link_classes:
-                single_op_link_class = 
plugins_manager.registered_operator_link_classes[
-                    _operator_link_class_path
-                ]
-            else:
-                log.error("Operator Link class %r not registered", 
_operator_link_class_path)
-                return {}
-
-            op_link_parameters = {param: cls.deserialize(value) for param, 
value in data.items()}
-            op_predefined_extra_link: BaseOperatorLink = 
single_op_link_class(**op_link_parameters)
-
+            # Example:
+            # enc_operator['_operator_extra_links'] =
+            # {
+            #     'airflow': 'airflow_link_key',
+            #     'foo-bar': 'link-key',
+            #     'no_response': 'key',
+            #     'raise_error': 'key'
+            # }
+
+            name, xcom_key = item

Review Comment:
   ```suggestion
   ```



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, 
encoded_op_links: list) -> dict[str,
             raise AirflowException("Can't load plugins")
         op_predefined_extra_links = {}
 
-        for _operator_links_source in encoded_op_links:
-            # Get the key, value pair as Tuple where key is OperatorLink 
ClassName
-            # and value is the dictionary containing the arguments passed to 
the OperatorLink
+        for item in encoded_op_links.items():
+            # Get the name and xcom_key of the encoded operator and use it to 
create a GenericOperatorLink object
+            # during deserialization.
             #
-            # Example of a single iteration:
-            #
-            #   _operator_links_source =
-            #   {
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
 {
-            #           'index': 0
-            #       }
-            #   },
-            #
-            #   list(_operator_links_source.items()) =
-            #   [
-            #       (
-            #           
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #           {'index': 0}
-            #       )
-            #   ]
-            #
-            #   list(_operator_links_source.items())[0] =
-            #   (
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #       {
-            #           'index': 0
-            #       }
-            #   )
-
-            _operator_link_class_path, data = 
next(iter(_operator_links_source.items()))
-            if _operator_link_class_path in get_operator_extra_links():
-                single_op_link_class = import_string(_operator_link_class_path)
-            elif _operator_link_class_path in 
plugins_manager.registered_operator_link_classes:
-                single_op_link_class = 
plugins_manager.registered_operator_link_classes[
-                    _operator_link_class_path
-                ]
-            else:
-                log.error("Operator Link class %r not registered", 
_operator_link_class_path)
-                return {}
-
-            op_link_parameters = {param: cls.deserialize(value) for param, 
value in data.items()}
-            op_predefined_extra_link: BaseOperatorLink = 
single_op_link_class(**op_link_parameters)
-
+            # Example:
+            # enc_operator['_operator_extra_links'] =
+            # {
+            #     'airflow': 'airflow_link_key',
+            #     'foo-bar': 'link-key',
+            #     'no_response': 'key',
+            #     'raise_error': 'key'
+            # }
+
+            name, xcom_key = item
+            op_predefined_extra_link = GenericOperatorLink(name=name, 
xcom_key=xcom_key)
             op_predefined_extra_links.update({op_predefined_extra_link.name: 
op_predefined_extra_link})

Review Comment:
   I think this new link class is going to break the API server -- it's going 
to call `link.get_link(operator=task, ti_key=ti.key)`, which isn't defined on 
GenericOperatorLink.
   
   Either we need to make the new GenericOperatorLink class have the same 
interface (and call `XCom.pull()` directly), or change the class/type hint for 
extra links. I think the former is better, as that way we can still keep global 
plugin links working with the same interface.
   
   



##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -136,6 +140,81 @@ class AbstractOperator(Templater, DAGNode):
         )
     )
 
+    def unmap(self, resolve) -> BaseOperator:
+        """
+        Get the "normal" operator from current abstract operator.
+
+        MappedOperator uses this to unmap itself based on the map index. A non-
+        mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+        :meta private:
+        """
+        raise NotImplementedError()
+
+    @cached_property
+    def global_operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all global extra links."""
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.global_operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
+
+    @cached_property
+    def extra_links(self) -> list[str]:
+        return 
sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+
+    @cached_property
+    def operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all extra links for the operator."""
+        op_extra_links_from_plugin: dict[str, Any] = {}
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        for ope in plugins_manager.operator_extra_links:
+            if ope.operators and self.operator_class in ope.operators:
+                op_extra_links_from_plugin.update({ope.name: ope})
+
+        operator_extra_links_all = {link.name: link for link in 
self.operator_extra_links}
+        # Extra links defined in Plugins overrides operator links defined in 
operator
+        operator_extra_links_all.update(op_extra_links_from_plugin)
+
+        return operator_extra_links_all
+
+    def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
+        """
+        For an operator, gets the URLs that the ``extra_links`` entry points 
to.
+
+        :meta private:
+
+        :raise ValueError: The error message of a ValueError will be passed on 
through to
+            the fronted to show up as a tooltip on the disabled link.
+        :param ti: The TaskInstance for the URL being searched for.
+        :param link_name: The name of the link we're looking for the URL for. 
Should be
+            one of the options specified in ``extra_links``.
+        """
+        link = self.operator_extra_link_dict.get(link_name)
+        if not link:
+            link = self.global_operator_extra_link_dict.get(link_name)
+            if not link:
+                return None
+
+        # TODO: replace with isinstance checks when operator links ported to 
sdk
+        try:
+            el = link.get_link(ti=ti)
+            # Temporary workaround till 
https://github.com/apache/airflow/issues/46513 is handled.
+            return el.strip('"')

Review Comment:
   This doesn't make sense -- I thought we aren't operating on XCom values 
here, but on the return value of the actual OperatorLink class?



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, 
encoded_op_links: list) -> dict[str,
             raise AirflowException("Can't load plugins")
         op_predefined_extra_links = {}
 
-        for _operator_links_source in encoded_op_links:
-            # Get the key, value pair as Tuple where key is OperatorLink 
ClassName
-            # and value is the dictionary containing the arguments passed to 
the OperatorLink
+        for item in encoded_op_links.items():

Review Comment:
   Nit of sorts.
   ```suggestion
           for name, xcom_key in encoded_op_links.items():
   ```



##########
newsfragments/46613.feature.rst:
##########
@@ -0,0 +1 @@
+Operator Links interface changed to not run user code in Airflow Webserver The 
Operator Extra links, which can be defined either via plugins or custom 
operators now do not execute any user code in the Airflow Webserver, but 
instead push the "full" links to XCom backend and the value is again fetched 
from the XCom backend when viewing task details in grid view.

Review Comment:
   Can you add some more info (ideally with a short code example) about what 
users who have written a custom link class need to do?



##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -136,6 +140,81 @@ class AbstractOperator(Templater, DAGNode):
         )
     )
 
+    def unmap(self, resolve) -> BaseOperator:
+        """
+        Get the "normal" operator from current abstract operator.
+
+        MappedOperator uses this to unmap itself based on the map index. A non-
+        mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+        :meta private:
+        """
+        raise NotImplementedError()
+
+    @cached_property
+    def global_operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all global extra links."""
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.global_operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        return {link.name: link for link in 
plugins_manager.global_operator_extra_links}

Review Comment:
   I wonder if we should keep global operator links defined/loaded from the 
webserver instead of in the Task SDK.
   
   (I think yes; that way we don't need to use the plugin manager in the Task 
SDK.)



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, 
encoded_op_links: list) -> dict[str,
             raise AirflowException("Can't load plugins")
         op_predefined_extra_links = {}
 
-        for _operator_links_source in encoded_op_links:
-            # Get the key, value pair as Tuple where key is OperatorLink 
ClassName
-            # and value is the dictionary containing the arguments passed to 
the OperatorLink
+        for item in encoded_op_links.items():
+            # Get the name and xcom_key of the encoded operator and use it to 
create a GenericOperatorLink object
+            # during deserialization.
             #
-            # Example of a single iteration:
-            #
-            #   _operator_links_source =
-            #   {
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
 {
-            #           'index': 0
-            #       }
-            #   },
-            #
-            #   list(_operator_links_source.items()) =
-            #   [
-            #       (
-            #           
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #           {'index': 0}
-            #       )
-            #   ]
-            #
-            #   list(_operator_links_source.items())[0] =
-            #   (
-            #       
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
-            #       {
-            #           'index': 0
-            #       }
-            #   )
-
-            _operator_link_class_path, data = 
next(iter(_operator_links_source.items()))
-            if _operator_link_class_path in get_operator_extra_links():
-                single_op_link_class = import_string(_operator_link_class_path)
-            elif _operator_link_class_path in 
plugins_manager.registered_operator_link_classes:
-                single_op_link_class = 
plugins_manager.registered_operator_link_classes[
-                    _operator_link_class_path
-                ]
-            else:
-                log.error("Operator Link class %r not registered", 
_operator_link_class_path)
-                return {}
-
-            op_link_parameters = {param: cls.deserialize(value) for param, 
value in data.items()}
-            op_predefined_extra_link: BaseOperatorLink = 
single_op_link_class(**op_link_parameters)
-
+            # Example:
+            # enc_operator['_operator_extra_links'] =
+            # {
+            #     'airflow': 'airflow_link_key',
+            #     'foo-bar': 'link-key',
+            #     'no_response': 'key',
+            #     'raise_error': 'key'
+            # }
+
+            name, xcom_key = item
+            op_predefined_extra_link = GenericOperatorLink(name=name, 
xcom_key=xcom_key)
             op_predefined_extra_links.update({op_predefined_extra_link.name: 
op_predefined_extra_link})
 
         return op_predefined_extra_links
 
     @classmethod
-    def _serialize_operator_extra_links(cls, operator_extra_links: 
Iterable[BaseOperatorLink]):
+    def _serialize_operator_extra_links(
+        cls, operator_extra_links: Iterable[BaseOperatorLink]
+    ) -> dict[str, str]:
         """
         Serialize Operator Links.
 
-        Store the import path of the OperatorLink and the arguments passed to 
it.
+        Store the "name" of the link mapped with the xcom_key which can be 
later used to retrieve this
+        operator extra link from XComs.
         For example:
-        
``[{'airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink': {}}]``
+        ``{'link-name-1': 'xcom-key-1'}``
 
         :param operator_extra_links: Operator Link
         :return: Serialized Operator Link
         """
-        serialize_operator_extra_links = []
+        serialize_operator_extra_links = {}
         for operator_extra_link in operator_extra_links:
-            op_link_arguments = {
-                param: cls.serialize(value) for param, value in 
attrs.asdict(operator_extra_link).items()
-            }
-
-            module_path = (
-                
f"{operator_extra_link.__class__.__module__}.{operator_extra_link.__class__.__name__}"
-            )
-            serialize_operator_extra_links.append({module_path: 
op_link_arguments})
-
+            serialize_operator_extra_links[operator_extra_link.name] = 
operator_extra_link.xcom_key
         return serialize_operator_extra_links

Review Comment:
   We can simplify this whole function:
   
   ```python
       return {
           link.name: link.xcom_key for link in operator_extra_links
       }
   ```



##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -136,6 +140,81 @@ class AbstractOperator(Templater, DAGNode):
         )
     )
 
+    def unmap(self, resolve) -> BaseOperator:
+        """
+        Get the "normal" operator from current abstract operator.
+
+        MappedOperator uses this to unmap itself based on the map index. A non-
+        mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+        :meta private:
+        """
+        raise NotImplementedError()
+
+    @cached_property
+    def global_operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all global extra links."""
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.global_operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
+
+    @cached_property
+    def extra_links(self) -> list[str]:
+        return 
sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+
+    @cached_property
+    def operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all extra links for the operator."""
+        op_extra_links_from_plugin: dict[str, Any] = {}
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        for ope in plugins_manager.operator_extra_links:
+            if ope.operators and self.operator_class in ope.operators:
+                op_extra_links_from_plugin.update({ope.name: ope})
+
+        operator_extra_links_all = {link.name: link for link in 
self.operator_extra_links}
+        # Extra links defined in Plugins overrides operator links defined in 
operator
+        operator_extra_links_all.update(op_extra_links_from_plugin)
+
+        return operator_extra_links_all
+
+    def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
+        """
+        For an operator, gets the URLs that the ``extra_links`` entry points 
to.
+
+        :meta private:
+
+        :raise ValueError: The error message of a ValueError will be passed on 
through to
+            the fronted to show up as a tooltip on the disabled link.
+        :param ti: The TaskInstance for the URL being searched for.
+        :param link_name: The name of the link we're looking for the URL for. 
Should be
+            one of the options specified in ``extra_links``.
+        """
+        link = self.operator_extra_link_dict.get(link_name)
+        if not link:
+            link = self.global_operator_extra_link_dict.get(link_name)
+            if not link:
+                return None
+
+        # TODO: replace with isinstance checks when operator links ported to 
sdk
+        try:
+            el = link.get_link(ti=ti)

Review Comment:
   In the Task SDK we should never call this form -- the one that takes `ti` 
is on `GenericOperatorLink`, which is used only in the Webserver and shouldn't 
be called from this path. Or, to put it another way: we shouldn't have 
Webserver-specific code here in the Task SDK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to