If we don't have many asyncio tasks running in the event loop at the same time, there won't be any benefit from migrating to asynchronous code, because the event loop only helps when several tasks are waiting on I/O concurrently. IMHO, it will be rewritten to be asynchronous anyway as part of AIP-70 <https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming> (WIP), since we will need to rewrite the scheduler if that AIP is accepted.
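To illustrate why `async def` alone doesn't buy anything when the work itself blocks: a coroutine that makes a synchronous call still holds the event loop for the whole duration of that call. A minimal sketch, using `time.sleep` as a hypothetical stand-in for an expensive DB query (the function names are illustrative, not Airflow code), and assuming Python 3.9+ for `asyncio.to_thread`:

```python
import asyncio
import time


async def blocking_check():
    # Hypothetical stand-in for a synchronous DB query: time.sleep blocks
    # the whole event loop, even though this function is declared async.
    time.sleep(0.2)


async def offloaded_check():
    # The same blocking call handed to a worker thread via asyncio.to_thread,
    # so the event loop stays free while it runs.
    await asyncio.to_thread(time.sleep, 0.2)


async def timed(check, n=2):
    """Run `n` copies of `check` concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(check() for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_check))
offloaded_elapsed = asyncio.run(timed(offloaded_check))
print(f"blocking: {blocking_elapsed:.2f}s, offloaded: {offloaded_elapsed:.2f}s")
```

The two blocking coroutines run one after the other (roughly the sum of both sleeps), while the offloaded ones overlap; the concurrency comes from the threads, not from `async def`.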
On Fri, May 3, 2024 at 2:49 PM Ryan Hatter <ryan.hat...@astronomer.io.invalid> wrote:

> This might be a dumb question as I don't have experience with asyncio, but
> should the EventScheduler
> <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
> in the Airflow scheduler be rewritten to be asynchronous?
>
> The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in
> queued, etc.) scheduled by this EventScheduler are blocking and run queries
> against the DB that can occasionally be expensive and cause substantial
> delays in the scheduler, which can result in repeated scheduler restarts.
>
> Below is a trivialized example of what this might look like; curious to
> hear your thoughts!
>
> import asyncio
> import threading
> import time
>
> class AsyncEventScheduler:
>     def __init__(self):
>         self.tasks = []
>
>     async def call_regular_interval(self, interval, action, *args, **kwargs):
>         """Schedules action to be called every `interval` seconds
>         asynchronously."""
>         while True:
>             await asyncio.sleep(interval)
>             await action(*args, **kwargs)
>
>     def schedule_task(self, interval, action, *args, **kwargs):
>         """Add tasks that run periodically in an asynchronous manner."""
>         task = asyncio.create_task(
>             self.call_regular_interval(interval, action, *args, **kwargs)
>         )
>         self.tasks.append(task)
>
> async def detect_zombies():
>     print("🧟")
>
> async def detect_stuck_queued_tasks():
>     print("Oh no! A task is stuck in queued!")
>
> def scheduler_loop():
>     while True:
>         print("Starting scheduling loop...")
>         time.sleep(10)
>
> def _do_scheduling():
>     thread = threading.Thread(target=scheduler_loop)
>     thread.start()
>
> async def main():
>     scheduler = AsyncEventScheduler()
>     scheduler.schedule_task(3, detect_zombies)
>     scheduler.schedule_task(5, detect_stuck_queued_tasks)
>
>     _do_scheduling()
>
>     while True:
>         print("EventScheduler running")
>         await asyncio.sleep(1)
>
> asyncio.run(main())
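In the quoted example the timed events are coroutines that only `print`, so they never block. If the real checks stay synchronous DB calls, they can't simply be awaited, and calling them directly would stall the loop. One way to keep them off the event loop is to run each one in a worker thread. A minimal sketch, assuming Python 3.9+ for `asyncio.to_thread`; `detect_zombies` here is a hypothetical synchronous stand-in, and `stop()`, `runs`, and the bounded `duration` are illustrative additions, not part of Airflow:

```python
import asyncio
import time


def detect_zombies():
    # Hypothetical synchronous check standing in for a blocking DB query.
    time.sleep(0.05)


class AsyncEventScheduler:
    def __init__(self):
        self.tasks = []
        self.runs = {}  # action name -> number of completed runs

    async def call_regular_interval(self, interval, action, *args, **kwargs):
        """Call a synchronous `action` every `interval` seconds in a worker thread."""
        while True:
            await asyncio.sleep(interval)
            await asyncio.to_thread(action, *args, **kwargs)
            self.runs[action.__name__] = self.runs.get(action.__name__, 0) + 1

    def schedule_task(self, interval, action, *args, **kwargs):
        """Start a periodic task; must be called while the event loop is running."""
        task = asyncio.create_task(
            self.call_regular_interval(interval, action, *args, **kwargs)
        )
        self.tasks.append(task)

    async def stop(self):
        """Cancel all periodic tasks and wait for them to finish."""
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()


async def main(duration=0.5):
    scheduler = AsyncEventScheduler()
    scheduler.schedule_task(0.1, detect_zombies)
    ticks = 0
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        ticks += 1  # the loop keeps ticking while detect_zombies runs in a thread
        await asyncio.sleep(0.01)
    await scheduler.stop()
    return ticks, scheduler.runs


ticks, runs = asyncio.run(main())
print(ticks, runs)
```

Offloading to threads keeps the existing synchronous query code unchanged; the trade-off is that cancelling a task cannot interrupt a query that is already running in its worker thread.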