x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917682923
> [@x42005e1f](https://github.com/x42005e1f) please add your thoughts. I do not think that making the function return type dependent on runtime is a good pattern. This approach is poorly compatible with static analyzers and it will also not work correctly in all cases. Instead, I suggest shifting the focus not on how to create greenlets, but on how to pass awaitable objects from them. Here is how it can be implemented with a ready-made example for `aiologic.Lock`: ```python from __future__ import annotations import inspect import sys from collections.abc import Awaitable, Callable from functools import wraps from typing import TypeVar, cast import aiologic import greenback if sys.version_info >= (3, 10): from typing import ParamSpec else: from typing_extensions import ParamSpec _T = TypeVar("_T") _P = ParamSpec("_P") def sync_or_async( sync_func: Callable[_P, _T], async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]], ) -> Callable[_P, _T]: if inspect.iscoroutinefunction(async_func): async_impl = greenback.autoawait(async_func) else: async_impl = async_func async_impl = cast(Callable[_P, _T], async_impl) @wraps(sync_func) def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: if greenback.has_portal(): return async_impl(*args, **kwargs) else: return sync_func(*args, **kwargs) return wrapper class CommsLock(aiologic.Lock): __slots__ = () async def _async_acquire_with_timeout( self, /, *, blocking: bool = True, timeout: float | None = None, ) -> bool: return await self.async_acquire(blocking=blocking) green_acquire = sync_or_async( aiologic.Lock.green_acquire, _async_acquire_with_timeout, # type: ignore[arg-type] ) green_release = sync_or_async( aiologic.Lock.green_release, aiologic.Lock.async_release, ) ``` ```python lock = CommsLock() async def noop() -> None: pass async def holding() -> None: async with lock: await asyncio.sleep(0) # switch back to the event loop def acquire_release() -> None: with lock: pass # do something # make CommsLock implicitly awaitable for the current task await greenback.ensure_portal() # hold the lock with another task holder = asyncio.create_task(holding()) await asyncio.sleep(0) assert lock.locked() # a task to verify that CommsLock does indeed yield to the event loop task = asyncio.create_task(noop()) assert not task.done() acquire_release() # sync-or-async call assert task.done() # there was a context switch! ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
