KennethanCeyer opened a new issue #18265:
URL: https://github.com/apache/airflow/issues/18265


   ### Description
   
   ## Purpose
   
   A lot of data processing logic runs in Airflow, and much of that logic is IO-bound. asyncio can be a good way to optimize this kind of work.
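Here is a minimal plain-Python sketch (no Airflow involved) showing why asyncio suits IO-bound work. `fetch` and `fetch_all` are hypothetical names, and the IO call is simulated with `asyncio.sleep`. Because the awaits overlap, the batch takes about as long as a single call, not the sum of all three.

```python
import asyncio
import time
from typing import List


async def fetch(item: int, delay: float) -> int:
    # Hypothetical IO-bound call, simulated with a non-blocking sleep.
    await asyncio.sleep(delay)
    return item * 10


async def fetch_all(delay: float) -> List[int]:
    # gather() runs the three coroutines concurrently on one event loop,
    # so total wall time is roughly `delay`, not 3 * delay.
    return list(await asyncio.gather(*(fetch(i, delay) for i in range(3))))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(fetch_all(0.5))
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")  # results are [0, 10, 20]; elapsed is about 0.5s
```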
   
   The problem is that the current Airflow version does not support the `async` keyword for `@task`-decorated functions or operators. We have to create the event loop ourselves and call control methods such as `loop.run_until_complete`.
   
   As a result, using a library that provides `async` functions requires an extra wrapping function, and these wrappers complicate the DAG and its tasks. I would therefore like to propose a design that allows the `async` keyword for DAGs, tasks, and operators.
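Here is a minimal sketch of the wrapping step in plain Python, with no Airflow involved. Calling a coroutine function only returns a coroutine object and does not run its body, so synchronous code needs a helper that owns an event loop. `run_sync` is a hypothetical helper name. It is not an Airflow or asyncio API.

```python
import asyncio
import inspect
from typing import Any, Callable, List


async def get_nums_async() -> List[int]:
    await asyncio.sleep(0.1)
    return list(range(5))


def run_sync(coro_fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    # Hypothetical helper: the extra wrapping layer a synchronous caller needs.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro_fn(*args, **kwargs))
    finally:
        loop.close()


if __name__ == "__main__":
    coro = get_nums_async()
    print(inspect.iscoroutine(coro))  # True: the body has not run yet
    coro.close()  # discard it to avoid a "never awaited" warning
    print(run_sync(get_nums_async))  # [0, 1, 2, 3, 4]
```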
   
   ## Design
   
   **TO-BE**
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   @task
   async def get_nums() -> List[int]:
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   async with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = await get_nums()
       print_nums(nums)
   ```
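One way to approach the task-level part of this proposal is for the decorator to detect coroutine functions and run them on an event loop itself. The sketch below uses a hypothetical `task_like` decorator in plain Python. It is not Airflow's `@task`, and it does not cover `async with DAG(...)` or `await` at the DAG-definition level.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List


def task_like(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator (not Airflow's @task) that accepts sync or async callables.

    If `fn` is a coroutine function, the returned wrapper drives it to completion
    with asyncio.run(), so whatever calls the task can stay synchronous.
    """
    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        def run_coroutine(*args: Any, **kwargs: Any) -> Any:
            return asyncio.run(fn(*args, **kwargs))
        return run_coroutine
    return fn  # plain functions pass through unchanged


@task_like
async def get_nums() -> List[int]:
    await asyncio.sleep(0.1)
    return list(range(5))


@task_like
def print_nums(nums: List[int]) -> None:
    print(nums)


if __name__ == "__main__":
    print_nums(get_nums())  # prints [0, 1, 2, 3, 4]
```

A real implementation would also have to decide what to do when an event loop is already running in the worker thread, because `asyncio.run` raises `RuntimeError` in that case.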
   
   **Current version**
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
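   async def get_nums_async() -> List[int]:
       # Plain coroutine function (not a task) that holds the actual async logic.
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def get_nums() -> List[int]:
       # Synchronous wrapper task: Airflow calls it like any other task, and it
       # drives the coroutine to completion on a dedicated event loop.
       loop = asyncio.new_event_loop()
       try:
           return loop.run_until_complete(get_nums_async())
       finally:
           loop.close()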
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = get_nums()
       print_nums(nums)
   ```
   
   ### Use case/motivation
   
   The following code does not work:
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   @task
   async def get_nums() -> List[int]:
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   async with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = await get_nums()
       print_nums(nums)
   
   ```
   
   **OUTPUT**
   
   ```plaintext
   Broken DAG: [/opt/airflow/dags/repo/dags/example_asyncio.py] Traceback (most recent call last):
     File "<frozen importlib._bootstrap_external>", line 911, in source_to_code
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/opt/airflow/dags/repo/dags/example_asyncio.py", line 20
       async with DAG(
       ^
   SyntaxError: 'async with' outside async function
   ```
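This error comes from the Python compiler, not from Airflow. A DAG file is imported as a regular module, and `async with` is only valid inside an `async def` body. The sketch below demonstrates this with the built-in `compile()`. `ctx` and `compiles` are placeholder names, and the source is only compiled, never executed. The `ast.PyCF_ALLOW_TOP_LEVEL_AWAIT` flag (Python 3.8+) does accept a top-level `async with`, but the normal import machinery does not pass that flag.

```python
import ast

SOURCE_TOP_LEVEL = """
async with ctx() as c:
    pass
"""

SOURCE_IN_COROUTINE = """
async def main():
    async with ctx() as c:
        pass
"""


def compiles(source: str, flags: int = 0) -> bool:
    # Compile only; nothing runs, so the placeholder `ctx` never needs to exist.
    try:
        compile(source, "<dag>", "exec", flags=flags)
        return True
    except SyntaxError:
        return False


if __name__ == "__main__":
    print(compiles(SOURCE_TOP_LEVEL))  # False: 'async with' outside async function
    print(compiles(SOURCE_IN_COROUTINE))  # True
    print(compiles(SOURCE_TOP_LEVEL, ast.PyCF_ALLOW_TOP_LEVEL_AWAIT))  # True
```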
   
   ---
   
   In the current Airflow version, the coroutine has to be wrapped in a synchronous task that drives it with `loop.run_until_complete`.
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
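   async def get_nums_async() -> List[int]:
       # Plain coroutine function (not a task) that holds the actual async logic.
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def get_nums() -> List[int]:  # We need this wrapping function.
       # Synchronous wrapper task that drives the coroutine on its own event loop.
       loop = asyncio.new_event_loop()
       try:
           return loop.run_until_complete(get_nums_async())
       finally:
           loop.close()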
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = get_nums()
       print_nums(nums)
   
   ```
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   