This might be a dumb question, as I don't have experience with asyncio, but
should the EventScheduler
<https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
in the Airflow scheduler be rewritten to be asynchronous?

The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in
queued, etc.) scheduled by this EventScheduler are blocking and run queries
against the DB that can occasionally be expensive and cause substantial
delays in the scheduler, which can result in repeated scheduler restarts.

Below is a trivialized example of what this might look like -- curious to
hear your thoughts!

import asyncio
import threading
import time

class AsyncEventScheduler:
    """Run awaitable actions periodically, one asyncio task per action.

    Every task created by :meth:`schedule_task` is stored in ``self.tasks``
    so that callers can later inspect, cancel or await it.
    """

    def __init__(self):
        # Strong references to the created tasks; asyncio itself only keeps
        # weak references, so an unreferenced task could be garbage-collected.
        self.tasks = []

    async def call_regular_interval(self, interval, action, *args, **kwargs):
        """Await ``action(*args, **kwargs)`` every ``interval`` seconds, forever.

        The loop sleeps first, so the first call happens only after one full
        ``interval``. Because each iteration sleeps and then awaits the
        action, the effective period is ``interval`` plus the action's own
        running time.

        Args:
            interval: Delay in seconds passed to ``asyncio.sleep``.
            action: Callable returning an awaitable (e.g. an ``async def``).
            *args: Positional arguments forwarded to ``action``.
            **kwargs: Keyword arguments forwarded to ``action``.

        This coroutine never returns normally; it ends only when it is
        cancelled or when ``action`` raises, in which case the exception
        propagates out of the coroutine.
        """
        while True:
            await asyncio.sleep(interval)
            await action(*args, **kwargs)

    def schedule_task(self, interval, action, *args, **kwargs):
        """Start a task that runs ``action`` periodically and remember it.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``.

        Args:
            interval: Delay in seconds between runs.
            action: Callable returning an awaitable.
            *args: Positional arguments forwarded to ``action``.
            **kwargs: Keyword arguments forwarded to ``action``.
        """
        task = asyncio.create_task(
            self.call_regular_interval(interval, action, *args, **kwargs)
        )
        self.tasks.append(task)

async def detect_zombies():
    """Placeholder timed event: print a zombie marker to stdout."""
    print("🧟")

async def detect_stuck_queued_tasks():
    """Placeholder timed event: print a stuck-in-queued notice to stdout."""
    print("Oh no! A task is stuck in queued!")

def scheduler_loop():
    """Stand-in for the blocking scheduling loop.

    Prints a message and then blocks in ``time.sleep`` for 10 seconds,
    repeating forever; this function never returns.
    """
    while True:
        print("Starting scheduling loop...")
        time.sleep(10)

def _do_scheduling():
    """Start ``scheduler_loop`` in a background thread and return at once.

    The blocking loop runs in its own thread so that it does not stall the
    asyncio event loop of the caller.
    """
    # daemon=True: scheduler_loop never returns, so a non-daemon thread would
    # keep the interpreter alive after the main program has been interrupted.
    thread = threading.Thread(target=scheduler_loop, daemon=True)
    thread.start()

async def main():
    """Demo entry point: run timed events next to the blocking scheduler loop.

    Schedules ``detect_zombies`` every 3 seconds and
    ``detect_stuck_queued_tasks`` every 5 seconds, starts the blocking
    scheduling loop via ``_do_scheduling``, and then prints a heartbeat once
    per second. This coroutine never returns on its own.
    """
    scheduler = AsyncEventScheduler()
    scheduler.schedule_task(3, detect_zombies)
    scheduler.schedule_task(5, detect_stuck_queued_tasks)

    _do_scheduling()

    # Heartbeat loop; awaiting sleep() yields control so the scheduled
    # periodic tasks get a chance to run.
    while True:
        print("EventScheduler running")
        await asyncio.sleep(1)

# Only start the (endless) demo when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

Reply via email to