New submission from Artin Sarraf <sarraf.ar...@gmail.com>:
For those not familiar, the dask delayed interface allows a user to define a DAG through a functional invocation interface. Dask docs here: https://docs.dask.org/en/latest/delayed.html
Another example of this kind of interface is Airflow's new TaskFlow API: https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html

The proposed solution would look something like this. Essentially all we're doing is defining a decorator that lets you pass coroutines as arguments to another coroutine, and that resolves those dependency coroutines before passing their results on to the dependent coroutine.

import asyncio
import inspect

# Note0: can be removed, see Note2 below
async def task_wrapper(val):
    return val

def task(afunc):  # open to other names for the decorator since it might be a bit ambiguous
    async def inner(*args):
        # Note1: the real solution would be expanded to handle both args and kwargs
        # Note2: `task_wrapper` is somewhat unnecessary; we could just conditionally not gather in those cases
        args = [arg if inspect.isawaitable(arg) else task_wrapper(arg) for arg in args]
        args = await asyncio.gather(*args)
        return await afunc(*args)
    return inner

The advantage this gives us in asyncio is that we can easily build processing pipelines where each piece is completely independent and knows nothing about any other piece of the pipeline. Obviously this is already possible today, but this simple wrapper provides a very clean way to connect it all together. Take the following example, where we want to fetch data for various ids and then post-process/upload them.

@task
async def fetch(x):
    # Note3: timings here are chosen to demonstrate the expected async behavior
    # in the completion order of the print statements
    sleep_time = {'a1': 1, 'a2': 2, 'b1': 4, 'b2': 0.5, 'c1': 6, 'c2': 3.5}[x]
    await asyncio.sleep(sleep_time)
    ret_val = f'f({x})'
    print(f'Done {ret_val}')
    return ret_val

async def process(x1, x2):
    await asyncio.sleep(1)
    ret_val = f'p({x1}, {x2})'
    print(f'Done {ret_val}')
    return ret_val

Notice that we didn't decorate `process`; this is to demonstrate that you can still use the interface with functions that you can't or don't want to decorate. Now, to define and execute our pipeline, we can simply do this:

async def main():
    fa1 = fetch('a1')
    fa2 = fetch('a2')
    fb1 = fetch('b1')
    fb2 = fetch('b2')
    fc1 = fetch('c1')
    fc2 = fetch('c2')
    pa = task(process)(fa1, fa2)
    pb = task(process)(fb1, fb2)
    pc = task(process)(fc1, fc2)
    return await asyncio.gather(pa, pb, pc)

loop = asyncio.new_event_loop()
loop.run_until_complete(main())

This would be a simple, non-breaking addition to the library that would allow users to build clean, straightforward asynchronous processing pipelines/DAGs.

----------
components: asyncio
messages: 397274
nosy: asarraf, asvetlov, yselivanov
priority: normal
severity: normal
status: open
title: [Enhancement] Asyncio task decorator to provide functionality similar to dask's delayed interface
type: enhancement
versions: Python 3.11

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue44604>
_______________________________________
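[Editor's note: as a rough illustration of Note1 above (expanding the wrapper to handle keyword arguments as well), a minimal sketch might look like the following. The `_resolve` helper and the keyword handling are illustrative assumptions, not part of the original proposal.]

import asyncio
import inspect

async def _resolve(val):
    # Hypothetical helper (not in the original proposal): await awaitable
    # arguments, pass plain values through unchanged.
    return await val if inspect.isawaitable(val) else val

def task(afunc):
    async def inner(*args, **kwargs):
        # Resolve positional and keyword arguments concurrently, then
        # reassemble them in their original positions/names.
        resolved = await asyncio.gather(*(_resolve(a) for a in args),
                                        *(_resolve(v) for v in kwargs.values()))
        pos = resolved[:len(args)]
        kw = dict(zip(kwargs, resolved[len(args):]))
        return await afunc(*pos, **kw)
    return inner

# Usage would mirror the examples above, e.g.:
#   pa = task(process)(fetch('a1'), x2=fetch('a2'))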