amoghrajesh opened a new pull request, #46613:
URL: https://github.com/apache/airflow/pull/46613

   closes: https://github.com/apache/airflow/issues/46609
   
   ## Why?
   If you want to add extra links to operators, you can define them via a plugin or a provider package. Extra links are displayed on the task details page in the Grid view.
   
   Currently, users define operator extra links in one of two ways:
   1. In plugins, where each extra link declares the operators it applies to.
   2. Directly on custom Airflow operators, via `operator_extra_links`.
   
   This feature does not work in Airflow 3 yet. While porting it to the Task SDK, we decided to simplify the extra links code flow. The change also decouples the user code in an operator extra link's `get_link` from the Airflow webserver, so that code no longer runs in the webserver's context.
   
   
   ## What?
   To make this work, we make a small interface change, and the extra links flow becomes simpler:
   1. When defining an extra link, you must now set a required `xcom_key` field.
   
   For example:
   ```
   from airflow.models.baseoperatorlink import BaseOperatorLink


   class FooBarLink(BaseOperatorLink):
       name = "foo-bar"
       xcom_key = "link-key"

       def get_link(self, operator, *, ti_key):
           return "http://www.example.com"
   ```
   
   2. At runtime, after the task has run (during `finalize`), the task runner pushes one XCom per extra link. Each XCom holds the full rendered link and is stored under the link's `xcom_key` for the task's id.
   
   Ex:
   ```
   def finalize(ti: RuntimeTaskInstance, log: Logger):
       # Render every extra link in the task process and store the result in XCom.
       for oe in ti.task.operator_extra_link_dict.values():
           link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti.id), oe.xcom_key  # type: ignore[arg-type]
           log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
           _xcom_push(ti, key=xcom_key, value=link)
   ```
   
   3. Once the task has completed, users see the extra link button in the Grid view.
   The link behind this button is read from XCom instead of being produced by running user code.
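The three steps above can be sketched in plain Python. This is a minimal, self-contained simulation, not Airflow code. `FakeXComStore`, `finalize_links` and `render_link` are hypothetical names. The store only stands in for a real XCom backend keyed by task id and XCom key.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class FooBarLink:
    """Stand-in for a user-defined BaseOperatorLink subclass (step 1)."""

    name = "foo-bar"
    xcom_key = "link-key"

    def get_link(self, operator, *, ti_key):
        return "http://www.example.com"


@dataclass
class FakeXComStore:
    """Hypothetical in-memory XCom backend keyed by (task_id, key)."""

    values: dict[tuple[str, str], object] = field(default_factory=dict)

    def push(self, task_id: str, key: str, value: object) -> None:
        self.values[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> object:
        return self.values.get((task_id, key))


def finalize_links(task_id: str, links: list, store: FakeXComStore) -> None:
    """Step 2: user code in get_link runs in the task process, once, after execution."""
    for link in links:
        # This stand-in link ignores operator/ti_key, so None is passed for both.
        store.push(task_id, link.xcom_key, link.get_link(operator=None, ti_key=None))


def render_link(task_id: str, xcom_key: str, store: FakeXComStore) -> object:
    """Step 3: the UI side only performs an XCom lookup; no user code runs here."""
    return store.pull(task_id, xcom_key)


store = FakeXComStore()
finalize_links("task", [FooBarLink()], store)
print(render_link("task", "link-key", store))  # http://www.example.com
```

The webserver side never imports or executes the link class. It only needs the `(task_id, xcom_key)` pair, which is why `xcom_key` becomes a required field.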
   
   ## How?
   
   ### Changes of note:
   
   #### BaseOperatorLink Model
   Introduces a new `xcom_key` field, which holds the XCom key under which the full link is pushed.
   
   Introduces a new, slim `GenericOperatorLink` class. It is used during deserialisation and returns the link from XCom instead of calling the user-defined `get_link`.
   ```
   from __future__ import annotations

   from typing import TYPE_CHECKING

   import attrs

   from airflow.utils.log.logging_mixin import LoggingMixin

   if TYPE_CHECKING:
       from airflow.models.taskinstance import TaskInstance


   @attrs.define()
   class GenericOperatorLink(LoggingMixin):
       """A generic operator link class that retrieves the link using only XComs. Used while deserializing operators."""

       name: str
       xcom_key: str

       def get_link(self, ti: TaskInstance) -> str:
           """
           Retrieve the link from the XComs.

           :param ti: Task instance from which to retrieve the link
           :return: link to external system, but by pulling it from XComs
           """
           self.log.info("Retrieving link from XComs with key: %s for task id: %s", self.xcom_key, ti.task_id)
           return ti.xcom_pull(key=self.xcom_key, task_ids=ti.task_id)  # type: ignore
   ```
   
   
   #### DAG serialisation / deserialisation
   The serialised DAG needs to record which extra links a task has. It already does, but the current format is overly complex.
   
   Difference in formats (older vs newer):

   | | Older | Newer |
   | -- | -- | -- |
   | Encoded operator | `'_operator_extra_links': [{'tests.www.views.test_views_extra_links.RaiseErrorLink': {}}, {'tests.www.views.test_views_extra_links.NoResponseLink': {}}, {'tests.www.views.test_views_extra_links.FooBarLink': {}}, {'tests_common.test_utils.mock_operators.AirflowLink': {}}]` | `'_operator_extra_links': {'raise_error': 'key', 'no_response': 'key', 'foo-bar': 'link-key', 'airflow': 'airflow_link_key'}` |
   | Decoded operator | `_operator_links_source = {'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink': {'index': 0}}`<br>`list(_operator_links_source.items()) = [('airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})]`<br>`list(_operator_links_source.items())[0] = ('airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})` | `deserialised_operator = SerializedBaseOperator.deserialize_operator(enc_operator)`<br>`deserialised_operator.operator_extra_links`<br>`[GenericOperatorLink(name='raise_error', xcom_key='key'), GenericOperatorLink(name='no_response', xcom_key='key'), GenericOperatorLink(name='foo-bar', xcom_key='link-key'), GenericOperatorLink(name='airflow', xcom_key='airflow_link_key')]` |
   
   Example of how it is stored in the serialised DAG now:
   ```
   {
     "__version": 1,
     "dag": {
       "tags": ["extra_links"],
       "edge_info": {},
       "task_group": {
         "_group_id": null, "group_display_name": "", "prefix_group_id": true, "tooltip": "",
         "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"task": ["operator", "task"]},
         "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []
       },
       "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}},
       "fileloc": "/files/dags/dags/extra_links_custom_operator.py",
       "dag_id": "extra_links_custom_operator",
       "timezone": "UTC",
       "catchup": false,
       "relative_fileloc": "dags/extra_links_custom_operator.py",
       "_processor_dags_folder": "/files/dags",
       "tasks": [
         {
           "__var": {
             "pool": "default_pool", "task_type": "DummyTestOperator", "downstream_task_ids": [],
             "template_fields": [], "on_failure_fail_dagrun": false, "template_fields_renderers": {},
             "task_id": "task", "is_teardown": false, "ui_color": "#fff", "weight_rule": "downstream",
             "ui_fgcolor": "#000", "start_from_trigger": false, "_needs_expansion": false, "template_ext": [],
             "is_setup": false,
             "_task_module": "unusual_prefix_86147154bd7e391eb5735fc97db99fe835bcf5b0_extra_links_custom_operator",
             "_is_empty": false, "start_trigger_args": null,
             "_operator_extra_links": {"no_response": "key", "foo-bar": "link-key"},
             "label": "task", "_operator_name": "DummyTestOperator"
           },
           "__type": "operator"
         }
       ],
       "dag_dependencies": [],
       "params": []
     }
   }
   ```
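The newer `_operator_extra_links` entry is just a mapping from link name to XCom key. Below is a minimal sketch of producing it. It assumes only that each link object exposes `name` and `xcom_key`. `encode_extra_links` is a hypothetical helper, not the serializer's actual function.

```python
from __future__ import annotations

import json


class NoResponseLink:
    name = "no_response"
    xcom_key = "key"


class FooBarLink:
    name = "foo-bar"
    xcom_key = "link-key"


def encode_extra_links(links) -> dict[str, str]:
    """Hypothetical helper: map each extra link's name to the XCom key holding its URL."""
    return {link.name: link.xcom_key for link in links}


encoded = encode_extra_links([NoResponseLink(), FooBarLink()])
print(json.dumps(encoded))  # {"no_response": "key", "foo-bar": "link-key"}
```

Because the mapping is keyed by name, link names need to be unique per operator. Unlike the older format, no import path of the link class has to be stored or imported back.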
   
   - During deserialisation, we create a `GenericOperatorLink` for each extra link recorded for the operator in the serialised DAG.
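The decoding side can be sketched the same way. `GenericLinkSketch` and `StubTaskInstance` are hypothetical stand-ins. They use `dataclasses` instead of `attrs` and an in-memory dict instead of a real XCom lookup. They only mirror the two fields and the `xcom_pull(key=..., task_ids=...)` call used by `GenericOperatorLink`.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StubTaskInstance:
    """Hypothetical stand-in for TaskInstance: a task id plus its pushed XComs."""

    task_id: str
    xcoms: dict[str, object] = field(default_factory=dict)

    def xcom_pull(self, key: str, task_ids: str) -> object:
        # This stub only knows its own XComs, so any other task id yields None.
        if task_ids != self.task_id:
            return None
        return self.xcoms.get(key)


@dataclass
class GenericLinkSketch:
    """Same two fields as GenericOperatorLink; the user's link class is never imported."""

    name: str
    xcom_key: str

    def get_link(self, ti: StubTaskInstance) -> object:
        return ti.xcom_pull(key=self.xcom_key, task_ids=ti.task_id)


def decode_extra_links(encoded: dict[str, str]) -> list[GenericLinkSketch]:
    """One generic link per (name, xcom_key) entry of the serialised operator."""
    return [GenericLinkSketch(name=name, xcom_key=key) for name, key in encoded.items()]


links = decode_extra_links({"no_response": "key", "foo-bar": "link-key"})
ti = StubTaskInstance(task_id="task", xcoms={"key": None, "link-key": "http://www.example.com/"})
print({link.name: link.get_link(ti) for link in links})
# {'no_response': None, 'foo-bar': 'http://www.example.com/'}
```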
   
   #### Abstract Operator
   
   - The operator extra link utilities have been moved to the Task SDK.
   - `models/abstractoperator` now inherits from the Task SDK `abstractoperator`.
   - `get_extra_links` in the Task SDK now follows the new function signature.
   
   ## Testing
   
   1. Operator Links Defined at Operator Level
   
   DAG:
   ```
   from airflow.models import BaseOperatorLink, BaseOperator
   from airflow.sdk import DAG
   
   class NoResponseLink(BaseOperatorLink):
       name = "no_response"
       xcom_key = "key"
   
       def get_link(self, operator, *, ti_key):
           return None
   
   
   class FooBarLink(BaseOperatorLink):
       name = "foo-bar"
       xcom_key = "link-key"
   
       def get_link(self, operator, *, ti_key):
           return "http://www.example.com"
   
   
   class DummyTestOperator(BaseOperator):
       operator_extra_links = (
           NoResponseLink(),
           FooBarLink(),
       )
   
       def execute(self, context):
           print("Hello from custom operator", self.operator_extra_links)
   
   with DAG(
       dag_id="extra_links_custom_operator",
       schedule=None,
       catchup=False,
       tags=["extra_links"],
   ):
   
       t = DummyTestOperator(
           task_id="task"
       )
   
   ```
   
   
![image](https://github.com/user-attachments/assets/fbe502d1-ad5b-46e3-b323-0852a1be002b)
   
   
   The link elements:
   ```
   <a target="_blank" class="chakra-link chakra-button c-h1717v" href="http://www.example.com">foo-bar</a>
   <a class="chakra-link chakra-button c-h1717v" href="null">no_response</a>
   ```
   
   XComs pushed:
   <img width="1471" alt="image" src="https://github.com/user-attachments/assets/bf3bfcc4-485c-473d-922b-e7aa2fe31ad8" />
   
   Logs:
   <img width="1471" alt="image" src="https://github.com/user-attachments/assets/dd4b8747-be43-47c7-82e5-9e72d210d55b" />
   
   Last few lines:
   ```
   {"timestamp":"2025-02-10T09:15:06.800071","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:15:06.800015Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
   {"timestamp":"2025-02-10T09:15:06.800145","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
   {"timestamp":"2025-02-10T09:15:06.800175","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
   {"timestamp":"2025-02-10T09:15:06.800263","level":"debug","event":"Setting xcom for operator extra link","link":null,"xcom_key":"key","logger":"task"}
   {"timestamp":"2025-02-10T09:15:06.800411","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"null\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
   {"timestamp":"2025-02-10T09:15:06.800430","level":"debug","event":"Setting xcom for operator extra link","link":"http://www.example.com/","xcom_key":"link-key","logger":"task"}
   {"timestamp":"2025-02-10T09:15:06.800457","level":"debug","event":"Sending request","json":"{\"key\":\"link-key\",\"value\":\"\\\"http://www.example.com/\\\"\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
   ```
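The `SetXCom` request is logged as a JSON string, and the XCom `value` inside it is JSON-encoded again. That is why the missing `no_response` link appears as `\"value\":\"null\"`, which is consistent with the `href="null"` seen in the rendered link. Below is a small sketch that decodes one of the log lines above with the standard `json` module. `xcom_from_log` is a hypothetical helper for inspecting logs, not part of Airflow.

```python
from __future__ import annotations

import json

# Copied verbatim from the last "Sending request" (SetXCom) log line above.
LOG_LINE = r'{"timestamp":"2025-02-10T09:15:06.800457","level":"debug","event":"Sending request","json":"{\"key\":\"link-key\",\"value\":\"\\\"http://www.example.com/\\\"\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}'


def xcom_from_log(line: str) -> tuple[str, str, object]:
    """Hypothetical helper: return (type, key, decoded value) of a logged SetXCom request."""
    request = json.loads(json.loads(line)["json"])  # decode the log record, then the request
    return request["type"], request["key"], json.loads(request["value"])  # value is JSON-encoded


print(xcom_from_log(LOG_LINE))  # ('SetXCom', 'link-key', 'http://www.example.com/')
print(json.loads("null"))  # None: how the no_response link's value decodes
```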
   
   
   2. Operator Links Defined as plugins
   Plugin:
   ```
   from airflow.plugins_manager import AirflowPlugin
   from airflow.models.baseoperatorlink import BaseOperatorLink
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   # define the extra link
   class HTTPDocsLink(BaseOperatorLink):
       # name the link button in the UI
       name = "HTTP docs"
       xcom_key = "key"
   
       # add the button to one or more operators
       operators = [PythonOperator]
   
       # provide the link
       def get_link(self, operator, *, ti_key=None):
           return "https://developer.mozilla.org/en-US/docs/Web/HTTP"
   
   # define the plugin class
   class AirflowExtraLinkPlugin(AirflowPlugin):
       name = "extra_link_plugin"
       operator_extra_links = [
           HTTPDocsLink(),
       ]
   
   ```
   
   DAG:
   ```
   from airflow.models.dag import DAG
   from pendulum import datetime
   
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   def handler():
       print("Hello from the python operator!!")
   
   with DAG(
       dag_id="extra_links_plugin",
       start_date=datetime(2022, 11, 1),
       schedule=None,
       catchup=False,
       tags=["extra_links"],
   ):
   
       call_api_simple = PythonOperator(
           task_id="call_api_simple",
           python_callable=handler,
       )
   
   ```
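The plugin case presumably works because, at `finalize`, the task runner iterates over `operator_extra_link_dict`, which also contains matching plugin links. The "Initialize extra operators links plugins" log line appears right before the XCom is set. Below is a minimal sketch of that merge. It assumes the merge behaves like Airflow 2's `AbstractOperator.operator_extra_link_dict`: a plugin link applies when the operator's class is listed in its `operators`, and it overrides an operator-defined link with the same name. All classes below are hypothetical stand-ins, not the real operators.

```python
from __future__ import annotations


class PythonOperator:
    """Stand-in for the real operator; it defines no extra links of its own."""

    operator_extra_links: tuple = ()


class HTTPDocsLink:
    name = "HTTP docs"
    xcom_key = "key"
    operators = [PythonOperator]


class BashOperator:
    """Another stand-in operator that the plugin link does not target."""

    operator_extra_links: tuple = ()


def operator_extra_link_dict(operator_class: type, plugin_links: list) -> dict[str, object]:
    """Assumed merge: operator links first, then matching plugin links override by name."""
    links = {link.name: link for link in operator_class.operator_extra_links}
    links.update({link.name: link for link in plugin_links if operator_class in link.operators})
    return links


plugin_links = [HTTPDocsLink()]
print(list(operator_extra_link_dict(PythonOperator, plugin_links)))  # ['HTTP docs']
print(list(operator_extra_link_dict(BashOperator, plugin_links)))  # []
```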
   
   
   <img width="1471" alt="image" src="https://github.com/user-attachments/assets/89bf6e7b-58d9-4801-a8f3-35f7b258e78b" />
   
   The link elements:
   ```
   <a target="_blank" class="chakra-link chakra-button c-h1717v" href="https://developer.mozilla.org/en-US/docs/Web/HTTP">HTTP docs</a>
   ```
   
   XComs pushed:
   <img width="1471" alt="image" src="https://github.com/user-attachments/assets/615f195d-8a63-4d7a-9f80-c6893631f2ba" />
   
   Logs:
   <img width="1471" alt="image" src="https://github.com/user-attachments/assets/96f8d4ba-eee6-4ec0-8ed9-adf6efab8b90" />
   
   Last few log lines:
   ```
   {"timestamp":"2025-02-10T09:19:37.131169","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:19:37.131068Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
   {"timestamp":"2025-02-10T09:19:37.131332","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
   {"timestamp":"2025-02-10T09:19:37.131368","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
   {"timestamp":"2025-02-10T09:19:37.131524","level":"debug","event":"Setting xcom for operator extra link","link":"https://developer.mozilla.org/en-US/docs/Web/HTTP","xcom_key":"key","logger":"task"}
   {"timestamp":"2025-02-10T09:19:37.131681","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"\\\"https://developer.mozilla.org/en-US/docs/Web/HTTP\\\"\",\"dag_id\":\"extra_links_plugin\",\"run_id\":\"manual__2025-02-10T09:19:36.380827+00:00\",\"task_id\":\"call_api_simple\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
   {"timestamp":"2025-02-10T09:19:37.877771Z","level":"info","event":"Hello from the python operator!!","chan":"stdout","logger":"task"}
   ```
   
   ## What's pending?
   - [ ] Adding newsfragments (significant / behaviour change?)
   - [ ] Testing with SDK baseoperator
   - [ ] Moving baseoperatorlink model to sdk definitions
   


-- 
This is an automated message from the Apache Git Service.