x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917682923

   > [@x42005e1f](https://github.com/x42005e1f) please add your thoughts.
   
   I do not think that making a function's return type depend on the runtime 
context is a good pattern. That approach works poorly with static analyzers, 
and it will not behave correctly in every case. Instead, I suggest shifting the 
focus from how to create greenlets to how to pass awaitable objects out of 
them. Here is how that can be implemented, with a ready-made example for 
`aiologic.Lock`:
   
   ```python
   from __future__ import annotations
   
   import inspect
   import sys
   
   from collections.abc import Awaitable, Callable
   from functools import wraps
   from typing import TypeVar, cast
   
   import aiologic
   import greenback
   
   if sys.version_info >= (3, 10):
       from typing import ParamSpec
   else:
       from typing_extensions import ParamSpec
   
   _T = TypeVar("_T")
   _P = ParamSpec("_P")
   
   
   def sync_or_async(
       sync_func: Callable[_P, _T],
       async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]],
   ) -> Callable[_P, _T]:
       if inspect.iscoroutinefunction(async_func):
           async_impl = greenback.autoawait(async_func)
       else:
           async_impl = async_func
   
       async_impl = cast(Callable[_P, _T], async_impl)
   
       @wraps(sync_func)
       def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
           if greenback.has_portal():
               return async_impl(*args, **kwargs)
           else:
               return sync_func(*args, **kwargs)
   
       return wrapper
   
   
   class CommsLock(aiologic.Lock):
       __slots__ = ()
   
       async def _async_acquire_with_timeout(
           self,
           /,
           *,
           blocking: bool = True,
           timeout: float | None = None,
       ) -> bool:
           # `timeout` is accepted but not forwarded: only `blocking` reaches
           # `async_acquire`.
           return await self.async_acquire(blocking=blocking)
   
       green_acquire = sync_or_async(
           aiologic.Lock.green_acquire,
           _async_acquire_with_timeout,  # type: ignore[arg-type]
       )
       green_release = sync_or_async(
           aiologic.Lock.green_release,
           aiologic.Lock.async_release,
       )
   ```
   
   ```python
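   import asyncio
   
   import greenback
   
   # The statements below use top-level `await`: run them inside a coroutine
   # or in the `python -m asyncio` REPL.
   lock = CommsLock()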
   
   async def noop() -> None:
       pass
   
   async def holding() -> None:
       async with lock:
           await asyncio.sleep(0)  # switch back to the event loop
   
   def acquire_release() -> None:
       with lock:
           pass  # do something
   
   # make CommsLock implicitly awaitable for the current task
   await greenback.ensure_portal()
   
   # hold the lock with another task
   holder = asyncio.create_task(holding())
   await asyncio.sleep(0)
   assert lock.locked()
   
   # a task to verify that CommsLock does indeed yield to the event loop
   task = asyncio.create_task(noop())
   assert not task.done()
   
   acquire_release()  # sync-or-async call
   assert task.done()  # there was a context switch!
   ```
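
For contrast, here is a minimal sketch of the runtime-dependent return type that the comment argues against. It uses only the standard library; `fetch`, `sync_helper`, and `async_caller` are hypothetical names for illustration and are not part of Airflow, greenback, or aiologic. The sketch assumes the "runtime" check is whether an event loop is running in the current thread.

```python
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any


async def _fetch_async() -> int:
    await asyncio.sleep(0)  # yield to the event loop once
    return 42


def fetch() -> int | Coroutine[Any, Any, int]:
    """Hypothetical API whose return type depends on the calling context."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return 42  # no running loop: plain value
    return _fetch_async()  # running loop: coroutine that must be awaited


def sync_helper() -> int:
    """Ordinary synchronous code that expects a plain value."""
    result = fetch()
    if not isinstance(result, int):
        # Reached whenever an event loop is running in this thread: a sync
        # function cannot await the coroutine it was handed.
        result.close()  # discard the coroutine without running it
        raise TypeError("fetch() returned an awaitable to synchronous code")
    return result


async def async_caller() -> str:
    """Calls the sync helper from inside a task, as library code often does."""
    try:
        sync_helper()
    except TypeError:
        return "failed"
    return "ok"
```

Called outside an event loop, `sync_helper()` returns the plain value. The same helper called from a task (`asyncio.run(async_caller())`) takes the failure branch, and every caller has to narrow the union type by hand. These are the two problems named at the top of the comment.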

