amoghrajesh opened a new pull request, #50238:
URL: https://github.com/apache/airflow/pull/50238

   closes: [API error on trying to view details tab of a mapped task instance 
to access extra links](https://github.com/apache/airflow/issues/49773)
   
   
   ## Problem
   
   The API server raises an error when accessing extra links defined for 
mapped tasks. Error stack:
   ```
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
       result = await app(  # type: ignore[func-returns-value]
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
       return await self.app(scope, receive, send)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
       await super().__call__(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/applications.py", line 112, in __call__
       await self.middleware_stack(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 187, in __call__
       raise exc
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 165, in __call__
       await self.app(scope, receive, _send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 29, in __call__
       await responder(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 126, in __call__
       await super().__call__(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 46, in __call__
       await self.app(scope, receive, self.send_with_compression)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
       await self.app(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 176, in __call__
       with recv_stream, send_stream, collapse_excgroups():
     File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
       self.gen.throw(typ, value, traceback)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_utils.py", line 82, in collapse_excgroups
       raise exc
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 178, in __call__
       response = await self.dispatch_func(request, call_next)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/middleware.py", line 28, in dispatch
       response = await call_next(request)
                  ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 156, in call_next
       raise app_exc
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 141, in coro
       await self.app(scope, receive_or_disconnect, send_no_error)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
       await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
       raise exc
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
       await app(scope, receive, sender)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py", line 714, in __call__
       await self.middleware_stack(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py", line 734, in app
       await route.handle(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py", line 288, in handle
       await self.app(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py", line 76, in app
       await wrap_app_handling_exceptions(app, request)(scope, receive, send)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
       raise exc
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
       await app(scope, receive, sender)
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py", line 73, in app
       response = await f(request)
                  ^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 301, in app
       raw_response = await run_endpoint_function(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 214, in run_endpoint_function
       return await run_in_threadpool(dependant.call, **values)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/concurrency.py", line 37, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
       return await future
              ^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
       result = context.run(func, *args)
                ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 83, in get_extra_links
       (link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
                                                                         ^^^^^^^^^^^^^^^^
   AttributeError: 'MappedOperator' object has no attribute 'extra_links'
   ```
   
   ## Resolution
   
   The regression was introduced by 
https://github.com/apache/airflow/pull/46613, which moved the code relevant to 
the API server from the abstract operator to the serialized operators.
   
   This worked in most cases but broke mapped operators, because mapped 
operators inherited this code from the abstract operator, where it used to be 
defined. 
Code: 
https://github.com/apache/airflow/pull/46613/files#diff-f373d874912ccfa03918e853ad15aa91d6bfaa1ee75f1676f78c8a756f332ed0L160-L217
   
   
   The correct fix is to define `get_extra_links` on the mapped operator: not 
on the SDK side, which is definition-only, but on the execution side, that is, 
in `airflow.models.mappedoperator.MappedOperator`.
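
The failure mode described above can be reproduced without Airflow. The following is a minimal sketch under stated assumptions: every class and function name in it (`AbstractOperator`, `SerializedOperator`, `MappedOperatorBefore`, `MappedOperatorAfter`, `collect_links`) is a hypothetical stand-in, not Airflow's real implementation. It only illustrates why moving a method off a shared base class breaks a sibling that relied on inheriting it, and the general shape of a fix that defines the API on the mapped class itself.

```python
class AbstractOperator:
    """Hypothetical shared base class. After the move it no longer has link logic."""


class LinkApiMixin:
    """Hypothetical helper holding the link logic, so the sketch stays short."""

    def __init__(self, links=None):
        # Maps link name -> URL; a stand-in for registered operator extra links.
        self._links = dict(links or {})

    @property
    def extra_links(self):
        return sorted(self._links)

    def get_extra_links(self, ti, link_name):
        # `ti` is unused here; it only mirrors the call shape seen in the traceback.
        return self._links.get(link_name)


class SerializedOperator(LinkApiMixin, AbstractOperator):
    """Stand-in for the class the link logic was moved to."""


class MappedOperatorBefore(AbstractOperator):
    """Inherits only from the base, so it lost the link API after the move."""


class MappedOperatorAfter(LinkApiMixin, AbstractOperator):
    """Shape of the fix: the link API is available on the mapped class itself."""


def collect_links(task, ti=None):
    # Same access pattern as the failing generator expression in the route.
    return dict((name, task.get_extra_links(ti, name)) for name in task.extra_links)


links = {"HTTP docs": "https://developer.mozilla.org/en-US/docs/Web/HTTP"}
print(collect_links(SerializedOperator(links)))
print(collect_links(MappedOperatorAfter(links)))
try:
    collect_links(MappedOperatorBefore())
except AttributeError as err:
    print(f"AttributeError: {err}")
```

In this sketch the mixin is only a convenience; the point is that the route code works for any task object exposing both `extra_links` and `get_extra_links`, regardless of where in the hierarchy they are defined.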
   
   
   ## Testing
   
   Defined plugin:
   ```
   from airflow.models.baseoperatorlink import BaseOperatorLink
   from airflow.plugins_manager import AirflowPlugin
   from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   # define the extra link
   class HTTPDocsLink(BaseOperatorLink):
       # name the link button in the UI
       name = "HTTP docs"
   
       # add the button to one or more operators
       operators = [PythonOperator, _PythonDecoratedOperator]
   
       # provide the link
       def get_link(self, operator, *, ti_key=None):
           return "https://developer.mozilla.org/en-US/docs/Web/HTTP"
   
   
   # define the plugin class
   class AirflowExtraLinkPlugin(AirflowPlugin):
       name = "extra_link_plugin"
       operator_extra_links = [
           HTTPDocsLink(),
       ]
   
   ```
   
   This plugin adds an extra link to every Python operator, whether it is 
defined via TaskFlow or as a classic operator.
   
   Using DAG:
   ```
   from datetime import datetime
   
   from airflow.decorators import task
   from airflow.sdk import DAG
   
   with DAG(dag_id="task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def sum_it(values):
           print(type(add_one))
           total = sum(values)
           print(f"Total was {total}")
   
       added_values = add_one.expand(x=[1, 2, 3])
       sum_it(added_values)
   
   ```
   
   
   Run the DAG:
   
![image](https://github.com/user-attachments/assets/7499f0db-b98f-4194-bfcb-e6d07c61f83b)
   
   
   First, access the extra links for the mapped task:
   
![image](https://github.com/user-attachments/assets/f26e148b-c156-49fb-912d-ed3ef189adb5)
   
   
   Then access the extra links for the normal task:
   
![image](https://github.com/user-attachments/assets/8c599e04-42b1-4f80-a7cc-e2b46a10df16)
   
   
   
   

