This is an automated email from the ASF dual-hosted git repository.
ebenizzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/burr.git
The following commit(s) were added to refs/heads/main by this push:
new 48db8297 Fixes type annotations to be consistent on application state type (#588)
48db8297 is described below
commit 48db8297b7d216f772f0a46c2f0ab638606484be
Author: Elijah ben Izzy <[email protected]>
AuthorDate: Sun Nov 9 19:31:17 2025 -0800
Fixes type annotations to be consistent on application state type (#588)
Co-authored-by: Elijah ben Izzy <[email protected]>
---
burr/core/application.py | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/burr/core/application.py b/burr/core/application.py
index 9831e1af..0450d983 100644
--- a/burr/core/application.py
+++ b/burr/core/application.py
@@ -430,13 +430,13 @@ async def _arun_single_step_streaming_action(
def _run_multi_step_streaming_action(
action: StreamingAction,
- state: State,
+ state: State[ApplicationStateType],
inputs: Optional[Dict[str, Any]],
sequence_id: int,
app_id: str,
partition_key: Optional[str],
lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(),
-) -> Generator[Tuple[dict, Optional[State]], None, None]:
+) -> Generator[Tuple[dict, Optional[State[ApplicationStateType]]], None, None]:
"""Runs a multi-step streaming action. E.G. one with a run/reduce step.
This API is internal-facing. Note that this converts the shape of a
multi-step streaming action to yielding the results of the run step
@@ -480,13 +480,13 @@ def _run_multi_step_streaming_action(
async def _arun_multi_step_streaming_action(
action: AsyncStreamingAction,
- state: State,
+ state: State[ApplicationStateType],
inputs: Optional[Dict[str, Any]],
sequence_id: int,
app_id: str,
partition_key: Optional[str],
lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(),
-) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
+) -> AsyncGenerator[Tuple[dict, Optional[State[ApplicationStateType]]], None]:
"""Runs a multi-step streaming action in async. See the synchronous
version for more details."""
action.validate_inputs(inputs)
stream_initialize_time = system.now()
@@ -1314,7 +1314,7 @@ class Application(Generic[ApplicationStateType]):
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
- ) -> Tuple[Action, Optional[dict], State]:
+ ) -> Tuple[Action, Optional[dict], State[ApplicationStateType]]:
"""Runs your application through until completion, using async. Does
not give access to the state along the way -- if you want that, use
iterate().
@@ -1486,14 +1486,16 @@ class Application(Generic[ApplicationStateType]):
# we need to track if there's any exceptions that occur during this
try:
- def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]:
+ def process_result(
+ result: dict, state: State[ApplicationStateType]
+ ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]:
new_state = self._update_internal_state_value(state, next_action)
self._set_state(new_state)
return result, new_state
def callback(
result: Optional[dict],
- state: State,
+ state: State[ApplicationStateType],
exc: Optional[Exception] = None,
):
self._adapter_set.call_all_lifecycle_hooks_sync(
@@ -1736,14 +1738,16 @@ class Application(Generic[ApplicationStateType]):
)
try:
- def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]:
+ def process_result(
+ result: dict, state: State[ApplicationStateType]
+ ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]:
new_state = self._update_internal_state_value(state, next_action)
self._set_state(new_state)
return result, new_state
async def callback(
result: Optional[dict],
- state: State,
+ state: State[ApplicationStateType],
exc: Optional[Exception] = None,
):
await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(