This might be a dumb question as I don't have experience with asyncio, but should the EventScheduler <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930> in the Airflow scheduler be rewritten to be asynchronous?
The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in queued, etc.) scheduled by this EventScheduler are blocking and run queries against the DB that can occasionally be expensive and cause substantial delays in the scheduler, which can result in repeated scheduler restarts. Below is a trivialized example of what this might look like -- curious to hear your thoughts!
import asyncio
import logging
import threading
import time
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


class AsyncEventScheduler:
    """Run coroutine callbacks at fixed intervals on the running event loop.

    Each scheduled callback gets its own ``asyncio.Task``; the tasks are
    collected in ``self.tasks`` so callers can cancel or await them.
    """

    def __init__(self) -> None:
        # Strong references to every periodic task. asyncio itself only keeps
        # weak references to tasks, so an unreferenced task could be garbage
        # collected mid-execution.
        self.tasks = []

    async def call_regular_interval(
        self,
        interval: float,
        action: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Await ``action(*args, **kwargs)`` every ``interval`` seconds.

        The loop sleeps first, so the first call happens after one full
        interval. It never returns on its own; it ends only when the
        surrounding task is cancelled.

        An exception raised by ``action`` is logged and the loop keeps going,
        so a single failed run does not permanently disable the event.
        Cancellation is not swallowed: ``asyncio.CancelledError`` is raised at
        the ``sleep`` await, outside the ``try`` block.
        """
        while True:
            await asyncio.sleep(interval)
            try:
                await action(*args, **kwargs)
            except Exception:
                logger.exception(
                    "Timed event %r failed; retrying in %s seconds",
                    action,
                    interval,
                )

    def schedule_task(
        self,
        interval: float,
        action: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Start a task that awaits ``action`` every ``interval`` seconds.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``. The created task is appended to
        ``self.tasks``.
        """
        task = asyncio.create_task(
            self.call_regular_interval(interval, action, *args, **kwargs)
        )
        self.tasks.append(task)


async def detect_zombies() -> None:
    """Stand-in for the zombie-detection timed event."""
    print("🧟")


async def detect_stuck_queued_tasks() -> None:
    """Stand-in for the stuck-in-queued timed event."""
    print("Oh no! A task is stuck in queued!")


def scheduler_loop() -> None:
    """Stand-in for the blocking main scheduling loop; never returns."""
    while True:
        print("Starting scheduling loop...")
        time.sleep(10)


def _do_scheduling() -> None:
    """Run ``scheduler_loop`` in a background thread.

    The thread is a daemon because ``scheduler_loop`` never returns; a
    non-daemon thread would keep the process alive after the event loop has
    stopped (for example on Ctrl-C).
    """
    thread = threading.Thread(target=scheduler_loop, daemon=True)
    thread.start()


async def main() -> None:
    """Schedule the timed events, start the scheduling thread, and idle."""
    scheduler = AsyncEventScheduler()
    scheduler.schedule_task(3, detect_zombies)
    scheduler.schedule_task(5, detect_stuck_queued_tasks)
    _do_scheduling()
    while True:
        print("EventScheduler running")
        await asyncio.sleep(1)


# Guard the entry point so importing this module does not start the
# never-ending demo loop.
if __name__ == "__main__":
    asyncio.run(main())