This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch improve_state_machine_apply in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c1d56fa11da934874787e12411597d887a615613 Author: numinex <[email protected]> AuthorDate: Tue Feb 17 14:04:22 2026 +0100 refactor(metadata): improve the stm update method --- core/common/src/lib.rs | 1 + core/common/src/types/{mod.rs => either.rs} | 28 ++++-------------- core/common/src/types/mod.rs | 1 + core/metadata/src/stm/mod.rs | 46 +++++++++++++++++++---------- core/metadata/src/stm/mux.rs | 28 +++++++++++------- 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 20e0de846..a87c07b5c 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -90,6 +90,7 @@ pub use types::consumer::consumer_offset::*; pub use types::consumer::consumer_offset_info::*; pub use types::consumer::consumer_offsets::*; pub use types::diagnostic::diagnostic_event::DiagnosticEvent; +pub use types::either::Either; pub use types::identifier::*; pub use types::message::*; pub use types::partition::*; diff --git a/core/common/src/types/mod.rs b/core/common/src/types/either.rs similarity index 57% copy from core/common/src/types/mod.rs copy to core/common/src/types/either.rs index 79c1a1df7..684ce90dd 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/either.rs @@ -15,26 +15,8 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod args; -pub(crate) mod client; -pub(crate) mod client_state; -pub(crate) mod cluster; -pub(crate) mod command; -pub(crate) mod compression; -pub(crate) mod configuration; -pub(crate) mod consensus; -pub(crate) mod consumer; -pub(crate) mod diagnostic; -pub(crate) mod identifier; -pub(crate) mod message; -pub(crate) mod partition; -pub(crate) mod permissions; -pub(crate) mod personal_access_tokens; -pub(crate) mod segment; -pub(crate) mod segment_storage; -pub(crate) mod snapshot; -pub(crate) mod stats; -pub(crate) mod stream; -pub(crate) mod streaming_stats; -pub(crate) mod topic; -pub(crate) mod user; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Either<L, R> { + Left(L), + Right(R), +} diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs index 79c1a1df7..1aaf2aae7 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/mod.rs @@ -25,6 +25,7 @@ pub(crate) mod configuration; pub(crate) mod consensus; pub(crate) mod consumer; pub(crate) mod diagnostic; +pub(crate) mod either; pub(crate) mod identifier; pub(crate) mod message; pub(crate) mod partition; diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index d5f3d573e..7de1992d0 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -21,6 +21,7 @@ pub mod snapshot; pub mod stream; pub mod user; +use iggy_common::Either; use left_right::*; use std::cell::UnsafeCell; use std::sync::Arc; @@ -63,12 +64,16 @@ where } /// Parses type-erased input into a command. Macro-generated. -/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back. +/// Returns: +/// - `Ok(Either::Left(cmd))` if applicable +/// - `Ok(Either::Right(input))` to pass ownership back +/// - `Err(error)` for malformed payload/parse errors pub trait Command { type Cmd; type Input; + type Error; - fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; + fn parse(input: Self::Input) -> Result<Either<Self::Cmd, Self::Input>, Self::Error>; } /// Per-command handler for a given state type. @@ -126,19 +131,20 @@ where } } -/// Public interface for state machines. -/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back. +/// Public interface for state handlers. pub trait State { type Output; type Input; + type Error; - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>; + fn apply(&self, input: Self::Input) -> Result<Either<Self::Output, Self::Input>, Self::Error>; } pub trait StateMachine { type Input; type Output; - fn update(&self, input: Self::Input) -> Self::Output; + type Error; + fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>; } /// Generates the state's inner struct and wrapper type. @@ -223,21 +229,22 @@ macro_rules! collect_handlers { impl $crate::stm::Command for [<$state Inner>] { type Cmd = [<$state Command>]; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; + type Error = ::iggy_common::IggyError; - fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> { + fn parse(input: Self::Input) -> Result<::iggy_common::Either<Self::Cmd, Self::Input>, Self::Error> { use ::iggy_common::BytesSerializable; + use ::iggy_common::Either; use ::iggy_common::header::Operation; match input.header().operation { $( Operation::$operation => { let body = input.body_bytes(); - Ok([<$state Command>]::$operation( - $operation::from_bytes(body).unwrap() - )) + let cmd = $operation::from_bytes(body)?; + Ok(Either::Left([<$state Command>]::$operation(cmd))) }, )* - _ => Err(input), + _ => Ok(Either::Right(input)), } } } @@ -257,11 +264,20 @@ macro_rules! collect_handlers { impl $crate::stm::State for $state { type Input = <[<$state Inner>] as $crate::stm::Command>::Input; type Output = (); + type Error = <[<$state Inner>] as $crate::stm::Command>::Error; - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { - let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; - self.inner.do_apply(cmd); - Ok(()) + fn apply(&self, input: Self::Input) -> Result<::iggy_common::Either<Self::Output, Self::Input>, Self::Error> { + use ::iggy_common::Either; + + match <[<$state Inner>] as $crate::stm::Command>::parse(input)? { + Either::Left(cmd) => { + self.inner.do_apply(cmd); + Ok(Either::Left(())) + } + Either::Right(input) => { + Ok(Either::Right(input)) + } + } } } diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index ae514e5c4..70830355c 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -16,6 +16,7 @@ // under the License. use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError}; +use iggy_common::Either; use iggy_common::{header::PrepareHeader, message::Message}; use crate::stm::{State, StateMachine}; @@ -44,8 +45,9 @@ where { type Input = T::Input; type Output = T::Output; + type Error = T::Error; - fn update(&self, input: Self::Input) -> Self::Output { + fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error> { self.inner.update(input) } } @@ -70,24 +72,28 @@ impl StateMachine for () { // TODO: Make sure that the `Output` matches to the output type of the rest of list. // TODO: Add a trait bound to the output that will allow us to get the response in bytes. type Output = (); + type Error = iggy_common::IggyError; - fn update(&self, _input: Self::Input) -> Self::Output {} + fn update(&self, _input: Self::Input) -> Result<Self::Output, Self::Error> { + Ok(()) + } } // Recursive case: process head and recurse on tail -// No Clone bound needed - ownership passes through via Result -impl<O, S, Rest> StateMachine for variadic!(S, ...Rest) +// No Clone bound needed - ownership passes through via `Either` +impl<O, E, S, Rest> StateMachine for variadic!(S, ...Rest) where - S: State<Output = O>, - Rest: StateMachine<Input = S::Input, Output = O>, + S: State<Output = O, Error = E>, + Rest: StateMachine<Input = S::Input, Output = O, Error = E>, { type Input = Rest::Input; type Output = O; + type Error = E; - fn update(&self, input: Self::Input) -> Self::Output { - match self.0.apply(input) { - Ok(result) => result, - Err(input) => self.1.update(input), + fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error> { + match self.0.apply(input)? { + Either::Left(result) => Ok(result), + Either::Right(input) => self.1.update(input), } } } @@ -160,7 +166,7 @@ mod tests { let input = Message::new(std::mem::size_of::<PrepareHeader>()); - mux.update(input); + let _ = mux.update(input); } #[test]
