New submission from Aritn Sarraf <sarraf.ar...@gmail.com>:

For those not familiar, the dask delayed interface allows a user to define a 
DAG through a functional invocation interface. Dask docs here: 
https://docs.dask.org/en/latest/delayed.html
Another example of this kind of interface is airflow's new TaskFlow api: 
https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html

The proposed solution would look something like this. Essentially, all we're
doing is defining a decorator that lets you pass coroutines as arguments to
another coroutine; the wrapper resolves the dependent coroutines before
passing their results to your dependent coroutine.

    import asyncio
    import inspect

    # Note0: can be removed, see Note2 below
    async def task_wrapper(val):
        return val

    def task(afunc):  # open to other names for the decorator since it might be a bit ambiguous
        async def inner(*args):  # Note1: real solution would be expanded to args/kwargs
            # Note2: `task_wrapper` is not strictly necessary; we could
            # conditionally skip gathering plain (non-awaitable) values instead
            args = [arg if inspect.isawaitable(arg) else task_wrapper(arg)
                    for arg in args]
            args = await asyncio.gather(*args)
            return await afunc(*args)
        return inner
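
Note1 above defers the args/kwargs expansion; as a minimal sketch of what that
"real solution" might look like (the kwargs handling here is my assumption
about the expansion, not part of the proposal's code):

    ```python
    import asyncio
    import inspect

    def task(afunc):
        async def inner(*args, **kwargs):
            async def resolve(arg):
                # Await awaitable arguments; pass plain values through unchanged.
                return await arg if inspect.isawaitable(arg) else arg

            # Resolve positional and keyword arguments concurrently.
            args = await asyncio.gather(*(resolve(a) for a in args))
            keys = list(kwargs)
            vals = await asyncio.gather(*(resolve(kwargs[k]) for k in keys))
            return await afunc(*args, **dict(zip(keys, vals)))
        return inner

    @task
    async def add(x, y=0):  # hypothetical example coroutine
        return x + y

    async def ten():
        return 10

    # Plain values, coroutines, and decorated calls can be mixed freely:
    result = asyncio.run(add(ten(), y=add(1, 2)))
    # result == 13
    ```

This version also removes `task_wrapper` per Note2, by only awaiting the
arguments that are actually awaitable.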


The advantage this gives us in asyncio is that we can easily build processing 
pipelines where each piece is completely independent and does not know anything 
about any other piece of the pipeline. Obviously this is already possible 
currently, but this simple wrapper will provide a very clean way to connect it 
all together.

Take the following example, where we want to fetch data for various ids and 
post process/upload them.

    @task
    async def fetch(x):
        # Note3: sleep times chosen to demonstrate the expected async
        # behavior in the completion order of the print statements
        sleep_time = {'a1': 1, 'a2': 2, 'b1': 4, 'b2': 0.5,
                      'c1': 6, 'c2': 3.5}[x]
        await asyncio.sleep(sleep_time)
        ret_val = f'f({x})'
        print(f'Done {ret_val}')
        return ret_val

    async def process(x1, x2):
        await asyncio.sleep(1)
        ret_val = f'p({x1}, {x2})'
        print(f'Done {ret_val}')
        return ret_val

Notice that we didn't decorate `process`; this demonstrates how you can still
use the interface with functions that you can't or don't want to decorate.
Now, to define and execute our pipeline, we can simply do this:


    async def main():
        fa1 = fetch('a1')
        fa2 = fetch('a2')
        fb1 = fetch('b1')
        fb2 = fetch('b2')
        fc1 = fetch('c1')
        fc2 = fetch('c2')
        pa = task(process)(fa1, fa2)
        pb = task(process)(fb1, fb2)
        pc = task(process)(fc1, fc2)
        return await asyncio.gather(pa, pb, pc)

    asyncio.run(main())
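
To make the behavior concrete, here is a self-contained re-run of the pipeline
above with the sleep times scaled down 100x so it finishes quickly (the helper
name `_wrap` is mine). One useful property: `asyncio.gather` returns results in
argument order, so the return value of `main()` is deterministic even though
the prints complete in timing order:

    ```python
    import asyncio
    import inspect

    async def _wrap(val):
        return val

    def task(afunc):
        async def inner(*args):
            # Wrap plain values so everything can be gathered uniformly.
            args = await asyncio.gather(
                *(a if inspect.isawaitable(a) else _wrap(a) for a in args)
            )
            return await afunc(*args)
        return inner

    @task
    async def fetch(x):
        # Same relative timings as above, scaled down 100x.
        await asyncio.sleep({'a1': 1, 'a2': 2, 'b1': 4, 'b2': 0.5,
                             'c1': 6, 'c2': 3.5}[x] / 100)
        return f'f({x})'

    async def process(x1, x2):
        await asyncio.sleep(0.01)
        return f'p({x1}, {x2})'

    async def main():
        pa = task(process)(fetch('a1'), fetch('a2'))
        pb = task(process)(fetch('b1'), fetch('b2'))
        pc = task(process)(fetch('c1'), fetch('c2'))
        return await asyncio.gather(pa, pb, pc)

    results = asyncio.run(main())
    # results == ['p(f(a1), f(a2))', 'p(f(b1), f(b2))', 'p(f(c1), f(c2))']
    ```

Each branch's two fetches run concurrently inside the inner `gather`, and the
three branches run concurrently in the outer one, so the whole pipeline takes
roughly as long as its slowest branch.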

This would be a simple, non-breaking addition to the library that would allow
users to build clean, straightforward asynchronous processing pipelines/DAGs.

----------
components: asyncio
messages: 397274
nosy: asarraf, asvetlov, yselivanov
priority: normal
severity: normal
status: open
title: [Enhancement] Asyncio task decorator to provide functionality similar to 
dask's delayed interface
type: enhancement
versions: Python 3.11

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue44604>
_______________________________________