amoghrajesh opened a new pull request, #46613:
URL: https://github.com/apache/airflow/pull/46613
closes: https://github.com/apache/airflow/issues/46609
## Why?
Extra links for operators can be defined via a plugin or a provider package. They are displayed on the task details page in the Grid view.
Currently, users define operator extra links in one of two ways:
1. Defining the extra links in a plugin and attaching them to operators through that plugin.
2. Defining `operator_extra_links` directly on custom Airflow operators.

This feature doesn't work in Airflow 3. While porting it to the Task SDK so users can keep using it, we decided to simplify the extra links code flow. The change also decouples user code (an operator extra link's `get_link`) from the Airflow webserver, so that this code no longer runs in the webserver's context.
## What?
To get this working, the interface changes slightly, which simplifies the following:
1. When defining an extra link, add a required `xcom_key` field to it.
For example:
```
from airflow.models import BaseOperatorLink


class FooBarLink(BaseOperatorLink):
    name = "foo-bar"
    xcom_key = "link-key"

    def get_link(self, operator, *, ti_key):
        return "http://www.example.com"
```
2. At runtime, after the task has run (during `finalize`), the task runner pushes an XCom containing the full link for each extra link, using the link's `xcom_key` and the task id.
For example:
```
def finalize(ti: RuntimeTaskInstance, log: Logger):
    for oe in ti.task.operator_extra_link_dict.values():
        link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti.id), oe.xcom_key  # type: ignore[arg-type]
        log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
        _xcom_push(ti, key=xcom_key, value=link)
```
3. After the task has finished running, the user sees the extra link button in the Grid view.
The link in this button is read from XCom instead of being produced by running user code.
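The push/read split described in the steps above can be modelled without Airflow. This is a minimal sketch under stated assumptions: `FakeLink`, `FakeTask`, the dict-based XCom store, and the `finalize`/`read_link` helpers are hypothetical stand-ins that only mimic the shape of the task-runner code; they are not Task SDK APIs.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass
class FakeLink:
    """Hypothetical extra link: a name, an XCom key, and a user-defined get_link()."""

    name: str
    xcom_key: str
    url: Optional[str]

    def get_link(self, operator: Any, *, ti_key: Any) -> Optional[str]:
        # Stands in for arbitrary user code; in this model it only runs in finalize().
        return self.url


@dataclass
class FakeTask:
    """Hypothetical task exposing operator_extra_link_dict, keyed by link name."""

    task_id: str
    operator_extra_link_dict: Dict[str, FakeLink]


# Hypothetical XCom store: (task_id, xcom_key) -> pushed value.
XComStore = Dict[Tuple[str, str], Optional[str]]


def finalize(task: FakeTask, ti_key: Any, store: XComStore) -> None:
    """Task-runner side: call each get_link() once and push the full link."""
    for link in task.operator_extra_link_dict.values():
        store[(task.task_id, link.xcom_key)] = link.get_link(task, ti_key=ti_key)


def read_link(store: XComStore, task_id: str, xcom_key: str) -> Optional[str]:
    """UI side: a plain lookup, no user code involved."""
    return store.get((task_id, xcom_key))


links = [
    FakeLink(name="no_response", xcom_key="key", url=None),
    FakeLink(name="foo-bar", xcom_key="link-key", url="http://www.example.com"),
]
task = FakeTask(task_id="task", operator_extra_link_dict={link.name: link for link in links})
store: XComStore = {}
finalize(task, ti_key="hypothetical-ti-id", store=store)
print(read_link(store, "task", "link-key"))  # http://www.example.com
print(read_link(store, "task", "key"))  # None
```

Because the sketch keys the store by `(task_id, xcom_key)`, two links on the same task that share an `xcom_key` would overwrite each other here.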
## How?
### Changes of note:
#### BaseOperatorLink Model
Introduced a new `xcom_key` field that holds the XCom key under which the full link is pushed.
Introduced a new, slim `GenericOperatorLink` class, used during deserialisation, which returns the link from XComs instead of calling the user-defined `get_link`.
```
import attrs

from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.logging_mixin import LoggingMixin


@attrs.define()
class GenericOperatorLink(LoggingMixin):
    """A generic operator link class that can retrieve the link using only XComs. Used while deserializing operators."""

    name: str
    xcom_key: str

    def get_link(self, ti: TaskInstance) -> str:
        """
        Retrieve the link from the XComs.

        :param ti: Task instance from which to retrieve the link
        :return: link to external system, but by pulling it from XComs
        """
        self.log.info("Retrieving link from XComs with key: %s for task id: %s", self.xcom_key, ti.task_id)
        return ti.xcom_pull(key=self.xcom_key, task_ids=ti.task_id)  # type: ignore
```
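To show the read side in isolation, here is a dependency-free sketch of what `GenericOperatorLink.get_link` does. `FakeTaskInstance` and `GenericLinkSketch` are hypothetical stand-ins; a plain dataclass and the standard `logging` module replace `attrs` and `LoggingMixin` so the snippet runs without Airflow installed.

```python
import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

log = logging.getLogger("generic_link_sketch")


class FakeTaskInstance:
    """Hypothetical TaskInstance stand-in: only task_id and xcom_pull()."""

    def __init__(self, task_id: str, xcoms: Dict[Tuple[str, str], Any]) -> None:
        self.task_id = task_id
        self._xcoms = xcoms  # (task_id, key) -> value pushed during finalize

    def xcom_pull(self, *, key: str, task_ids: str) -> Any:
        return self._xcoms.get((task_ids, key))


@dataclass
class GenericLinkSketch:
    """Only a name and an XCom key: the user's link class is never imported."""

    name: str
    xcom_key: str

    def get_link(self, ti: FakeTaskInstance) -> Optional[str]:
        log.info("Retrieving link from XComs with key: %s for task id: %s", self.xcom_key, ti.task_id)
        return ti.xcom_pull(key=self.xcom_key, task_ids=ti.task_id)


ti = FakeTaskInstance("task", {("task", "link-key"): "http://www.example.com"})
print(GenericLinkSketch(name="foo-bar", xcom_key="link-key").get_link(ti))  # http://www.example.com
```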
#### DAG serialisation/deserialisation
The serialised DAG needs to record which extra links each task has. It already did, but in an overly complex format.
Difference in formats (before vs. now):

| | Older | Newer |
| -- | -- | -- |
| Encoded operator | `'_operator_extra_links': [{'tests.www.views.test_views_extra_links.RaiseErrorLink': {}}, {'tests.www.views.test_views_extra_links.NoResponseLink': {}}, {'tests.www.views.test_views_extra_links.FooBarLink': {}}, {'tests_common.test_utils.mock_operators.AirflowLink': {}}]` | `'_operator_extra_links': {'raise_error': 'key', 'no_response': 'key', 'foo-bar': 'link-key', 'airflow': 'airflow_link_key'}` |
| Decoded operator | `_operator_links_source = {'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink': {'index': 0}}`<br>`list(_operator_links_source.items()) = [('airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})]`<br>`list(_operator_links_source.items())[0] = ('airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})` | `deserialised_operator = SerializedBaseOperator.deserialize_operator(enc_operator)`<br>`deserialised_operator.operator_extra_links` returns `[GenericOperatorLink(name='raise_error', xcom_key='key'), GenericOperatorLink(name='no_response', xcom_key='key'), GenericOperatorLink(name='foo-bar', xcom_key='link-key'), GenericOperatorLink(name='airflow', xcom_key='airflow_link_key')]` |
Example of how it's stored in the serialised DAG now:
```
{
  "__version": 1,
  "dag": {
    "tags": ["extra_links"],
    "edge_info": {},
    "task_group": {
      "_group_id": null, "group_display_name": "", "prefix_group_id": true,
      "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000",
      "children": {"task": ["operator", "task"]},
      "upstream_group_ids": [], "downstream_group_ids": [],
      "upstream_task_ids": [], "downstream_task_ids": []
    },
    "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}},
    "fileloc": "/files/dags/dags/extra_links_custom_operator.py",
    "dag_id": "extra_links_custom_operator",
    "timezone": "UTC",
    "catchup": false,
    "relative_fileloc": "dags/extra_links_custom_operator.py",
    "_processor_dags_folder": "/files/dags",
    "tasks": [
      {
        "__var": {
          "pool": "default_pool", "task_type": "DummyTestOperator",
          "downstream_task_ids": [], "template_fields": [],
          "on_failure_fail_dagrun": false, "template_fields_renderers": {},
          "task_id": "task", "is_teardown": false, "ui_color": "#fff",
          "weight_rule": "downstream", "ui_fgcolor": "#000",
          "start_from_trigger": false, "_needs_expansion": false,
          "template_ext": [], "is_setup": false,
          "_task_module": "unusual_prefix_86147154bd7e391eb5735fc97db99fe835bcf5b0_extra_links_custom_operator",
          "_is_empty": false, "start_trigger_args": null,
          "_operator_extra_links": {"no_response": "key", "foo-bar": "link-key"},
          "label": "task", "_operator_name": "DummyTestOperator"
        },
        "__type": "operator"
      }
    ],
    "dag_dependencies": [],
    "params": []
  }
}
```
- During deserialisation, we create a `GenericOperatorLink` for each link recorded for the operator in the serialised DAG.
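The new encoding and decoding reduce to a name-to-key mapping. Below is a minimal, Airflow-free sketch of the round trip; `serialize_links`, `deserialize_links`, and `GenericLinkSketch` are hypothetical names, not the actual `SerializedBaseOperator` code.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List


@dataclass(frozen=True)
class GenericLinkSketch:
    """Hypothetical stand-in for GenericOperatorLink."""

    name: str
    xcom_key: str


class NoResponseLink:
    name = "no_response"
    xcom_key = "key"


class FooBarLink:
    name = "foo-bar"
    xcom_key = "link-key"


def serialize_links(links: Iterable[Any]) -> Dict[str, str]:
    """Encode only what the UI needs: link name -> XCom key."""
    return {link.name: link.xcom_key for link in links}


def deserialize_links(encoded: Dict[str, str]) -> List[GenericLinkSketch]:
    """Rebuild one generic link per entry; the user's link classes are not imported."""
    return [GenericLinkSketch(name=name, xcom_key=key) for name, key in encoded.items()]


encoded = serialize_links([NoResponseLink(), FooBarLink()])
# Same shape as "_operator_extra_links" in the serialised DAG example.
print(json.dumps(encoded))  # {"no_response": "key", "foo-bar": "link-key"}
decoded = deserialize_links(json.loads(json.dumps(encoded)))
print(decoded)
```

Unlike the older format, nothing in the encoded mapping has to be importable on the reading side, which is what lets deserialisation avoid loading user link classes.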
#### Abstract Operator
- The utilities for operator extra links have been moved to the Task SDK.
- The `AbstractOperator` in `models/abstractoperator` now inherits from the Task SDK `AbstractOperator`.
- `get_extra_links` in the Task SDK now follows the new function signature.
## Testing
1. Operator Links Defined at Operator Level
DAG:
```
from airflow.models import BaseOperatorLink, BaseOperator
from airflow.sdk import DAG


class NoResponseLink(BaseOperatorLink):
    name = "no_response"
    xcom_key = "key"

    def get_link(self, operator, *, ti_key):
        return None


class FooBarLink(BaseOperatorLink):
    name = "foo-bar"
    xcom_key = "link-key"

    def get_link(self, operator, *, ti_key):
        return "http://www.example.com"


class DummyTestOperator(BaseOperator):
    operator_extra_links = (
        NoResponseLink(),
        FooBarLink(),
    )

    def execute(self, context):
        print("Hello from custom operator", self.operator_extra_links)


with DAG(
    dag_id="extra_links_custom_operator",
    schedule=None,
    catchup=False,
    tags=["extra_links"],
):
    t = DummyTestOperator(task_id="task")
```

The link elements:
```
<a target="_blank" class="chakra-link chakra-button c-h1717v"
href="http://www.example.com">foo-bar</a>
<a class="chakra-link chakra-button c-h1717v" href="null">no_response</a>
```
XComs pushed:
<img width="1471" alt="image"
src="https://github.com/user-attachments/assets/bf3bfcc4-485c-473d-922b-e7aa2fe31ad8"
/>
Logs:
<img width="1471" alt="image"
src="https://github.com/user-attachments/assets/dd4b8747-be43-47c7-82e5-9e72d210d55b"
/>
Last few log lines:
```
{"timestamp":"2025-02-10T09:15:06.800071","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:15:06.800015Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800145","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:15:06.800175","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:15:06.800263","level":"debug","event":"Setting xcom for operator extra link","link":null,"xcom_key":"key","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800411","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"null\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800430","level":"debug","event":"Setting xcom for operator extra link","link":"http://www.example.com/","xcom_key":"link-key","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800457","level":"debug","event":"Sending request","json":"{\"key\":\"link-key\",\"value\":\"\\\"http://www.example.com/\\\"\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
```
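In the `SetXCom` requests in this log, `value` holds the link JSON-encoded a second time inside the request body, so a `get_link` that returns `None` travels as the string `null`; that is plausibly why the `no_response` button renders with `href="null"`. The hypothetical helper `set_xcom_body` below rebuilds a body of the same shape with `json.dumps`. The real task runner builds these messages from its own models, so treat this only as an illustration of the encoding.

```python
import json
from typing import Optional


def set_xcom_body(key: str, link: Optional[str], dag_id: str, run_id: str, task_id: str) -> str:
    """Hypothetical helper: rebuild a SetXCom-shaped body like the ones logged above."""
    payload = {
        "key": key,
        "value": json.dumps(link),  # the link is JSON-encoded first: None -> "null"
        "dag_id": dag_id,
        "run_id": run_id,
        "task_id": task_id,
        "map_index": -1,
        "mapped_length": None,
        "type": "SetXCom",
    }
    return json.dumps(payload, separators=(",", ":"))  # compact separators, as in the log


run_id = "manual__2025-02-10T09:15:05.441050+00:00"
body = set_xcom_body("link-key", "http://www.example.com/", "extra_links_custom_operator", run_id, "task")
print(body)
print(json.loads(set_xcom_body("key", None, "extra_links_custom_operator", run_id, "task"))["value"])  # null
```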
2. Operator Links Defined as plugins
Plugin:
```
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.providers.standard.operators.python import PythonOperator


# define the extra link
class HTTPDocsLink(BaseOperatorLink):
    # name the link button in the UI
    name = "HTTP docs"
    xcom_key = "key"

    # add the button to one or more operators
    operators = [PythonOperator]

    # provide the link
    def get_link(self, operator, *, ti_key=None):
        return "https://developer.mozilla.org/en-US/docs/Web/HTTP"


# define the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        HTTPDocsLink(),
    ]
```
DAG:
```
from airflow.models.dag import DAG
from pendulum import datetime
from airflow.providers.standard.operators.python import PythonOperator


def handler():
    print("Hello from the python operator!!")


with DAG(
    dag_id="extra_links_plugin",
    start_date=datetime(2022, 11, 1),
    schedule=None,
    catchup=False,
    tags=["extra_links"],
):
    call_api_simple = PythonOperator(
        task_id="call_api_simple",
        python_callable=handler,
    )
```
<img width="1471" alt="image"
src="https://github.com/user-attachments/assets/89bf6e7b-58d9-4801-a8f3-35f7b258e78b"
/>
The link elements:
```
<a target="_blank" class="chakra-link chakra-button c-h1717v"
href="https://developer.mozilla.org/en-US/docs/Web/HTTP">HTTP docs</a>
```
XComs pushed:
<img width="1471" alt="image"
src="https://github.com/user-attachments/assets/615f195d-8a63-4d7a-9f80-c6893631f2ba"
/>
Logs:
<img width="1471" alt="image"
src="https://github.com/user-attachments/assets/96f8d4ba-eee6-4ec0-8ed9-adf6efab8b90"
/>
Last few log lines:
```
{"timestamp":"2025-02-10T09:19:37.131169","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:19:37.131068Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:19:37.131332","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:19:37.131368","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:19:37.131524","level":"debug","event":"Setting xcom for operator extra link","link":"https://developer.mozilla.org/en-US/docs/Web/HTTP","xcom_key":"key","logger":"task"}
{"timestamp":"2025-02-10T09:19:37.131681","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"\\\"https://developer.mozilla.org/en-US/docs/Web/HTTP\\\"\",\"dag_id\":\"extra_links_plugin\",\"run_id\":\"manual__2025-02-10T09:19:36.380827+00:00\",\"task_id\":\"call_api_simple\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:19:37.877771Z","level":"info","event":"Hello from the python operator!!","chan":"stdout","logger":"task"}
```
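In the plugin case, `HTTPDocsLink` reaches the `PythonOperator` task only through its `operators` list, and is then pushed during `finalize` like an operator-level link (hence the `key` XCom in the log above). A hypothetical sketch of that matching follows: the stub classes and `links_for_operator` are illustrative only, and the rule that a plugin link overrides an operator link with the same name is our reading of Airflow's `operator_extra_link_dict`, so treat it as an assumption.

```python
from typing import Any, Dict, Iterable


class PythonOperatorStub:  # hypothetical stand-in for PythonOperator
    pass


class BashOperatorStub:  # hypothetical operator that the plugin does not target
    pass


class HTTPDocsLinkSketch:
    name = "HTTP docs"
    xcom_key = "key"
    operators = [PythonOperatorStub]  # operators this plugin link attaches to


def links_for_operator(op_cls: type, own_links: Iterable[Any], plugin_links: Iterable[Any]) -> Dict[str, Any]:
    """Collect links by name: the operator's own links, then matching plugin links."""
    merged = {link.name: link for link in own_links}
    for link in plugin_links:
        if op_cls in getattr(link, "operators", []):
            merged[link.name] = link  # assumed: a plugin link wins on a name clash
    return merged


plugin_links = [HTTPDocsLinkSketch()]
print(list(links_for_operator(PythonOperatorStub, [], plugin_links)))  # ['HTTP docs']
print(links_for_operator(BashOperatorStub, [], plugin_links))  # {}
```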
## What's pending?
- [ ] Adding newsfragments (significant / behaviour change?)
- [ ] Testing with SDK baseoperator
- [ ] Moving baseoperatorlink model to sdk definitions