gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917855355
> > [@x42005e1f](https://github.com/x42005e1f) please add your thoughts. > > I do not think that making the function return type dependent on runtime is a good pattern. This approach is poorly compatible with static analyzers and it will also not work correctly in all cases. Instead, I suggest shifting the focus not on how to create greenlets, but on how to pass awaitable objects from them. Here is how it can be implemented with a ready-made example for `aiologic.Lock`: > > from __future__ import annotations > > import inspect > import sys > > from collections.abc import Awaitable, Callable > from functools import wraps > from typing import TypeVar, cast > > import aiologic > import anyio > import greenback > > if sys.version_info >= (3, 10): > from typing import ParamSpec > else: > from typing_extensions import ParamSpec > > _T = TypeVar("_T") > _P = ParamSpec("_P") > > > def sync_or_async( > sync_func: Callable[_P, _T], > async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]], > ) -> Callable[_P, _T]: > if inspect.iscoroutinefunction(async_func): > async_impl = greenback.autoawait(async_func) > else: > async_impl = async_func > > async_impl = cast(Callable[_P, _T], async_impl) > > @wraps(sync_func) > def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: > if greenback.has_portal(): > return async_impl(*args, **kwargs) > else: > return sync_func(*args, **kwargs) > > return wrapper > > > class CommsLock(aiologic.Lock): > __slots__ = () > > async def _async_acquire_with_timeout( > self, > /, > *, > blocking: bool = True, > timeout: float | None = None, > ) -> bool: > with anyio.move_on_after(timeout): > return await self.async_acquire(blocking=blocking) > > return False > > green_acquire = sync_or_async( > aiologic.Lock.green_acquire, > _async_acquire_with_timeout, # type: ignore[arg-type] > ) > green_release = sync_or_async( > aiologic.Lock.green_release, > aiologic.Lock.async_release, > ) > lock = CommsLock() > > async def noop() -> None: > pass > > async def holding() -> None: > async with lock: > await asyncio.sleep(0) # switch back to the event loop > > def acquire_release() -> None: > with lock: > pass # do something > > # make CommsLock implicitly awaitable for the current task > await greenback.ensure_portal() > > # hold the lock with another task > holder = asyncio.create_task(holding()) > await asyncio.sleep(0) > assert lock.locked() > > # a task to verify that CommsLock does indeed yield to the event loop > task = asyncio.create_task(noop()) > assert not task.done() > > acquire_release() # sync-or-async call > assert task.done() # there was a context switch! can this `greenback.ensure_portal()` be opened on global level in trigger process, why i am asking is its bit problematic to add this before every function that calls from triggerer. Also users have their own custom triggers they have to update this before every function call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
