This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch plane_demuxer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 035fe3eaf38c4cce1320c5097fe606b2ee18ccbe
Author: numinex <[email protected]>
AuthorDate: Thu Feb 19 11:19:17 2026 +0100

    good
---
 core/common/src/lib.rs                     |   1 +
 core/common/src/macros.rs                  |  27 +++++++
 core/common/src/types/consensus/header.rs  |  88 ++++++++++++++++++---
 core/common/src/types/consensus/message.rs |  37 ++++++---
 core/consensus/src/lib.rs                  |  15 +++-
 core/consensus/src/plane_mux.rs            | 121 +++++++++++++++++++++++++++++
 core/metadata/src/impls/metadata.rs        |  55 ++++++++++++-
 core/metadata/src/stm/mux.rs               |  13 +---
 core/partitions/src/iggy_partitions.rs     |  36 +++++++--
 core/simulator/src/deps.rs                 |   6 +-
 core/simulator/src/lib.rs                  |  46 ++---------
 core/simulator/src/replica.rs              |  31 +++++---
 12 files changed, 378 insertions(+), 98 deletions(-)

diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index a87c07b5c..6e8b4dcf7 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -22,6 +22,7 @@ mod certificates;
 mod commands;
 mod deduplication;
 mod error;
+mod macros;
 mod sender;
 pub mod sharding;
 mod traits;
diff --git a/core/common/src/macros.rs b/core/common/src/macros.rs
new file mode 100644
index 000000000..c7c1ce3f6
--- /dev/null
+++ b/core/common/src/macros.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! variadic {
+    () => (());
+    (...$a:ident $(,)?) => ($a);
+    (...$a:expr $(,)?) => ($a);
+    ($a:ident $(,)?) => (($a, ()));
+    ($a:expr $(,)?) => (($a, ()));
+    ($a:ident, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+    ($a:expr, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+}
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index c8d311fa7..bda75e93b 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -21,9 +21,10 @@ use thiserror::Error;
 const HEADER_SIZE: usize = 256;
 pub trait ConsensusHeader: Sized + Pod + Zeroable {
     const COMMAND: Command2;
-    const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() == HEADER_SIZE);
 
     fn validate(&self) -> Result<(), ConsensusError>;
+    fn operation(&self) -> Operation;
+    fn command(&self) -> Command2;
     fn size(&self) -> u32;
 }
 
@@ -138,6 +139,7 @@ pub struct GenericHeader {
     pub namespace: u64,
     pub reserved_command: [u8; 120],
 }
+const _: () = assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for GenericHeader {}
 unsafe impl Zeroable for GenericHeader {}
@@ -145,6 +147,14 @@ unsafe impl Zeroable for GenericHeader {}
 impl ConsensusHeader for GenericHeader {
     const COMMAND: Command2 = Command2::Reserved;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         Ok(())
     }
@@ -176,6 +186,7 @@ pub struct RequestHeader {
     pub namespace: u64,
     pub reserved: [u8; 64],
 }
+const _: () = assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE);
 
 impl Default for RequestHeader {
     fn default() -> Self {
@@ -207,6 +218,10 @@ unsafe impl Zeroable for RequestHeader {}
 impl ConsensusHeader for RequestHeader {
     const COMMAND: Command2 = Command2::Request;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Request {
             return Err(ConsensusError::InvalidCommand {
@@ -216,6 +231,9 @@ impl ConsensusHeader for RequestHeader {
         }
         Ok(())
     }
+    fn command(&self) -> Command2 {
+        self.command
+    }
 
     fn size(&self) -> u32 {
         self.size
@@ -248,6 +266,7 @@ pub struct PrepareHeader {
     pub namespace: u64,
     pub reserved: [u8; 32],
 }
+const _: () = assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for PrepareHeader {}
 unsafe impl Zeroable for PrepareHeader {}
@@ -255,6 +274,10 @@ unsafe impl Zeroable for PrepareHeader {}
 impl ConsensusHeader for PrepareHeader {
     const COMMAND: Command2 = Command2::Prepare;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Prepare {
             return Err(ConsensusError::InvalidCommand {
@@ -264,6 +287,9 @@ impl ConsensusHeader for PrepareHeader {
         }
         Ok(())
     }
+    fn command(&self) -> Command2 {
+        self.command
+    }
 
     fn size(&self) -> u32 {
         self.size
@@ -322,6 +348,7 @@ pub struct PrepareOkHeader {
     pub namespace: u64,
     pub reserved: [u8; 48],
 }
+const _: () = assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for PrepareOkHeader {}
 unsafe impl Zeroable for PrepareOkHeader {}
@@ -329,6 +356,13 @@ unsafe impl Zeroable for PrepareOkHeader {}
 impl ConsensusHeader for PrepareOkHeader {
     const COMMAND: Command2 = Command2::PrepareOk;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::PrepareOk {
             return Err(ConsensusError::InvalidCommand {
@@ -390,6 +424,7 @@ pub struct CommitHeader {
     pub namespace: u64,
     pub reserved: [u8; 80],
 }
+const _: () = assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for CommitHeader {}
 unsafe impl Zeroable for CommitHeader {}
@@ -397,6 +432,13 @@ unsafe impl Zeroable for CommitHeader {}
 impl ConsensusHeader for CommitHeader {
     const COMMAND: Command2 = Command2::Commit;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Commit {
             return Err(ConsensusError::CommitInvalidCommand2);
@@ -436,6 +478,7 @@ pub struct ReplyHeader {
     pub namespace: u64,
     pub reserved: [u8; 41],
 }
+const _: () = assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for ReplyHeader {}
 unsafe impl Zeroable for ReplyHeader {}
@@ -443,6 +486,13 @@ unsafe impl Zeroable for ReplyHeader {}
 impl ConsensusHeader for ReplyHeader {
     const COMMAND: Command2 = Command2::Reply;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Reply {
             return Err(ConsensusError::ReplyInvalidCommand2);
@@ -489,19 +539,18 @@ impl Default for ReplyHeader {
 #[repr(C)]
 pub struct StartViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 58],
 
     pub reserved: [u8; 128],
 }
+const _: () = assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for StartViewChangeHeader {}
 unsafe impl Zeroable for StartViewChangeHeader {}
@@ -509,6 +558,13 @@ unsafe impl Zeroable for StartViewChangeHeader {}
 impl ConsensusHeader for StartViewChangeHeader {
     const COMMAND: Command2 = Command2::StartViewChange;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::StartViewChange {
             return Err(ConsensusError::InvalidCommand {
@@ -536,16 +592,14 @@ impl ConsensusHeader for StartViewChangeHeader {
 #[repr(C)]
 pub struct DoViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 58],
 
     /// The highest op-number in this replica's log.
     /// Used to select the most complete log when log_view values are equal.
@@ -559,6 +613,7 @@ pub struct DoViewChangeHeader {
     pub log_view: u32,
     pub reserved: [u8; 108],
 }
+const _: () = assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for DoViewChangeHeader {}
 unsafe impl Zeroable for DoViewChangeHeader {}
@@ -566,6 +621,13 @@ unsafe impl Zeroable for DoViewChangeHeader {}
 impl ConsensusHeader for DoViewChangeHeader {
     const COMMAND: Command2 = Command2::DoViewChange;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::DoViewChange {
             return Err(ConsensusError::InvalidCommand {
@@ -609,16 +671,14 @@ impl ConsensusHeader for DoViewChangeHeader {
 #[repr(C)]
 pub struct StartViewHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 58],
 
     /// The op-number of the highest entry in the new primary's log.
     /// Backups set their op to this value.
@@ -629,6 +689,7 @@ pub struct StartViewHeader {
     pub commit: u64,
     pub reserved: [u8; 112],
 }
+const _: () = assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE);
 
 unsafe impl Pod for StartViewHeader {}
 unsafe impl Zeroable for StartViewHeader {}
@@ -636,6 +697,13 @@ unsafe impl Zeroable for StartViewHeader {}
 impl ConsensusHeader for StartViewHeader {
     const COMMAND: Command2 = Command2::StartView;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::StartView {
             return Err(ConsensusError::InvalidCommand {
diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs
index 5fe7e0843..7ef39cdca 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -22,6 +22,25 @@ use crate::{
 use bytes::Bytes;
 use std::marker::PhantomData;
 
+// TODO: Rename this to Message and ConsensusHeader to Header.
+pub trait ConsensusMessage<H>
+where
+    H: ConsensusHeader,
+{
+    // TODO: fn body(&self) -> Something;
+    fn header(&self) -> &H;
+}
+
+impl<H> ConsensusMessage<H> for Message<H>
+where
+    H: ConsensusHeader,
+{
+    fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct Message<H: ConsensusHeader> {
     buffer: Bytes,
@@ -32,6 +51,13 @@ impl<H> Message<H>
 where
     H: ConsensusHeader,
 {
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
     /// Create a new message from a buffer.
     ///
     /// # Safety
@@ -115,17 +141,6 @@ where
         }
     }
 
-    /// Get a reference to the header using zero-copy access.
-    ///
-    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
-    /// without any copying or allocation.
-    #[inline]
-    #[allow(unused)]
-    pub fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
-        bytemuck::from_bytes(header_bytes)
-    }
-
     /// Get a reference to the message body (everything after the header).
     ///
     /// Returns an empty slice if there is no body.
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 767ee9bf8..f021f3d11 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use iggy_common::header::ConsensusHeader;
+use iggy_common::message::ConsensusMessage;
 use message_bus::MessageBus;
 
 pub trait Project<T, C: Consensus> {
@@ -53,7 +54,7 @@ pub trait Pipeline {
 pub trait Consensus: Sized {
     type MessageBus: MessageBus;
     #[rustfmt::skip] // Scuffed formatter.
-    type Message<H> where H: ConsensusHeader;
+    type Message<H>: ConsensusMessage<H> where H: ConsensusHeader;
 
     type RequestHeader: ConsensusHeader;
     type ReplicateHeader: ConsensusHeader;
@@ -95,8 +96,20 @@ where
     fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>;
 }
 
+pub trait PlaneIdentity<C>
+where
+    C: Consensus,
+{
+    fn is_applicable<H>(&self, message: &C::Message<H>) -> bool
+    where
+        H: ConsensusHeader,
+        C::Message<H>: ConsensusMessage<H>;
+}
+
 mod impls;
 pub use impls::*;
+mod plane_mux;
+pub use plane_mux::*;
 mod plane_helpers;
 pub use plane_helpers::*;
 
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
new file mode 100644
index 000000000..6e849be5b
--- /dev/null
+++ b/core/consensus/src/plane_mux.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{Consensus, Plane, PlaneIdentity, Project};
+use iggy_common::variadic;
+
+#[derive(Debug)]
+pub struct MuxPlane<T> {
+    inner: T,
+}
+
+impl<T> MuxPlane<T> {
+    pub fn new(inner: T) -> Self {
+        Self { inner }
+    }
+
+    pub fn inner(&self) -> &T {
+        &self.inner
+    }
+
+    pub fn inner_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
+impl<C, T> Plane<C> for MuxPlane<T>
+where
+    C: Consensus,
+    T: Plane<C>,
+{
+    async fn on_request(&self, message: C::Message<C::RequestHeader>)
+    where
+        C::Message<C::RequestHeader>:
+            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+    {
+        self.inner.on_request(message).await;
+    }
+
+    async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>)
+    where
+        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+    {
+        self.inner.on_replicate(message).await;
+    }
+
+    async fn on_ack(&self, message: C::Message<C::AckHeader>) {
+        self.inner.on_ack(message).await;
+    }
+}
+
+impl<C> Plane<C> for ()
+where
+    C: Consensus,
+{
+    async fn on_request(&self, _message: C::Message<C::RequestHeader>)
+    where
+        C::Message<C::RequestHeader>:
+            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+    {
+    }
+
+    async fn on_replicate(&self, _message: C::Message<C::ReplicateHeader>)
+    where
+        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+    {
+    }
+
+    async fn on_ack(&self, _message: C::Message<C::AckHeader>) {}
+}
+
+impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
+where
+    C: Consensus,
+    Head: Plane<C> + PlaneIdentity<C>,
+    Tail: Plane<C>,
+{
+    async fn on_request(&self, message: C::Message<C::RequestHeader>)
+    where
+        C::Message<C::RequestHeader>:
+            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+    {
+        if self.0.is_applicable(&message) {
+            self.0.on_request(message).await;
+        } else {
+            self.1.on_request(message).await;
+        }
+    }
+
+    async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>)
+    where
+        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+    {
+        if self.0.is_applicable(&message) {
+            self.0.on_replicate(message).await;
+        } else {
+            self.1.on_replicate(message).await;
+        }
+    }
+
+    async fn on_ack(&self, message: C::Message<C::AckHeader>) {
+        if self.0.is_applicable(&message) {
+            self.0.on_ack(message).await;
+        } else {
+            self.1.on_ack(message).await;
+        }
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 4ff74f4e2..ce4e324c6 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -17,14 +17,17 @@
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
 use consensus::{
-    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus,
+    ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
     panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight,
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader},
-    message::Message,
+    header::{
+        Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader,
+        RequestHeader,
+    },
+    message::{ConsensusMessage, Message},
 };
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
@@ -239,6 +242,50 @@ where
     }
 }
 
+impl<B, P, J, S, M> PlaneIdentity<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+    J: JournalHandle,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
+    M: StateMachine<Input = Message<PrepareHeader>>,
+{
+    fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as Consensus>::Message<H>) -> bool
+    where
+        H: ConsensusHeader,
+        <VsrConsensus<B, P> as Consensus>::Message<H>: ConsensusMessage<H>,
+    {
+        assert!(matches!(
+            message.header().command(),
+            Command2::Request | Command2::Prepare | Command2::PrepareOk
+        ));
+        let operation = message.header().operation();
+        // TODO: Use better heuristic, smth like greater or equal based on op number.
+        matches!(
+            operation,
+            Operation::CreateStream
+                | Operation::UpdateStream
+                | Operation::DeleteStream
+                | Operation::PurgeStream
+                | Operation::CreateTopic
+                | Operation::UpdateTopic
+                | Operation::DeleteTopic
+                | Operation::PurgeTopic
+                | Operation::CreatePartitions
+                | Operation::DeletePartitions
+                | Operation::CreateConsumerGroup
+                | Operation::DeleteConsumerGroup
+                | Operation::CreateUser
+                | Operation::UpdateUser
+                | Operation::DeleteUser
+                | Operation::ChangePassword
+                | Operation::UpdatePermissions
+                | Operation::CreatePersonalAccessToken
+                | Operation::DeletePersonalAccessToken
+        )
+    }
+}
+
 impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 70830355c..52a3c8bdc 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -18,6 +18,7 @@
 use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
 use iggy_common::Either;
 use iggy_common::{header::PrepareHeader, message::Message};
+use iggy_common::variadic;
 
 use crate::stm::{State, StateMachine};
 
@@ -52,18 +53,6 @@ where
     }
 }
 
-//TODO: Move to common
-#[macro_export]
-macro_rules! variadic {
-    () => ( () );
-    (...$a:ident  $(,)? ) => ( $a );
-    (...$a:expr  $(,)? ) => ( $a );
-    ($a:ident  $(,)? ) => ( ($a, ()) );
-    ($a:expr  $(,)? ) => ( ($a, ()) );
-    ($a:ident,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
-    ($a:expr,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
-}
-
 // TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case.
 // TODO: I think we could move the base case to the impl site of `State`, so this way we know the `Input` and `Output` types.
 // Base case of the recursive resolution.
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 16ed124ab..255a31fff 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -21,15 +21,19 @@ use crate::IggyPartition;
 use crate::Partition;
 use crate::types::PartitionsConfig;
 use consensus::{
-    Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common,
-    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+    Consensus, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus,
+    ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    send_prepare_ok as send_prepare_ok_common,
 };
+use iggy_common::header::Command2;
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader},
-    message::Message,
+    header::{
+        ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader,
+    },
+    message::{ConsensusMessage, Message},
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
 use message_bus::MessageBus;
@@ -464,6 +468,28 @@ where
     }
 }
 
+impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+    fn is_applicable<H>(&self, message: &<VsrConsensus<B> as Consensus>::Message<H>) -> bool
+    where
+        H: ConsensusHeader,
+        <VsrConsensus<B> as Consensus>::Message<H>: ConsensusMessage<H>,
+    {
+        assert!(matches!(
+            message.header().command(),
+            Command2::Request | Command2::Prepare | Command2::PrepareOk
+        ));
+        let operation = message.header().operation();
+        // TODO: Use better selection, smth like greater or equal based on op number.
+        matches!(
+            operation,
+            Operation::DeleteSegments | Operation::SendMessages | Operation::StoreConsumerOffset
+        )
+    }
+}
+
 impl<B> IggyPartitions<VsrConsensus<B>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 25175ea8e..d6fab8623 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -17,14 +17,15 @@
 
 use crate::bus::SharedMemBus;
 use bytes::Bytes;
-use consensus::VsrConsensus;
+use consensus::{MuxPlane, VsrConsensus};
+use iggy_common::variadic;
 use iggy_common::header::PrepareHeader;
 use iggy_common::message::Message;
 use journal::{Journal, JournalHandle, Storage};
 use metadata::stm::consumer_group::ConsumerGroups;
 use metadata::stm::stream::Streams;
 use metadata::stm::user::Users;
-use metadata::{IggyMetadata, MuxStateMachine, variadic};
+use metadata::{IggyMetadata, MuxStateMachine};
 use std::cell::{Cell, RefCell, UnsafeCell};
 use std::collections::HashMap;
 
@@ -160,3 +161,4 @@ pub type SimMetadata = IggyMetadata<
 
 /// Type alias for simulator partitions
 pub type ReplicaPartitions = partitions::IggyPartitions<VsrConsensus<SharedMemBus>>;
+pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 44e063f78..cc848e0ed 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -37,7 +37,7 @@ impl Simulator {
     /// Initialize a partition on all replicas (in-memory for simulation)
     pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) {
         for replica in &mut self.replicas {
-            replica.partitions.init_partition_in_memory(namespace);
+            replica.init_partition_in_memory(namespace);
         }
     }
 
@@ -115,46 +115,10 @@ impl Simulator {
     }
 
     async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
-        let message: MessageBag = message.into();
-        let operation = match &message {
-            MessageBag::Request(message) => message.header().operation,
-            MessageBag::Prepare(message) => message.header().operation,
-            MessageBag::PrepareOk(message) => message.header().operation,
-        } as u8;
-
-        if operation < 200 {
-            self.dispatch_to_metadata_on_replica(replica, message).await;
-        } else {
-            self.dispatch_to_partition_on_replica(replica, message)
-                .await;
-        }
-    }
-
-    async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, message: MessageBag) {
-        match message {
-            MessageBag::Request(request) => {
-                replica.metadata.on_request(request).await;
-            }
-            MessageBag::Prepare(prepare) => {
-                replica.metadata.on_replicate(prepare).await;
-            }
-            MessageBag::PrepareOk(prepare_ok) => {
-                replica.metadata.on_ack(prepare_ok).await;
-            }
-        }
-    }
-
-    async fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) {
-        match message {
-            MessageBag::Request(request) => {
-                replica.partitions.on_request(request).await;
-            }
-            MessageBag::Prepare(prepare) => {
-                replica.partitions.on_replicate(prepare).await;
-            }
-            MessageBag::PrepareOk(prepare_ok) => {
-                replica.partitions.on_ack(prepare_ok).await;
-            }
+        match MessageBag::from(message) {
+            MessageBag::Request(request) => replica.plane.on_request(request).await,
+            MessageBag::Prepare(prepare) => replica.plane.on_replicate(prepare).await,
+            MessageBag::PrepareOk(prepare_ok) => replica.plane.on_ack(prepare_ok).await,
         }
     }
 }
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ae4631e95..c975ce8d6 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -17,23 +17,23 @@
 
 use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
-    MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot,
+    MemStorage, ReplicaPartitions, SimJournal, SimMuxStateMachine, SimPlane, SimSnapshot,
 };
-use consensus::{LocalPipeline, VsrConsensus};
+use consensus::{LocalPipeline, MuxPlane, VsrConsensus};
 use iggy_common::IggyByteSize;
 use iggy_common::sharding::ShardId;
+use iggy_common::variadic;
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
-use metadata::{IggyMetadata, variadic};
+use metadata::IggyMetadata;
 use partitions::PartitionsConfig;
 use std::sync::Arc;
 
 pub struct Replica {
     pub id: u8,
     pub name: String,
-    pub metadata: SimMetadata,
-    pub partitions: ReplicaPartitions,
+    pub plane: SimPlane,
     pub bus: Arc<MemBus>,
 }
 
@@ -83,17 +83,24 @@ impl Replica {
             ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None)
         };
 
+        let metadata = IggyMetadata {
+            consensus: Some(metadata_consensus),
+            journal: Some(SimJournal::<MemStorage>::default()),
+            snapshot: Some(SimSnapshot::default()),
+            mux_stm: mux,
+        };
+        let plane = MuxPlane::new(variadic!(metadata, partitions));
+
         Self {
             id,
             name,
-            metadata: IggyMetadata {
-                consensus: Some(metadata_consensus),
-                journal: Some(SimJournal::<MemStorage>::default()),
-                snapshot: Some(SimSnapshot::default()),
-                mux_stm: mux,
-            },
-            partitions,
+            plane,
             bus,
         }
     }
+
+    pub fn init_partition_in_memory(&mut self, namespace: iggy_common::sharding::IggyNamespace) {
+        // TODO: create an accessor for the partitions within mux plane, same for metadata.
+        self.plane.inner_mut().1.0.init_partition_in_memory(namespace);
+    }
 }

Reply via email to