One thing to bear in mind here is the number of DB connections: each 
connection can only be used by one thread or coroutine at a time, so even once 
the scheduler is changed to use async DB calls, we might not be able to run 
many of the scheduled tasks concurrently.

At least not without thinking about it a bit more. Connection pooling would 
likely help here (in fact, this is the only case where it makes sense: right 
now there should "never" be a reason for a sync Airflow process to have more 
than one open connection).
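
To make the connection limit concrete, here is a minimal sketch. It is not 
Airflow or SQLAlchemy code: FakeConnectionPool, run_query and 
peak_concurrency are made-up names, and asyncio.sleep stands in for a slow 
query. It only illustrates that the number of coroutines doing "DB work" at 
once is capped by the number of connections, however many tasks are scheduled.

```python
import asyncio


class FakeConnectionPool:
    """Hypothetical stand-in for a DB connection pool (illustration only)."""

    def __init__(self, size):
        # Each queue item represents one free connection.
        self._free = asyncio.Queue()
        for conn_id in range(size):
            self._free.put_nowait(conn_id)
        self.in_use = 0
        self.peak_in_use = 0

    async def run_query(self, seconds):
        # A coroutine must hold a connection exclusively while it "queries".
        conn_id = await self._free.get()
        self.in_use += 1
        self.peak_in_use = max(self.peak_in_use, self.in_use)
        try:
            await asyncio.sleep(seconds)  # pretend this is a slow query
        finally:
            self.in_use -= 1
            self._free.put_nowait(conn_id)


async def peak_concurrency(pool_size, n_tasks=6):
    """Run n_tasks fake queries and report how many overlapped at most."""
    pool = FakeConnectionPool(pool_size)
    await asyncio.gather(*(pool.run_query(0.01) for _ in range(n_tasks)))
    return pool.peak_in_use


if __name__ == "__main__":
    for size in (1, 3):
        peak = asyncio.run(peak_concurrency(size))
        print(f"pool size {size}: peak concurrent queries = {peak}")
```

With a single connection the six fake queries run one after another; with a 
pool of three, at most three overlap. Real pooling would presumably come from 
the DB layer itself rather than a hand-rolled queue like this one.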

-ash

On 3 May 2024 18:10:45 BST, Daniel Standish 
<daniel.stand...@astronomer.io.INVALID> wrote:
>But you could run them in a thread or subprocess.
>
>Another option would be to just take all of the timed events and make them
>all asyncio and then run them all via asyncio in one continually running
>thread.  That would be a bite-sized step towards AIP-70.  Though, it might
>be a large bite :)
>
>On Fri, May 3, 2024 at 6:29 AM Hussein Awala <huss...@awala.fr> wrote:
>
>> If we don't have many asyncio tasks running in the event loop, there will
>> not be any benefit from migrating to asynchronous. IMHO it will be rewritten
>> to be asynchronous anyway as part of AIP-70
>> <https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming>
>> (WIP), where we will need to rewrite the scheduler if the AIP is accepted.
>>
>> On Fri, May 3, 2024 at 2:49 PM Ryan Hatter
>> <ryan.hat...@astronomer.io.invalid> wrote:
>>
>> > This might be a dumb question as I don't have experience with asyncio,
>> > but should the EventScheduler
>> > <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
>> > in the Airflow scheduler be rewritten to be asynchronous?
>> >
>> > The so-called "timed events" (e.g. zombie reaping, handling tasks stuck
>> > in queued, etc.) scheduled by this EventScheduler are blocking and run
>> > queries against the DB that can occasionally be expensive and cause
>> > substantial delays in the scheduler, which can result in repeated
>> > scheduler restarts.
>> >
>> > Below is a trivialized example of what this might look like -- curious to
>> > hear your thoughts!
>> >
>> > import asyncio
>> > import threading
>> > import time
>> >
>> > class AsyncEventScheduler:
>> >     def __init__(self):
>> >         self.tasks = []
>> >
>> >     async def call_regular_interval(self, interval, action, *args, **kwargs):
>> >         """Schedules action to be called every `interval` seconds
>> >         asynchronously."""
>> >         while True:
>> >             await asyncio.sleep(interval)
>> >             await action(*args, **kwargs)
>> >
>> >     def schedule_task(self, interval, action, *args, **kwargs):
>> >         """Add tasks that run periodically in an asynchronous manner."""
>> >         task = asyncio.create_task(
>> >             self.call_regular_interval(interval, action, *args, **kwargs)
>> >         )
>> >         self.tasks.append(task)
>> >
>> > async def detect_zombies():
>> >     print("🧟")
>> >
>> > async def detect_stuck_queued_tasks():
>> >     print("Oh no! A task is stuck in queued!")
>> >
>> > def scheduler_loop():
>> >     while True:
>> >         print("Starting scheduling loop...")
>> >         time.sleep(10)
>> >
>> > def _do_scheduling():
>> >     thread = threading.Thread(target=scheduler_loop)
>> >     thread.start()
>> >
>> > async def main():
>> >     scheduler = AsyncEventScheduler()
>> >     scheduler.schedule_task(3, detect_zombies)
>> >     scheduler.schedule_task(5, detect_stuck_queued_tasks)
>> >
>> >     _do_scheduling()
>> >
>> >     while True:
>> >         print("EventScheduler running")
>> >         await asyncio.sleep(1)
>> >
>> > asyncio.run(main())
>> >
>>
