amoghrajesh commented on code in PR #66708:
URL: https://github.com/apache/airflow/pull/66708#discussion_r3231738080


##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -103,15 +115,25 @@ def clear(self, scope: StateScope, *, all_map_indices: bool = False) -> None:
 
     @abstractmethod
     async def aget(self, scope: StateScope, key: str) -> str | None:
-        """Async variant of get. Must handle both ``TaskScope`` and ``AssetScope``."""
+        """
+        Async variant of get. Must handle both ``TaskScope`` and ``AssetScope``.
+
+        Async methods do not take a ``session`` argument — implementations are expected
+        to manage their own async sessions internally (e.g. via ``create_session_async``).

Review Comment:
   The async variants exist for AIP-98 compatibility, i.e. async tasks run in
the triggerer event loop and will eventually need non-blocking `await
task_state.a(set|get...)` calls. They are in the base interface now so that
backend authors implement them upfront.
   
   This was missed because async endpoint usage is not wired up yet, and the
full async path is non-trivial: the task-sdk HTTP client is sync-only and the
triggerer routes all API calls through a sync supervisor subprocess. I realised
this and created a ticket for it:
https://github.com/apache/airflow/issues/66842
   
   I agree with your observation and will add `session: AsyncSession | None =
None` to the async methods for compatibility.
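
A minimal sketch of what that signature change could look like, not the actual change in the PR. Assumptions: `BaseStateBackendSketch`, `InMemoryBackend`, the `aset` signature, and the `StateScope` stand-in are hypothetical names for illustration, and making `session` keyword-only is a guess. Only `aget(self, scope, key) -> str | None` and the `session: AsyncSession | None = None` parameter come from the discussion above.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for type checking only, so the sketch runs without SQLAlchemy installed.
    from sqlalchemy.ext.asyncio import AsyncSession


@dataclass(frozen=True)
class StateScope:
    """Hypothetical stand-in for the real scope types (TaskScope / AssetScope)."""

    name: str


class BaseStateBackendSketch(ABC):
    """Hypothetical base interface with an optional session on the async methods."""

    @abstractmethod
    async def aget(
        self, scope: StateScope, key: str, *, session: AsyncSession | None = None
    ) -> str | None:
        """Async variant of get; a backend may use ``session`` or manage its own."""

    @abstractmethod
    async def aset(
        self, scope: StateScope, key: str, value: str, *, session: AsyncSession | None = None
    ) -> None:
        """Async variant of set (signature assumed for illustration)."""


class InMemoryBackend(BaseStateBackendSketch):
    """Toy backend that has no database, so it accepts and ignores ``session``."""

    def __init__(self) -> None:
        self._data: dict[tuple[StateScope, str], str] = {}

    async def aget(self, scope, key, *, session=None):
        return self._data.get((scope, key))

    async def aset(self, scope, key, value, *, session=None):
        self._data[(scope, key)] = value


async def demo() -> str | None:
    backend = InMemoryBackend()
    scope = StateScope("example_task")
    # Callers that do not have a session simply omit it.
    await backend.aset(scope, "cursor", "42")
    return await backend.aget(scope, "cursor")


result = asyncio.run(demo())
```

Keeping `session` optional means backends that do not use SQLAlchemy can ignore it, while database-backed implementations can reuse a caller-provided session or open their own.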



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

