amoghrajesh opened a new pull request, #50238:
URL: https://github.com/apache/airflow/pull/50238
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of an existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
closes: [API error on trying to view details tab of a mapped task instance
to access extra links](https://github.com/apache/airflow/issues/49773)
## Problem
The API server reports error when trying to access extra links defined for
mapped tasks. Error stack:
```
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py",
line 409, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py",
line 60, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/applications.py",
line 1054, in __call__
await super().__call__(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/applications.py",
line 112, in __call__
await self.middleware_stack(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
line 187, in __call__
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
line 165, in __call__
await self.app(scope, receive, _send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 29, in __call__
await responder(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 126, in __call__
await super().__call__(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 46, in __call__
await self.app(scope, receive, self.send_with_compression)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py",
line 85, in __call__
await self.app(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 176, in __call__
with recv_stream, send_stream, collapse_excgroups():
File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_utils.py",
line 82, in collapse_excgroups
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 178, in __call__
response = await self.dispatch_func(request, call_next)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/middleware.py",
line 28, in dispatch
response = await call_next(request)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 156, in call_next
raise app_exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 141, in coro
await self.app(scope, receive_or_disconnect, send_no_error)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py",
line 62, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 714, in __call__
await self.middleware_stack(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 734, in app
await route.handle(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 288, in handle
await self.app(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 76, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 73, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
line 301, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
line 214, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/concurrency.py",
line 37, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/to_thread.py",
line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
line 2470, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py",
line 83, in get_extra_links
(link_name, task.get_extra_links(ti, link_name)) for link_name in
task.extra_links
^^^^^^^^^^^^^^^^
AttributeError: 'MappedOperator' object has no attribute 'extra_links'
```
## Resolution
The issue happens due to moving code during
https://github.com/apache/airflow/pull/46613, the code that is relevant to API
server was moved from abstarct operator => serialised operators.
This worked in most cases but started failing for mapped operators because
mapped operators inherited from abstarctoeprators where this code was defined.
Code:
https://github.com/apache/airflow/pull/46613/files#diff-f373d874912ccfa03918e853ad15aa91d6bfaa1ee75f1676f78c8a756f332ed0L160-L217
The correct fix here would be to have `get_extra_links` defined on the
mapped operator, but not at the sdk side which is defn only, but on the
execution side, so basically in `airflow.models.mappedoperator.MappedOperator`.
## Testing
Defined plugin:
```
from airflow.providers.standard.decorators.python import
_PythonDecoratedOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.providers.standard.operators.python import PythonOperator
# define the extra link
class HTTPDocsLink(BaseOperatorLink):
# name the link button in the UI
name = "HTTP docs"
# add the button to one or more operators
operators = [PythonOperator, _PythonDecoratedOperator]
# provide the link
def get_link(self, operator, *, ti_key=None):
return "https://developer.mozilla.org/en-US/docs/Web/HTTP"
# define the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
HTTPDocsLink(),
]
```
This plugin adds extra links to all python operators, either via taskflow or
not.
Using DAG:
```
from datetime import datetime
from airflow.decorators import task
from airflow.sdk import DAG
with DAG(dag_id="task_mapping", schedule=None, start_date=datetime(2022, 3,
4)) as dag:
@task
def add_one(x: int):
return x + 1
@task
def sum_it(values):
print(type(add_one))
total = sum(values)
print(f"Total was {total}")
added_values = add_one.expand(x=[1, 2, 3])
sum_it(added_values)
```
Run the dag:

Access the extra links for mapped task first:

Access the extra links for the normal task second:

<!-- Please keep an empty line above the dashes. -->
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information.
In case of fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a
newsfragment file, named `{pr_number}.significant.rst` or
`{issue_number}.significant.rst`, in
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]