This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch improve_state_machine_apply
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c1d56fa11da934874787e12411597d887a615613
Author: numinex <[email protected]>
AuthorDate: Tue Feb 17 14:04:22 2026 +0100

    refactor(metadata): improve the stm update method
---
 core/common/src/lib.rs                      |  1 +
 core/common/src/types/{mod.rs => either.rs} | 28 ++++--------------
 core/common/src/types/mod.rs                |  1 +
 core/metadata/src/stm/mod.rs                | 46 +++++++++++++++++++----------
 core/metadata/src/stm/mux.rs                | 28 +++++++++++-------
 5 files changed, 55 insertions(+), 49 deletions(-)

diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 20e0de846..a87c07b5c 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -90,6 +90,7 @@ pub use types::consumer::consumer_offset::*;
 pub use types::consumer::consumer_offset_info::*;
 pub use types::consumer::consumer_offsets::*;
 pub use types::diagnostic::diagnostic_event::DiagnosticEvent;
+pub use types::either::Either;
 pub use types::identifier::*;
 pub use types::message::*;
 pub use types::partition::*;
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/either.rs
similarity index 57%
copy from core/common/src/types/mod.rs
copy to core/common/src/types/either.rs
index 79c1a1df7..684ce90dd 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/either.rs
@@ -15,26 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consensus;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod personal_access_tokens;
-pub(crate) mod segment;
-pub(crate) mod segment_storage;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod streaming_stats;
-pub(crate) mod topic;
-pub(crate) mod user;
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Either<L, R> {
+    Left(L),
+    Right(R),
+}
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 79c1a1df7..1aaf2aae7 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -25,6 +25,7 @@ pub(crate) mod configuration;
 pub(crate) mod consensus;
 pub(crate) mod consumer;
 pub(crate) mod diagnostic;
+pub(crate) mod either;
 pub(crate) mod identifier;
 pub(crate) mod message;
 pub(crate) mod partition;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index d5f3d573e..7de1992d0 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -21,6 +21,7 @@ pub mod snapshot;
 pub mod stream;
 pub mod user;
 
+use iggy_common::Either;
 use left_right::*;
 use std::cell::UnsafeCell;
 use std::sync::Arc;
@@ -63,12 +64,16 @@ where
 }
 
 /// Parses type-erased input into a command. Macro-generated.
-/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
+/// Returns:
+/// - `Ok(Either::Left(cmd))` if applicable
+/// - `Ok(Either::Right(input))` to pass ownership back
+/// - `Err(error)` for malformed payload/parse errors
 pub trait Command {
     type Cmd;
     type Input;
+    type Error;
 
-    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
+    fn parse(input: Self::Input) -> Result<Either<Self::Cmd, Self::Input>, Self::Error>;
 }
 
 /// Per-command handler for a given state type.
@@ -126,19 +131,20 @@ where
     }
 }
 
-/// Public interface for state machines.
-/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
+/// Public interface for state handlers.
 pub trait State {
     type Output;
     type Input;
+    type Error;
 
-    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
+    fn apply(&self, input: Self::Input) -> Result<Either<Self::Output, Self::Input>, Self::Error>;
 }
 
 pub trait StateMachine {
     type Input;
     type Output;
-    fn update(&self, input: Self::Input) -> Self::Output;
+    type Error;
+    fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>;
 }
 
 /// Generates the state's inner struct and wrapper type.
@@ -223,21 +229,22 @@ macro_rules! collect_handlers {
             impl $crate::stm::Command for [<$state Inner>] {
                 type Cmd = [<$state Command>];
                 type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+                type Error = ::iggy_common::IggyError;
 
-                fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> {
+                fn parse(input: Self::Input) -> Result<::iggy_common::Either<Self::Cmd, Self::Input>, Self::Error> {
                     use ::iggy_common::BytesSerializable;
+                    use ::iggy_common::Either;
                     use ::iggy_common::header::Operation;
 
                     match input.header().operation {
                         $(
                             Operation::$operation => {
                                 let body = input.body_bytes();
-                                Ok([<$state Command>]::$operation(
-                                    $operation::from_bytes(body).unwrap()
-                                ))
+                                let cmd = $operation::from_bytes(body)?;
+                                Ok(Either::Left([<$state Command>]::$operation(cmd)))
                             },
                         )*
-                        _ => Err(input),
+                        _ => Ok(Either::Right(input)),
                     }
                 }
             }
@@ -257,11 +264,20 @@ macro_rules! collect_handlers {
             impl $crate::stm::State for $state {
                 type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
                 type Output = ();
+                type Error = <[<$state Inner>] as $crate::stm::Command>::Error;
 
-                fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> {
-                    let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?;
-                    self.inner.do_apply(cmd);
-                    Ok(())
+                fn apply(&self, input: Self::Input) -> Result<::iggy_common::Either<Self::Output, Self::Input>, Self::Error> {
+                    use ::iggy_common::Either;
+
+                    match <[<$state Inner>] as $crate::stm::Command>::parse(input)? {
+                        Either::Left(cmd) => {
+                            self.inner.do_apply(cmd);
+                            Ok(Either::Left(()))
+                        }
+                        Either::Right(input) => {
+                            Ok(Either::Right(input))
+                        }
+                    }
                 }
             }
 
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index ae514e5c4..70830355c 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
+use iggy_common::Either;
 use iggy_common::{header::PrepareHeader, message::Message};
 
 use crate::stm::{State, StateMachine};
@@ -44,8 +45,9 @@ where
 {
     type Input = T::Input;
     type Output = T::Output;
+    type Error = T::Error;
 
-    fn update(&self, input: Self::Input) -> Self::Output {
+    fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
         self.inner.update(input)
     }
 }
@@ -70,24 +72,28 @@ impl StateMachine for () {
     // TODO: Make sure that the `Output` matches to the output type of the rest of list.
     // TODO: Add a trait bound to the output that will allow us to get the response in bytes.
     type Output = ();
+    type Error = iggy_common::IggyError;
 
-    fn update(&self, _input: Self::Input) -> Self::Output {}
+    fn update(&self, _input: Self::Input) -> Result<Self::Output, Self::Error> {
+        Ok(())
+    }
 }
 
 // Recursive case: process head and recurse on tail
-// No Clone bound needed - ownership passes through via Result
-impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
+// No Clone bound needed - ownership passes through via `Either`
+impl<O, E, S, Rest> StateMachine for variadic!(S, ...Rest)
 where
-    S: State<Output = O>,
-    Rest: StateMachine<Input = S::Input, Output = O>,
+    S: State<Output = O, Error = E>,
+    Rest: StateMachine<Input = S::Input, Output = O, Error = E>,
 {
     type Input = Rest::Input;
     type Output = O;
+    type Error = E;
 
-    fn update(&self, input: Self::Input) -> Self::Output {
-        match self.0.apply(input) {
-            Ok(result) => result,
-            Err(input) => self.1.update(input),
+    fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
+        match self.0.apply(input)? {
+            Either::Left(result) => Ok(result),
+            Either::Right(input) => self.1.update(input),
         }
     }
 }
@@ -160,7 +166,7 @@ mod tests {
 
         let input = Message::new(std::mem::size_of::<PrepareHeader>());
 
-        mux.update(input);
+        let _ = mux.update(input);
     }
 
     #[test]

Reply via email to