This is an automated email from the ASF dual-hosted git repository.

ebenizzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/burr.git


The following commit(s) were added to refs/heads/main by this push:
     new 48db8297 Fixes type annotations to be consistent on application state type (#588)
48db8297 is described below

commit 48db8297b7d216f772f0a46c2f0ab638606484be
Author: Elijah ben Izzy <[email protected]>
AuthorDate: Sun Nov 9 19:31:17 2025 -0800

    Fixes type annotations to be consistent on application state type (#588)
    
    Co-authored-by: Elijah ben Izzy <[email protected]>
---
 burr/core/application.py | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/burr/core/application.py b/burr/core/application.py
index 9831e1af..0450d983 100644
--- a/burr/core/application.py
+++ b/burr/core/application.py
@@ -430,13 +430,13 @@ async def _arun_single_step_streaming_action(
 
 def _run_multi_step_streaming_action(
     action: StreamingAction,
-    state: State,
+    state: State[ApplicationStateType],
     inputs: Optional[Dict[str, Any]],
     sequence_id: int,
     app_id: str,
     partition_key: Optional[str],
     lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(),
-) -> Generator[Tuple[dict, Optional[State]], None, None]:
+) -> Generator[Tuple[dict, Optional[State[ApplicationStateType]]], None, None]:
     """Runs a multi-step streaming action. E.G. one with a run/reduce step.
     This API is internal-facing. Note that this converts the shape of a
     multi-step streaming action to yielding the results of the run step
@@ -480,13 +480,13 @@ def _run_multi_step_streaming_action(
 
 async def _arun_multi_step_streaming_action(
     action: AsyncStreamingAction,
-    state: State,
+    state: State[ApplicationStateType],
     inputs: Optional[Dict[str, Any]],
     sequence_id: int,
     app_id: str,
     partition_key: Optional[str],
     lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(),
-) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
+) -> AsyncGenerator[Tuple[dict, Optional[State[ApplicationStateType]]], None]:
     """Runs a multi-step streaming action in async. See the synchronous version for more details."""
     action.validate_inputs(inputs)
     stream_initialize_time = system.now()
@@ -1314,7 +1314,7 @@ class Application(Generic[ApplicationStateType]):
         halt_before: list[str] = None,
         halt_after: list[str] = None,
         inputs: Optional[Dict[str, Any]] = None,
-    ) -> Tuple[Action, Optional[dict], State]:
+    ) -> Tuple[Action, Optional[dict], State[ApplicationStateType]]:
         """Runs your application through until completion, using async. Does
         not give access to the state along the way -- if you want that, use iterate().
 
@@ -1486,14 +1486,16 @@ class Application(Generic[ApplicationStateType]):
         # we need to track if there's any exceptions that occur during this
         try:
 
-            def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]:
+            def process_result(
+                result: dict, state: State[ApplicationStateType]
+            ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]:
                 new_state = self._update_internal_state_value(state, next_action)
                 self._set_state(new_state)
                 return result, new_state
 
             def callback(
                 result: Optional[dict],
-                state: State,
+                state: State[ApplicationStateType],
                 exc: Optional[Exception] = None,
             ):
                 self._adapter_set.call_all_lifecycle_hooks_sync(
@@ -1736,14 +1738,16 @@ class Application(Generic[ApplicationStateType]):
         )
         try:
 
-            def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]:
+            def process_result(
+                result: dict, state: State[ApplicationStateType]
+            ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]:
                 new_state = self._update_internal_state_value(state, next_action)
                 self._set_state(new_state)
                 return result, new_state
 
             async def callback(
                 result: Optional[dict],
-                state: State,
+                state: State[ApplicationStateType],
                 exc: Optional[Exception] = None,
             ):
                 await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(

Reply via email to