x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912116437
> [@x42005e1f](https://github.com/x42005e1f) can i have your example you tested? In our trigger process we use lock inside sync_to_async. Yes, here is a simple mixed test: ```python import asyncio import sys from concurrent.futures import ThreadPoolExecutor, as_completed import aiologic from asgiref.sync import sync_to_async if sys.version_info >= (3, 11): WaitTimeout = TimeoutError else: from concurrent.futures import TimeoutError as WaitTimeout TIMEOUT = 6.0 # must be reached (success) THREADS = 100 TASKS = 100 lock = aiologic.Lock() try: stopped = aiologic.Flag() except AttributeError: # aiologic<0.15.0 stopped = aiologic.lowlevel.Flag() def sync_acquire_release(): with lock: pass async def async_acquire_release(i): if i % 2: while True: await sync_to_async(sync_acquire_release)() if stopped: break else: while True: async with lock: await asyncio.sleep(0) if stopped: break async def hub(): await asyncio.gather(*( asyncio.create_task(async_acquire_release(i)) for i in range(TASKS) )) if __name__ == "__main__": with ThreadPoolExecutor(THREADS) as executor: interval = sys.getswitchinterval() sys.setswitchinterval(min(1e-6, interval)) try: futures = [ executor.submit(asyncio.run, hub()) for _ in range(THREADS) ] for future in as_completed(futures, timeout=TIMEOUT): future.result() # reraise except WaitTimeout: pass finally: sys.setswitchinterval(interval) stopped.set() ``` But note that `sync_to_async()` uses only one worker thread by default, which blocks all further calls until the first one is completed. This can be seen, for example, in this test: ```python import asyncio import sys import threading from asgiref.sync import sync_to_async async def main(): loop = asyncio.get_running_loop() first = asyncio.Event() second = threading.Event() def a(): print("'a' started!") loop.call_soon_threadsafe(first.set) second.wait() def b(): print("'b' started!") # will never be printed! second.set() f1 = asyncio.create_task(sync_to_async(a)()) await first.wait() print("nofitied!") f2 = asyncio.create_task(sync_to_async(b)()) await asyncio.gather(f1, f2) if __name__ == "__main__": asyncio.run(main()) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org