x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912116437

   > [@x42005e1f](https://github.com/x42005e1f) can i have your example you 
tested? In our trigger process we use lock inside sync_to_async.
   
   Yes, here is a simple mixed test:
   
   ```python
   import asyncio
   import sys
   
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   import aiologic
   
   from asgiref.sync import sync_to_async
   
   if sys.version_info >= (3, 11):
       WaitTimeout = TimeoutError
   else:
       from concurrent.futures import TimeoutError as WaitTimeout
   
   TIMEOUT = 6.0  # must be reached (success)
   THREADS = 100
   TASKS = 100
   
   lock = aiologic.Lock()
   try:
       stopped = aiologic.Flag()
   except AttributeError:  # aiologic<0.15.0
       stopped = aiologic.lowlevel.Flag()
   
   
   def sync_acquire_release():
       with lock:
           pass
   
   
   async def async_acquire_release(i):
       if i % 2:
           while True:
               await sync_to_async(sync_acquire_release)()
   
               if stopped:
                   break
       else:
           while True:
               async with lock:
                   await asyncio.sleep(0)
   
               if stopped:
                   break
   
   
   async def hub():
       await asyncio.gather(*(
           asyncio.create_task(async_acquire_release(i))
           for i in range(TASKS)
       ))
   
   
   if __name__ == "__main__":
       with ThreadPoolExecutor(THREADS) as executor:
           interval = sys.getswitchinterval()
           sys.setswitchinterval(min(1e-6, interval))
   
           try:
               futures = [
                   executor.submit(asyncio.run, hub())
                   for _ in range(THREADS)
               ]
   
               for future in as_completed(futures, timeout=TIMEOUT):
                   future.result()  # reraise
           except WaitTimeout:
               pass
           finally:
               sys.setswitchinterval(interval)
               stopped.set()
   ```
   
   But note that `sync_to_async()` uses only one worker thread by default, 
which blocks all further calls until the first one is completed. This can be 
seen, for example, in this test:
   
   ```python
   import asyncio
   import sys
   import threading
   
   from asgiref.sync import sync_to_async
   
   
   async def main():
       loop = asyncio.get_running_loop()
       first = asyncio.Event()
       second = threading.Event()
   
       def a():
           print("'a' started!")
           loop.call_soon_threadsafe(first.set)
           second.wait()
   
       def b():
           print("'b' started!")  # will never be printed!
           second.set()
   
       f1 = asyncio.create_task(sync_to_async(a)())
       await first.wait()
       print("nofitied!")
       f2 = asyncio.create_task(sync_to_async(b)())
   
       await asyncio.gather(f1, f2)
   
   
   if __name__ == "__main__":
       asyncio.run(main())
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to