amoghrajesh commented on code in PR #50238:
URL: https://github.com/apache/airflow/pull/50238#discussion_r2075401923


##########
airflow-core/src/airflow/models/mappedoperator.py:
##########
@@ -118,3 +122,54 @@ def expand_start_trigger_args(self, *, context: Context, session: Session) -> St
             next_kwargs=next_kwargs,
             timeout=timeout,
         )
+
+    @cached_property
+    def operator_extra_link_dict(self) -> dict[str, BaseOperatorLink]:
+        """Returns dictionary of all extra links for the operator."""
+        op_extra_links_from_plugin: dict[str, Any] = {}
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        operator_class_type = self.operator_class["task_type"]  # type: ignore
+        for ope in plugins_manager.operator_extra_links:
+            if ope.operators and any(operator_class_type in cls.__name__ for cls in ope.operators):
+                op_extra_links_from_plugin.update({ope.name: ope})

Review Comment:
   I had to do this part slightly differently from what we do for baseoperator:
   https://github.com/apache/airflow/pull/46613/files#diff-f373d874912ccfa03918e853ad15aa91d6bfaa1ee75f1676f78c8a756f332ed0L170
   
   This is because `operator_class` for baseoperator returns the class itself:
   ```
       @property
       def operator_class(self) -> type[BaseOperator]:  # type: ignore[override]
           return self.__class__
   
   ```
   
   That is not the case here. For mapped operators, `operator_class` is a dict that looks like this:
   ```
   operator class is {'task_id': 'add_one', 'template_fields_renderers': 
{'templates_dict': 'json', 'op_args': 'py', 'op_kwargs': 'py'}, 'template_ext': 
[], 'start_from_trigger': False, 'template_fields': ['templates_dict', 
'op_args', 'op_kwargs'], 'ui_fgcolor': '#000', 'ui_color': '#ffefeb', 
'task_type': '_PythonDecoratedOperator', 'downstream_task_ids': ['sum_it'], 
'start_trigger_args': None
   ```
   So I retrieved the `task_type` from that dict and compared it against the class names in `ope.operators`.
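
A minimal sketch of the comparison described above, for illustration only. `FakeLink`, `links_for_task_type`, and the three operator classes are hypothetical stand-ins, not Airflow classes; only the matching expression is taken from the diff.

```python
from dataclasses import dataclass, field


# Hypothetical operator classes, used only for their __name__.
class PythonOperator:
    pass


class _PythonDecoratedOperator:
    pass


class MyPythonOperator:
    pass


@dataclass
class FakeLink:
    """Stand-in for a plugin-registered extra link (not the Airflow class)."""

    name: str
    operators: list = field(default_factory=list)


def links_for_task_type(task_type, plugin_links):
    """Return {link name: link} for links whose operator class names contain task_type."""
    matched = {}
    for link in plugin_links:
        # Links with an empty ``operators`` list are skipped, as in the diff.
        if link.operators and any(task_type in cls.__name__ for cls in link.operators):
            matched[link.name] = link
    return matched


# For a mapped operator, operator_class is a dict, so only the name string is available.
operator_class = {"task_id": "add_one", "task_type": "_PythonDecoratedOperator"}

plugin_links = [
    FakeLink("decorated_docs", [_PythonDecoratedOperator]),
    FakeLink("python_docs", [PythonOperator]),
    FakeLink("no_operators"),
]

matched = links_for_task_type(operator_class["task_type"], plugin_links)
print(sorted(matched))  # ['decorated_docs']

# ``in`` on two strings is a substring test, so a shorter task_type also
# matches longer class names that contain it.
loose = links_for_task_type("PythonOperator", [FakeLink("custom", [MyPythonOperator])])
print(sorted(loose))  # ['custom']
```

One thing the sketch makes visible: because `in` on strings checks for a substring, a `task_type` of `PythonOperator` also matches a class named `MyPythonOperator`. If only exact matches are wanted, `==` against `cls.__name__` would be the stricter comparison.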



##########
airflow-core/src/airflow/models/mappedoperator.py:
##########
@@ -118,3 +122,54 @@ def expand_start_trigger_args(self, *, context: Context, session: Session) -> St
             next_kwargs=next_kwargs,
             timeout=timeout,
         )
+
+    @cached_property
+    def operator_extra_link_dict(self) -> dict[str, BaseOperatorLink]:
+        """Returns dictionary of all extra links for the operator."""
+        op_extra_links_from_plugin: dict[str, Any] = {}
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        operator_class_type = self.operator_class["task_type"]  # type: ignore
+        for ope in plugins_manager.operator_extra_links:
+            if ope.operators and any(operator_class_type in cls.__name__ for cls in ope.operators):
+                op_extra_links_from_plugin.update({ope.name: ope})
+
+        operator_extra_links_all = {link.name: link for link in self.operator_extra_links}
+        # Extra links defined in Plugins overrides operator links defined in operator
+        operator_extra_links_all.update(op_extra_links_from_plugin)
+
+        return operator_extra_links_all
+
+    @cached_property
+    def global_operator_extra_link_dict(self) -> dict[str, Any]:
+        """Returns dictionary of all global extra links."""
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_extra_operators_links_plugins()
+        if plugins_manager.global_operator_extra_links is None:
+            raise AirflowException("Can't load operators")
+        return {link.name: link for link in plugins_manager.global_operator_extra_links}
+
+    @cached_property
+    def extra_links(self) -> list[str]:
+        return sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+
+    def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
+        """
+        For an operator, gets the URLs that the ``extra_links`` entry points to.
+
+        :meta private:
+
+        :raise ValueError: The error message of a ValueError will be passed on through to
+            the fronted to show up as a tooltip on the disabled link.
+        :param ti: The TaskInstance for the URL being searched for.
+        :param name: The name of the link we're looking for the URL for. Should be
+            one of the options specified in ``extra_links``.
+        """
+        link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
+        if not link:
+            return None
+        return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]

Review Comment:
   There is no need to use `self.unmap` for the first param anymore.
   Unmap was there to get the "real" task for a mapped version, but now that the only thing we do with extra links is read XCom, we don't care about any of that.
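
A rough sketch of the lookup order in the hunk above, assuming simplified stand-ins: `StaticLink`, `merge_operator_links`, and `resolve_link` are hypothetical names, and the `get_link(operator, *, ti_key)` shape only mirrors the call in the diff. It shows plugin links overriding operator links of the same name, global links acting as a fallback, and the operator being passed through as-is without unmapping.

```python
class StaticLink:
    """Stand-in link that returns a fixed URL (not the Airflow class)."""

    def __init__(self, name, url):
        self.name = name
        self.url = url

    def get_link(self, operator, *, ti_key):
        # A real link would typically read XCom using ti_key; the operator
        # argument is not inspected here, which is why unmapping is unnecessary.
        return self.url


def merge_operator_links(operator_links, plugin_links):
    """Build {name: link}; plugin links win over operator links on a name clash."""
    merged = {link.name: link for link in operator_links}
    merged.update({link.name: link for link in plugin_links})
    return merged


def resolve_link(name, operator_link_dict, global_link_dict, operator, ti_key):
    """Look in operator-level links first, then global ones; None if unknown."""
    link = operator_link_dict.get(name) or global_link_dict.get(name)
    if not link:
        return None
    return link.get_link(operator, ti_key=ti_key)


operator_links = [StaticLink("docs", "https://example.invalid/operator")]
plugin_links = [StaticLink("docs", "https://example.invalid/plugin")]
global_links = {"status": StaticLink("status", "https://example.invalid/status")}

op_dict = merge_operator_links(operator_links, plugin_links)
all_names = sorted(set(op_dict).union(global_links))

mapped_operator = object()  # placeholder for the mapped operator instance
ti_key = ("dag", "task", "run", 0)  # placeholder for ti.key

print(all_names)
print(resolve_link("docs", op_dict, global_links, mapped_operator, ti_key))
print(resolve_link("status", op_dict, global_links, mapped_operator, ti_key))
print(resolve_link("missing", op_dict, global_links, mapped_operator, ti_key))
```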
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

