One thing to bear in mind here is the number of DB connections: each connection can only be used by one thread or coroutine at a time, so even once the scheduler is changed to use async DB calls, we might not be able to run many of the scheduled tasks concurrently.
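A minimal sketch of that limit. It assumes a hypothetical pool of two connections, modelled with `asyncio.Semaphore`: four timed events are started at once, but at most two ever hold a "connection" at the same time. `POOL_SIZE`, `fake_query` and `run_timed_events` are made-up names, and `asyncio.sleep` stands in for a real async DB call.

```python
import asyncio
from typing import Dict, List

# Hypothetical number of open DB connections available to the process.
POOL_SIZE = 2


async def fake_query(name: str, sem: asyncio.Semaphore, stats: Dict[str, int]) -> str:
    """Simulate a timed event that needs exclusive use of one DB connection."""
    async with sem:  # wait until a "connection" is free
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)  # stand-in for an async DB query
        stats["active"] -= 1
    return name


async def run_timed_events(names: List[str], pool_size: int = POOL_SIZE) -> dict:
    """Start every event at once; the semaphore caps how many run together."""
    sem = asyncio.Semaphore(pool_size)
    stats = {"active": 0, "peak": 0}
    done = await asyncio.gather(*(fake_query(n, sem, stats) for n in names))
    return {"done": done, "peak": stats["peak"]}


if __name__ == "__main__":
    result = asyncio.run(
        run_timed_events(["zombies", "stuck_queued", "orphans", "pools"])
    )
    print(result["peak"])  # never exceeds POOL_SIZE
```

With `pool_size=1` (the single connection a sync process holds today) the events run strictly one after another; only a larger pool lets them overlap.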
At least not without thinking a bit more about it, anyway. Connection pooling would likely help here (and in fact this is the only time it makes sense; right now there should "never" be a reason for a sync Airflow process to have more than one open connection).

-ash

On 3 May 2024 18:10:45 BST, Daniel Standish <daniel.stand...@astronomer.io.INVALID> wrote:
>But you could run them in a thread or subprocess.
>
>Another option would be to just take all of the timed events and make them
>all asyncio and then run them all via asyncio in one continually running
>thread. That would be a bite-sized step towards AIP-70. Though, it might
>be a large bite :)
>
>On Fri, May 3, 2024 at 6:29 AM Hussein Awala <huss...@awala.fr> wrote:
>
>> If we don't have many asyncio tasks running in the event loop, there will
>> not be any benefit from migrating to asynchronous code. IMHO, it will be
>> rewritten to be asynchronous anyway as part of AIP-70
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming>
>> (WIP), where we will need to rewrite the scheduler if the AIP is accepted.
>>
>> On Fri, May 3, 2024 at 2:49 PM Ryan Hatter
>> <ryan.hat...@astronomer.io.invalid> wrote:
>>
>> > This might be a dumb question as I don't have experience with asyncio, but
>> > should the EventScheduler
>> > <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
>> > in the Airflow scheduler be rewritten to be asynchronous?
>> >
>> > The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in
>> > queued, etc.) scheduled by this EventScheduler are blocking and run queries
>> > against the DB that can occasionally be expensive and cause substantial
>> > delays in the scheduler, which can result in repeated scheduler restarts.
>> >
>> > Below is a trivialized example of what this might look like -- curious to
>> > hear your thoughts!
>> >
>> > import asyncio
>> > import threading
>> > import time
>> >
>> > class AsyncEventScheduler:
>> >     def __init__(self):
>> >         # Keep references so the periodic tasks are not garbage-collected.
>> >         self.tasks = []
>> >
>> >     async def call_regular_interval(self, interval, action, *args, **kwargs):
>> >         """Schedules action to be called every `interval` seconds asynchronously."""
>> >         while True:
>> >             await asyncio.sleep(interval)
>> >             await action(*args, **kwargs)
>> >
>> >     def schedule_task(self, interval, action, *args, **kwargs):
>> >         """Add tasks that run periodically in an asynchronous manner."""
>> >         # Must be called while the event loop is running (e.g. from main()).
>> >         task = asyncio.create_task(
>> >             self.call_regular_interval(interval, action, *args, **kwargs)
>> >         )
>> >         self.tasks.append(task)
>> >
>> > async def detect_zombies():
>> >     print("🧟")
>> >
>> > async def detect_stuck_queued_tasks():
>> >     print("Oh no! A task is stuck in queued!")
>> >
>> > def scheduler_loop():
>> >     # Stand-in for the main (blocking) scheduling loop.
>> >     while True:
>> >         print("Starting scheduling loop...")
>> >         time.sleep(10)
>> >
>> > def _do_scheduling():
>> >     # daemon=True so this thread does not keep the process alive after
>> >     # the event loop exits (e.g. on Ctrl+C).
>> >     thread = threading.Thread(target=scheduler_loop, daemon=True)
>> >     thread.start()
>> >
>> > async def main():
>> >     scheduler = AsyncEventScheduler()
>> >     scheduler.schedule_task(3, detect_zombies)
>> >     scheduler.schedule_task(5, detect_stuck_queued_tasks)
>> >
>> >     _do_scheduling()
>> >
>> >     while True:
>> >         print("EventScheduler running")
>> >         await asyncio.sleep(1)
>> >
>> > asyncio.run(main())
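Daniel's second option (make all the timed events asyncio and run them in one continually running thread) could look roughly like the sketch below. One caveat: Ryan's example awaits `async def` actions that only print, but real timed events run blocking sync DB queries, and calling one inside a coroutine would still stall the whole loop. This sketch therefore hands each action to `asyncio.to_thread` (Python 3.9+). `BackgroundTimedEvents`, `register`, and the check functions are hypothetical names for illustration, not Airflow APIs.

```python
import asyncio
import threading
import time
from typing import Callable, List, Optional, Tuple


class BackgroundTimedEvents:
    """Hypothetical helper (not an Airflow API): runs periodic *blocking*
    callables from an asyncio loop living in its own daemon thread."""

    def __init__(self) -> None:
        self._events: List[Tuple[float, Callable[[], None]]] = []
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._stop: Optional[asyncio.Event] = None
        self._ready = threading.Event()
        self._thread = threading.Thread(
            target=lambda: asyncio.run(self._main()), daemon=True
        )

    def register(self, interval: float, action: Callable[[], None]) -> None:
        """Register an event; only events registered before start() run."""
        self._events.append((interval, action))

    async def _every(self, interval: float, action: Callable[[], None]) -> None:
        while True:
            await asyncio.sleep(interval)
            # action blocks (e.g. a sync DB query), so run it in the default
            # thread pool; a slow event then cannot stall the other events.
            await asyncio.to_thread(action)

    async def _main(self) -> None:
        self._loop = asyncio.get_running_loop()
        self._stop = asyncio.Event()
        tasks = [asyncio.create_task(self._every(i, a)) for i, a in self._events]
        self._ready.set()  # loop and stop event now exist
        await self._stop.wait()
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    def start(self) -> None:
        self._thread.start()
        self._ready.wait()

    def stop(self) -> None:
        # asyncio objects are not thread-safe, so ask the loop to set the event.
        self._loop.call_soon_threadsafe(self._stop.set)
        # asyncio.run() also waits for in-flight to_thread() calls before returning.
        self._thread.join()


if __name__ == "__main__":
    def slow_check() -> None:  # stand-in for an expensive, blocking query
        time.sleep(0.2)
        print("slow check done")

    def fast_check() -> None:
        print("fast check done")

    events = BackgroundTimedEvents()
    events.register(0.05, slow_check)
    events.register(0.05, fast_check)
    events.start()
    time.sleep(1)  # the main scheduling loop keeps running meanwhile
    events.stop()
```

Because every offloaded action runs in its own worker thread, each one would still need its own DB connection, so the connection limit raised at the top of the thread applies here as well.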