If we don't have many asyncio tasks running in the event loop, we won't gain
anything from migrating to async. IMHO it will be rewritten to be asynchronous
anyway as part of AIP-70 (WIP)
<https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming>,
under which we will need to rewrite the scheduler if the AIP is accepted.
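To make the point concrete: declaring a function `async` does not help if its body makes a blocking call such as a DB query, because the event loop simply stalls until that call returns. The blocking work has to be offloaded (e.g. with `asyncio.to_thread`, Python 3.9+) before anything else on the loop can make progress. A minimal sketch, where `slow_query` is a hypothetical stand-in for an expensive DB query and the timings are illustrative only:

```python
import asyncio
import time


def slow_query(seconds: float) -> str:
    """Hypothetical stand-in for an expensive, blocking DB query."""
    time.sleep(seconds)  # blocks whichever thread calls it
    return "done"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    """Record a timestamp every `interval` seconds; large gaps mean the loop stalled."""
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_job() -> None:
    await asyncio.sleep(0.1)  # let the heartbeat get going first
    slow_query(0.5)  # no await here: the whole event loop is frozen for 0.5 s


async def offloaded_job() -> None:
    await asyncio.sleep(0.1)
    # The blocking call runs in a worker thread, so the loop stays responsive.
    await asyncio.to_thread(slow_query, 0.5)


async def max_gap(job) -> float:
    """Run `job` alongside a heartbeat and return the largest gap between ticks."""
    ticks: list = []
    await asyncio.gather(heartbeat(ticks, 0.05, 12), job())
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"blocking:  max gap {asyncio.run(max_gap(blocking_job)):.2f}s")
    print(f"offloaded: max gap {asyncio.run(max_gap(offloaded_job)):.2f}s")
```

With the blocking version the heartbeat's largest gap is roughly the full 0.5 s query; with the offloaded version it stays close to the 0.05 s tick interval.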

On Fri, May 3, 2024 at 2:49 PM Ryan Hatter
<ryan.hat...@astronomer.io.invalid> wrote:

> This might be a dumb question as I don't have experience with asyncio, but
> should the EventScheduler
> <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
> in the Airflow scheduler be rewritten to be asynchronous?
>
> The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in
> queued, etc.) scheduled by this EventScheduler are blocking and run queries
> against the DB that can occasionally be expensive. These can cause
> substantial delays in the scheduler, which can result in repeated scheduler
> restarts.
>
> Below is a trivialized example of what this might look like -- curious to
> hear your thoughts!
>
> import asyncio
> import threading
> import time
>
> class AsyncEventScheduler:
>     def __init__(self):
>         self.tasks = []
>
>     async def call_regular_interval(self, interval, action, *args, **kwargs):
>         """Schedules action to be called every `interval` seconds
>         asynchronously."""
>         while True:
>             await asyncio.sleep(interval)
>             await action(*args, **kwargs)
>
>     def schedule_task(self, interval, action, *args, **kwargs):
>         """Add tasks that run periodically in an asynchronous manner."""
>         task = asyncio.create_task(
>             self.call_regular_interval(interval, action, *args, **kwargs)
>         )
>         self.tasks.append(task)
>
> async def detect_zombies():
>     print("🧟")
>
> async def detect_stuck_queued_tasks():
>     print("Oh no! A task is stuck in queued!")
>
> def scheduler_loop():
>     while True:
>         print("Starting scheduling loop...")
>         time.sleep(10)
>
> def _do_scheduling():
>     thread = threading.Thread(target=scheduler_loop)
>     thread.start()
>
> async def main():
>     scheduler = AsyncEventScheduler()
>     scheduler.schedule_task(3, detect_zombies)
>     scheduler.schedule_task(5, detect_stuck_queued_tasks)
>
>     _do_scheduling()
>
>     while True:
>         print("EventScheduler running")
>         await asyncio.sleep(1)
>
> asyncio.run(main())
>
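One way the quoted sketch could be adapted, assuming the timed events stay ordinary synchronous functions (as the DB-backed checks are today) and are safe to call from a worker thread: run each one through `asyncio.to_thread` so an expensive query does not stall the loop, and catch exceptions so a single failing check does not silently end its periodic task. `PeriodicRunner`, `schedule`, `max_runs` and `flaky_check` are hypothetical names for illustration, not Airflow APIs.

```python
import asyncio
import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


class PeriodicRunner:
    """Hypothetical helper: run blocking callables periodically, off the event loop."""

    def __init__(self) -> None:
        self.tasks: list = []  # keep references so tasks aren't garbage-collected

    async def _loop(
        self, interval: float, action: Callable[..., Any], args: tuple, max_runs: Optional[int]
    ) -> None:
        runs = 0
        while max_runs is None or runs < max_runs:
            await asyncio.sleep(interval)
            runs += 1
            try:
                # The blocking work (e.g. a DB query) runs in the default thread
                # pool, so other timed events keep firing on schedule.
                await asyncio.to_thread(action, *args)
            except Exception:
                # Log and carry on instead of letting the periodic task die.
                log.exception("Timed event %s failed", getattr(action, "__name__", action))

    def schedule(self, interval: float, action: Callable[..., Any], *args: Any,
                 max_runs: Optional[int] = None) -> "asyncio.Task":
        """Must be called from inside a running event loop (uses create_task)."""
        task = asyncio.create_task(self._loop(interval, action, args, max_runs))
        self.tasks.append(task)
        return task


def flaky_check(calls: list) -> None:
    """Hypothetical timed event that fails on its second run."""
    calls.append("check")
    if len(calls) == 2:
        raise RuntimeError("simulated DB error")


async def demo() -> list:
    calls: list = []
    runner = PeriodicRunner()
    runner.schedule(0.01, flaky_check, calls, max_runs=3)
    await asyncio.gather(*runner.tasks)
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['check', 'check', 'check'], plus a logged traceback
```

Note that `asyncio.to_thread` uses the loop's default thread pool, so how many expensive checks can overlap is bounded by that pool's size.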
