KennethanCeyer opened a new issue #18265: URL: https://github.com/apache/airflow/issues/18265
### Description

## Purpose

A lot of data processing logic runs in Airflow. Much of that logic is IO-bound, and asyncio can be a good way to optimize it.

The problem is that the current Airflow version does not support the `async` keyword for `@task`-decorated functions or Operators. We have to obtain an event loop ourselves and call a control method such as `loop.run_until_complete`.

Using a library that provides an `async` function therefore requires one more wrapper function, and setting up that wrapper complicates the DAG and its tasks.

Therefore, I would like to propose a design that allows the `async` keyword for DAGs, tasks, and operators.

## Design

**TO-BE**

```python
import asyncio
import datetime
from typing import List

from airflow import DAG
from airflow.decorators import task


@task
async def get_nums() -> List[int]:
    await asyncio.sleep(5)
    return list(range(5))


@task
def print_nums(nums: List[int]) -> None:
    print(nums)


async with DAG(
    "example_asyncio",
    schedule_interval="@daily",
    start_date=datetime.datetime.now() - datetime.timedelta(days=1),
) as dag:
    nums = await get_nums()
    print_nums(nums)
```

**Current version**

```python
import asyncio
import datetime
from typing import List

from airflow import DAG
from airflow.decorators import task


# Plain coroutine: not decorated with @task, so calling it returns an awaitable.
async def get_nums_async() -> List[int]:
    await asyncio.sleep(5)
    return list(range(5))


# The task itself must be a regular (sync) function that drives the event loop.
@task
def get_nums() -> List[int]:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(get_nums_async())


@task
def print_nums(nums: List[int]) -> None:
    print(nums)


with DAG(
    "example_asyncio",
    schedule_interval="@daily",
    start_date=datetime.datetime.now() - datetime.timedelta(days=1),
) as dag:
    nums = get_nums()
    print_nums(nums)
```

### Use case/motivation

The code below won't work:

```python
import asyncio
import datetime
from typing import List

from airflow import DAG
from airflow.decorators import task


@task
async def get_nums() -> List[int]:
    await asyncio.sleep(5)
    return list(range(5))


@task
def print_nums(nums: List[int]) -> None:
    print(nums)


async with DAG(
    "example_asyncio",
    schedule_interval="@daily",
    start_date=datetime.datetime.now() - datetime.timedelta(days=1),
) as dag:
    nums = await get_nums()
    print_nums(nums)
```

**OUTPUT**

```plaintext
Broken DAG: [/opt/airflow/dags/repo/dags/example_asyncio.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 911, in source_to_code
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/repo/dags/example_asyncio.py", line 20
    async with DAG(
    ^
SyntaxError: 'async with' outside async function
```

---

In the current Airflow version, we need to wrap this code with `loop.run_until_complete`.

```python
import asyncio
import datetime
from typing import List

from airflow import DAG
from airflow.decorators import task


# Plain coroutine holding the actual async logic (not a task by itself).
async def get_nums_async() -> List[int]:
    await asyncio.sleep(5)
    return list(range(5))


@task
def get_nums() -> List[int]:
    # We need this wrapping function.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(get_nums_async())


@task
def print_nums(nums: List[int]) -> None:
    print(nums)


with DAG(
    "example_asyncio",
    schedule_interval="@daily",
    start_date=datetime.datetime.now() - datetime.timedelta(days=1),
) as dag:
    nums = get_nums()
    print_nums(nums)
```

### Related issues

_No response_

### Are you willing to submit a PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
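Until something like the proposal exists, the per-task wrapping function described in the issue could be factored into one reusable decorator. The following is a minimal sketch, not an Airflow API: `run_sync` is a hypothetical helper name, and it uses the standard-library `asyncio.run` in place of the `get_event_loop()` / `run_until_complete` pair shown in the issue. Airflow is deliberately not imported here so the block runs on its own.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, List, TypeVar

T = TypeVar("T")


def run_sync(func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Hypothetical helper: expose a coroutine function as a blocking callable."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # asyncio.run creates a fresh event loop, runs the coroutine
        # to completion, and closes the loop afterwards.
        return asyncio.run(func(*args, **kwargs))

    return wrapper


@run_sync
async def get_nums() -> List[int]:
    # Stand-in for an IO-bound call; a short sleep keeps the demo fast.
    await asyncio.sleep(0.1)
    return list(range(5))


if __name__ == "__main__":
    print(get_nums())
```

In a DAG file one would presumably stack this beneath `@task` (that is, `@task` on top, `@run_sync` directly above the `async def`), so that Airflow only ever sees a regular function. That combination is an assumption and is not exercised by the sketch above.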