This is an automated email from the ASF dual-hosted git repository. ebenizzy pushed a commit to branch application-function-name-typing in repository https://gitbox.apache.org/repos/asf/burr.git
commit 41c52a396e7e1694eee3e64e0d14b412ccf03ea2 Author: Elijah ben Izzy <[email protected]> AuthorDate: Sun Oct 19 11:32:26 2025 -0700 Fixes type annotations to be consistent on application state type --- burr/core/application.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/burr/core/application.py b/burr/core/application.py index 9831e1af..0450d983 100644 --- a/burr/core/application.py +++ b/burr/core/application.py @@ -430,13 +430,13 @@ async def _arun_single_step_streaming_action( def _run_multi_step_streaming_action( action: StreamingAction, - state: State, + state: State[ApplicationStateType], inputs: Optional[Dict[str, Any]], sequence_id: int, app_id: str, partition_key: Optional[str], lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(), -) -> Generator[Tuple[dict, Optional[State]], None, None]: +) -> Generator[Tuple[dict, Optional[State[ApplicationStateType]]], None, None]: """Runs a multi-step streaming action. E.G. one with a run/reduce step. This API is internal-facing. Note that this converts the shape of a multi-step streaming action to yielding the results of the run step @@ -480,13 +480,13 @@ def _run_multi_step_streaming_action( async def _arun_multi_step_streaming_action( action: AsyncStreamingAction, - state: State, + state: State[ApplicationStateType], inputs: Optional[Dict[str, Any]], sequence_id: int, app_id: str, partition_key: Optional[str], lifecycle_adapters: LifecycleAdapterSet = LifecycleAdapterSet(), -) -> AsyncGenerator[Tuple[dict, Optional[State]], None]: +) -> AsyncGenerator[Tuple[dict, Optional[State[ApplicationStateType]]], None]: """Runs a multi-step streaming action in async. See the synchronous version for more details.""" action.validate_inputs(inputs) stream_initialize_time = system.now() @@ -1314,7 +1314,7 @@ class Application(Generic[ApplicationStateType]): halt_before: list[str] = None, halt_after: list[str] = None, inputs: Optional[Dict[str, Any]] = None, - ) -> Tuple[Action, Optional[dict], State]: + ) -> Tuple[Action, Optional[dict], State[ApplicationStateType]]: """Runs your application through until completion, using async. Does not give access to the state along the way -- if you want that, use iterate(). @@ -1486,14 +1486,16 @@ class Application(Generic[ApplicationStateType]): # we need to track if there's any exceptions that occur during this try: - def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]: + def process_result( + result: dict, state: State[ApplicationStateType] + ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]: new_state = self._update_internal_state_value(state, next_action) self._set_state(new_state) return result, new_state def callback( result: Optional[dict], - state: State, + state: State[ApplicationStateType], exc: Optional[Exception] = None, ): self._adapter_set.call_all_lifecycle_hooks_sync( @@ -1736,14 +1738,16 @@ class Application(Generic[ApplicationStateType]): ) try: - def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]: + def process_result( + result: dict, state: State[ApplicationStateType] + ) -> Tuple[Dict[str, Any], State[ApplicationStateType]]: new_state = self._update_internal_state_value(state, next_action) self._set_state(new_state) return result, new_state async def callback( result: Optional[dict], - state: State, + state: State[ApplicationStateType], exc: Optional[Exception] = None, ): await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(
