This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch simulator
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1b799401eb9fadb646e3782a1d5eed0cb4888d7a
Author: numinex <[email protected]>
AuthorDate: Thu Feb 5 18:17:25 2026 +0100

    simulator v1
---
 Cargo.lock                                 |   3 +
 core/common/src/types/consensus/header.rs  |  28 +-
 core/common/src/types/consensus/message.rs |  54 +--
 core/consensus/src/impls.rs                |  98 +++--
 core/consensus/src/lib.rs                  |  18 +-
 core/metadata/Cargo.toml                   |   1 +
 core/metadata/src/impls/metadata.rs        | 160 +++++---
 core/metadata/src/stm/mod.rs               |  25 +-
 core/metadata/src/stm/mux.rs               |  19 +-
 core/metadata/src/stm/stream.rs            |   1 +
 core/simulator/Cargo.toml                  |   2 +
 core/simulator/src/bus.rs                  | 119 ++++--
 core/simulator/src/client.rs               |  83 ++++
 core/simulator/src/deps.rs                 |  32 +-
 core/simulator/src/lib.rs                  | 627 +++++------------------------
 core/simulator/src/main.rs                 | 103 ++++-
 core/simulator/src/replica.rs              |  40 +-
 17 files changed, 670 insertions(+), 743 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index faad05d5b..76de899a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5666,6 +5666,7 @@ version = "0.1.0"
 dependencies = [
  "ahash 0.8.12",
  "consensus",
+ "futures",
  "iggy_common",
  "journal",
  "left-right",
@@ -8453,8 +8454,10 @@ dependencies = [
 name = "simulator"
 version = "0.1.0"
 dependencies = [
+ "bytemuck",
  "bytes",
  "consensus",
+ "futures",
  "iggy_common",
  "journal",
  "message_bus",
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index 376b6be05..8f9538557 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -114,6 +114,8 @@ pub enum Operation {
     UpdatePermissions = 145,
     CreatePersonalAccessToken = 146,
     DeletePersonalAccessToken = 147,
+
+    Reserved = 200,
 }
 
 #[repr(C)]
@@ -172,6 +174,30 @@ pub struct RequestHeader {
     pub reserved: [u8; 95],
 }
 
+impl Default for RequestHeader {
+    fn default() -> Self {
+        Self {
+            reserved: [0; 95],
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            command: Default::default(),
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: 0,
+            request_checksum: 0,
+            timestamp: 0,
+            request: 0,
+            operation: Default::default(),
+        }
+    }
+}
+
 unsafe impl Pod for RequestHeader {}
 unsafe impl Zeroable for RequestHeader {}
 
@@ -348,7 +374,7 @@ impl ConsensusHeader for CommitHeader {
 }
 
 #[repr(C)]
-#[derive(Debug, Clone, Copy)]
+#[derive(Default, Debug, Clone, Copy)]
 pub struct ReplyHeader {
     pub checksum: u128,
     pub checksum_body: u128,
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index a9fca6524..dfb9e7090 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{header::RequestHeader, types::consensus::header::{
-    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, 
PrepareOkHeader, ReplyHeader,
-}};
+use crate::{
+    header::RequestHeader,
+    types::consensus::header::{self, ConsensusHeader, PrepareHeader, 
PrepareOkHeader},
+};
 use bytes::Bytes;
 use std::marker::PhantomData;
 
@@ -279,11 +280,8 @@ where
 #[allow(unused)]
 pub enum MessageBag {
     Request(Message<RequestHeader>),
-    Generic(Message<GenericHeader>),
     Prepare(Message<PrepareHeader>),
     PrepareOk(Message<PrepareOkHeader>),
-    Commit(Message<CommitHeader>),
-    Reply(Message<ReplyHeader>),
 }
 
 impl MessageBag {
@@ -291,11 +289,8 @@ impl MessageBag {
     pub fn command(&self) -> header::Command2 {
         match self {
             MessageBag::Request(message) => message.header().command,
-            MessageBag::Generic(message) => message.header().command,
             MessageBag::Prepare(message) => message.header().command,
             MessageBag::PrepareOk(message) => message.header().command,
-            MessageBag::Commit(message) => message.header().command,
-            MessageBag::Reply(message) => message.header().command,
         }
     }
 
@@ -303,11 +298,17 @@ impl MessageBag {
     pub fn size(&self) -> u32 {
         match self {
             MessageBag::Request(message) => message.header().size(),
-            MessageBag::Generic(message) => message.header().size(),
             MessageBag::Prepare(message) => message.header().size(),
             MessageBag::PrepareOk(message) => message.header().size(),
-            MessageBag::Commit(message) => message.header().size(),
-            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn operation(&self) -> header::Operation {
+        match self {
+            MessageBag::Request(message) => message.header().operation,
+            MessageBag::Prepare(message) => message.header().operation,
+            MessageBag::PrepareOk(message) => message.header().operation,
         }
     }
 }
@@ -329,21 +330,17 @@ where
                     unsafe { 
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Prepare(msg)
             }
-            header::Command2::Commit => {
-                let msg = unsafe { 
Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Commit(msg)
-            }
-            header::Command2::Reply => {
-                let msg = unsafe { 
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Reply(msg)
-            },
             header::Command2::Request => {
-                let msg = unsafe { 
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
+                let msg =
+                    unsafe { 
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Request(msg)
             }
-            _ => unreachable!(
-                "For now we only support Prepare, Commit, and Reply. In the 
future we will support more commands. Command2: {command:?}"
-            ),
+            header::Command2::PrepareOk => {
+                let msg =
+                    unsafe { 
Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
+                MessageBag::PrepareOk(msg)
+            }
+            _ => unreachable!(),
         }
     }
 }
@@ -507,9 +504,6 @@ mod tests {
 
         assert_eq!(bag.command(), header::Command2::Prepare);
         assert!(matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
     }
 
     #[test]
@@ -519,9 +513,6 @@ mod tests {
 
         assert_eq!(bag.command(), header::Command2::Commit);
         assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
     }
 
     #[test]
@@ -531,9 +522,6 @@ mod tests {
 
         assert_eq!(bag.command(), header::Command2::Reply);
         assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
     }
 }
 
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 34b08e654..36b595163 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -27,6 +27,7 @@ use iggy_common::header::{
 };
 use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
+use message_bus::MessageBus;
 use std::cell::{Cell, RefCell};
 use std::collections::VecDeque;
 
@@ -221,12 +222,8 @@ impl LocalPipeline {
         self.prepare_queue.back()
     }
 
-    /// Find a message by op number and checksum.
-    pub fn message_by_op_and_checksum(
-        &mut self,
-        op: u64,
-        checksum: u128,
-    ) -> Option<&mut PipelineEntry> {
+    /// Find a message by op number and checksum (immutable).
+    pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&PipelineEntry> {
         let head_op = self.prepare_queue.front()?.message.header().op;
         let tail_op = self.prepare_queue.back()?.message.header().op;
 
@@ -242,7 +239,7 @@ impl LocalPipeline {
         }
 
         let index = (op - head_op) as usize;
-        let entry = self.prepare_queue.get_mut(index)?;
+        let entry = self.prepare_queue.get(index)?;
 
         debug_assert_eq!(entry.message.header().op, op);
 
@@ -323,6 +320,20 @@ impl LocalPipeline {
     pub fn clear(&mut self) {
         self.prepare_queue.clear();
     }
+
+    /// Extract and remove a message by op number.
+    /// Returns None if op is not in the pipeline.
+    pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
+        let head_op = self.prepare_queue.front()?.message.header().op;
+        if op < head_op {
+            return None;
+        }
+        let index = (op - head_op) as usize;
+        if index >= self.prepare_queue.len() {
+            return None;
+        }
+        self.prepare_queue.remove(index)
+    }
 }
 
 impl Pipeline for LocalPipeline {
@@ -337,15 +348,23 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::pop_message(self)
     }
 
+    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
+        LocalPipeline::extract_by_op(self, op)
+    }
+
     fn clear(&mut self) {
         LocalPipeline::clear(self)
     }
 
+    fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
+        LocalPipeline::message_by_op(self, op)
+    }
+
     fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
         LocalPipeline::message_by_op_mut(self, op)
     }
 
-    fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> 
Option<&mut Self::Entry> {
+    fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry> {
         LocalPipeline::message_by_op_and_checksum(self, op, checksum)
     }
 
@@ -390,7 +409,10 @@ pub enum VsrAction {
 
 #[allow(unused)]
 #[derive(Debug)]
-pub struct VsrConsensus {
+pub struct VsrConsensus<B = IggyMessageBus>
+where
+    B: MessageBus,
+{
     cluster: u128,
     replica: u8,
     replica_count: u8,
@@ -416,7 +438,7 @@ pub struct VsrConsensus {
 
     pipeline: RefCell<LocalPipeline>,
 
-    message_bus: IggyMessageBus,
+    message_bus: B,
     // TODO: Add loopback_queue for messages to self
     /// Tracks start view change messages received from all replicas 
(including self)
     start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
@@ -434,8 +456,8 @@ pub struct VsrConsensus {
     timeouts: RefCell<TimeoutManager>,
 }
 
-impl VsrConsensus {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self {
+impl<B: MessageBus> VsrConsensus<B> {
+    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B) 
-> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
@@ -453,7 +475,7 @@ impl VsrConsensus {
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(LocalPipeline::new()),
-            message_bus: IggyMessageBus::new(replica_count as usize, replica 
as u16, 0),
+            message_bus,
             start_view_change_from_all_replicas: 
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
             do_view_change_from_all_replicas: 
RefCell::new(dvc_quorum_array_empty()),
             do_view_change_quorum: Cell::new(false),
@@ -463,6 +485,11 @@ impl VsrConsensus {
         }
     }
 
+    // TODO: More init logic.
+    pub fn init(&self) {
+        self.status.set(Status::Normal);
+    }
+
     pub fn primary_index(&self, view: u32) -> u8 {
         view as u8 % self.replica_count
     }
@@ -999,9 +1026,9 @@ impl VsrConsensus {
     /// Called on the primary when a follower acknowledges a prepare.
     ///
     /// Returns true if quorum was just reached for this op.
-    pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool 
{
-        let header = message.header();
-
+    /// Handle a PrepareOk message. Returns true if quorum was reached.
+    /// Note: Caller (on_ack) should validate is_primary and status before 
calling.
+    pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
         assert_eq!(header.command, Command2::PrepareOk);
         assert!(
             header.replica < self.replica_count,
@@ -1009,26 +1036,16 @@ impl VsrConsensus {
             header.replica
         );
 
-        // Ignore if not in normal status
-        if self.status() != Status::Normal {
-            return false;
-        }
-
         // Ignore if from older view
         if header.view < self.view() {
             return false;
         }
 
-        // Ignore if from newer view. This shouldn't happen if we're primary
+        // Ignore if from newer view
         if header.view > self.view() {
             return false;
         }
 
-        // We must be primary to process prepare_ok
-        if !self.is_primary() {
-            return false;
-        }
-
         // Ignore if syncing
         if self.is_syncing() {
             return false;
@@ -1047,9 +1064,6 @@ impl VsrConsensus {
             return false;
         }
 
-        // Verify the prepare is for a valid op range
-        let _commit = self.commit();
-
         // Check for duplicate ack
         if entry.has_ack(header.replica) {
             return false;
@@ -1062,22 +1076,19 @@ impl VsrConsensus {
         // Check if we've reached quorum
         if ack_count >= quorum && !entry.ok_quorum_received {
             entry.ok_quorum_received = true;
-
             return true;
         }
 
         false
     }
 
-    pub fn message_bus(&self) -> &IggyMessageBus {
+    pub fn message_bus(&self) -> &B {
         &self.message_bus
     }
 }
 
-impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
-    type Consensus = VsrConsensus;
-
-    fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
+impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for 
Message<RequestHeader> {
+    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
         let op = consensus.sequencer.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
@@ -1089,6 +1100,7 @@ impl Project<Message<PrepareHeader>> for 
Message<RequestHeader> {
                 release: old.release,
                 command: Command2::Prepare,
                 replica: consensus.replica,
+                client: old.client,
                 parent: 0, // TODO: Get parent checksum from the previous 
entry in the journal (figure out how to pass that ctx here)
                 request_checksum: old.request_checksum,
                 request: old.request,
@@ -1102,10 +1114,8 @@ impl Project<Message<PrepareHeader>> for 
Message<RequestHeader> {
     }
 }
 
-impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
-    type Consensus = VsrConsensus;
-
-    fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
+impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for 
Message<PrepareHeader> {
+    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
@@ -1128,8 +1138,8 @@ impl Project<Message<PrepareOkHeader>> for 
Message<PrepareHeader> {
     }
 }
 
-impl Consensus for VsrConsensus {
-    type MessageBus = IggyMessageBus;
+impl<B: MessageBus> Consensus for VsrConsensus<B> {
+    type MessageBus = B;
 
     type RequestMessage = Message<RequestHeader>;
     type ReplicateMessage = Message<PrepareHeader>;
@@ -1166,9 +1176,9 @@ impl Consensus for VsrConsensus {
         // verify op is sequential
         assert_eq!(
             header.op,
-            self.sequencer.current_sequence() + 1,
+            self.sequencer.current_sequence(),
             "op must be sequential: expected {}, got {}",
-            self.sequencer.current_sequence() + 1,
+            self.sequencer.current_sequence(),
             header.op
         );
 
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 1cf323d94..5c3e152e6 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -17,9 +17,8 @@
 
 use message_bus::MessageBus;
 
-pub trait Project<T> {
-    type Consensus: Consensus;
-    fn project(self, consensus: &Self::Consensus) -> T;
+pub trait Project<T, C: Consensus> {
+    fn project(self, consensus: &C) -> T;
 }
 
 pub trait Pipeline {
@@ -30,11 +29,16 @@ pub trait Pipeline {
 
     fn pop_message(&mut self) -> Option<Self::Entry>;
 
+    /// Extract and remove a message by op number.
+    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
+
     fn clear(&mut self);
 
+    fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
+
     fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry>;
 
-    fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> 
Option<&mut Self::Entry>;
+    fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry>;
 
     fn is_full(&self) -> bool;
 
@@ -43,11 +47,11 @@ pub trait Pipeline {
     fn verify(&self);
 }
 
-pub trait Consensus {
+pub trait Consensus: Sized {
     type MessageBus: MessageBus;
     // I am wondering, whether we should create a dedicated trait for cloning, 
so it's explicit that we do ref counting.
-    type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> + 
Clone;
-    type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone;
+    type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
+    type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
     type AckMessage;
     type Sequencer: Sequencer;
     type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 193bf5788..05dc872a1 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -30,6 +30,7 @@ readme = "../../../README.md"
 [dependencies]
 ahash = { workspace = true }
 consensus = { workspace = true }
+futures = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
 left-right = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 5932c3e09..7dbf9fe71 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,31 +14,30 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use crate::stm::StateMachine;
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
-    header::{Command2, PrepareHeader, PrepareOkHeader},
+    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use tracing::{debug, warn};
 
-#[expect(unused)]
 pub trait Metadata<C>
 where
     C: Consensus,
 {
     /// Handle a request message.
-    fn on_request(&self, message: C::RequestMessage);
+    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
 
     /// Handle a replicate message (Prepare in VSR).
     fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
 
     /// Handle an ack message (PrepareOk in VSR).
-    fn on_ack(&self, message: C::AckMessage);
+    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
 }
 
-#[expect(unused)]
 #[derive(Debug)]
 pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
@@ -51,25 +50,27 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
-impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, 
J, S, M>
 where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
+    M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) 
{
+    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::RequestMessage) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
         debug!("handling metadata request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare);
+        self.pipeline_prepare(prepare).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
@@ -94,20 +95,6 @@ where
 
         let current_op = consensus.sequencer().current_sequence();
 
-        // Old message (handle as repair). Not replicating.
-        if header.view < consensus.view()
-            || (consensus.status() == Status::Normal
-                && header.view == consensus.view()
-                && header.op <= current_op)
-        {
-            debug!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (repair)"
-            );
-            self.on_repair(message);
-            return;
-        }
-
         // If status is not normal, ignore the replicate.
         if consensus.status() != Status::Normal {
             warn!(
@@ -158,7 +145,7 @@ where
         }
     }
 
-    fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::AckMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
@@ -172,60 +159,112 @@ where
             return;
         }
 
-        // Find the prepare in pipeline
-        let mut pipeline = consensus.pipeline().borrow_mut();
-        let Some(entry) = pipeline.message_by_op_and_checksum(header.op, 
header.prepare_checksum)
-        else {
-            debug!("on_ack: prepare not in pipeline op={}", header.op);
-            return;
-        };
-
-        // Verify checksum matches
-        if entry.message.header().checksum != header.prepare_checksum {
-            warn!("on_ack: checksum mismatch");
-            return;
+        // Verify checksum by checking pipeline entry exists
+        {
+            let pipeline = consensus.pipeline().borrow();
+            let Some(entry) =
+                pipeline.message_by_op_and_checksum(header.op, 
header.prepare_checksum)
+            else {
+                debug!("on_ack: prepare not in pipeline op={}", header.op);
+                return;
+            };
+
+            if entry.message.header().checksum != header.prepare_checksum {
+                warn!("on_ack: checksum mismatch");
+                return;
+            }
         }
 
-        // Record ack
-        let count = entry.add_ack(header.replica);
-
-        // Check quorum
-        if count >= consensus.quorum() && !entry.ok_quorum_received {
-            entry.ok_quorum_received = true;
+        // Let consensus handle the ack increment and quorum check
+        if consensus.handle_prepare_ok(header) {
             debug!("on_ack: quorum received for op={}", header.op);
-
-            // Advance commit number and trigger commit journal
             consensus.advance_commit_number(header.op);
-            self.commit_journal();
+
+            // Extract the prepare message from the pipeline by op
+            let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
+            let Some(entry) = entry else {
+                warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
+                return;
+            };
+
+            let prepare = entry.message;
+            let prepare_header = *prepare.header();
+
+            // Apply the state (consumes prepare)
+            // TODO: Handle appending result to response
+            let _result = self.mux_stm.update(prepare);
+            debug!("on_ack: state applied for op={}", prepare_header.op);
+
+            // TODO: Figure out better infra for this, its messy.
+            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
+                .transmute_header(|_, new| {
+                    *new = ReplyHeader {
+                        checksum: 0,
+                        checksum_body: 0,
+                        cluster: consensus.cluster(),
+                        size: std::mem::size_of::<ReplyHeader>() as u32,
+                        epoch: prepare_header.epoch,
+                        view: consensus.view(),
+                        release: 0,
+                        protocol: 0,
+                        command: Command2::Reply,
+                        replica: consensus.replica(),
+                        reserved_frame: [0; 12],
+                        request_checksum: prepare_header.request_checksum,
+                        request_checksum_padding: 0,
+                        context: 0,
+                        context_padding: 0,
+                        op: prepare_header.op,
+                        commit: consensus.commit(),
+                        timestamp: prepare_header.timestamp,
+                        request: prepare_header.request,
+                        operation: prepare_header.operation,
+                        ..Default::default()
+                    };
+                });
+
+            // Send reply to client
+            let generic_reply = reply.into_generic();
+            debug!(
+                "on_ack: sending reply to client={} for op={}",
+                prepare_header.client, prepare_header.op
+            );
+
+            // TODO: Error handling
+            consensus
+                .message_bus()
+                .send_to_client(prepare_header.client, generic_reply)
+                .await
+                .unwrap()
         }
     }
 }
 
-impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
 where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
+    M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    #[expect(unused)]
-    fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
+    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         debug!("inserting prepare into metadata pipeline");
         consensus.verify_pipeline();
         consensus.pipeline_message(prepare.clone());
 
-        self.on_replicate(prepare.clone());
+        self.on_replicate(prepare.clone()).await;
         consensus.post_replicate_verify(&prepare);
     }
 
     fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let (Some(consensus), Some(journal)) = (&self.consensus, 
&self.journal) else {
-            todo!("dispatch fence_old_prepare to shard0");
-        };
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
 
         let header = prepare.header();
         // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
@@ -245,7 +284,7 @@ where
         let header = message.header();
 
         // TODO: calculate the index;
-        let idx= header.op as usize;
+        let idx = header.op as usize;
         assert_eq!(header.command, Command2::Prepare);
         assert!(
             journal.handle().header(idx).is_none(),
@@ -282,10 +321,6 @@ where
             .unwrap();
     }
 
-    fn on_repair(&self, _message: Message<PrepareHeader>) {
-        todo!()
-    }
-
     /// Verify hash chain would not break if we add this header.
     fn panic_if_hash_chain_would_break_in_same_view(
         &self,
@@ -309,7 +344,6 @@ where
         // TODO: Implement commit logic
         // Walk through journal from last committed to current commit number
         // Apply each entry to the state machine
-        todo!()
     }
 
     /// Send a prepare_ok message to the primary.
@@ -401,6 +435,12 @@ where
                 "send_prepare_ok: loopback to self"
             );
             // TODO: Queue for self-processing or call handle_prepare_ok 
directly
+            // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
+            consensus
+                .message_bus()
+                .send_to_replica(primary, generic_message)
+                .await
+                .unwrap();
         } else {
             debug!(
                 replica = consensus.replica(),
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e26651c05..c92b0ca10 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -62,11 +62,12 @@ where
 }
 
 /// Parses type-erased input into a command. Macro-generated.
+/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
 pub trait Command {
     type Cmd;
     type Input;
 
-    fn parse(input: &Self::Input) -> Option<Self::Cmd>;
+    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
 }
 
 /// Handles commands. User-implemented business logic.
@@ -110,17 +111,18 @@ where
 }
 
 /// Public interface for state machines.
+/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
 pub trait State {
     type Output;
     type Input;
 
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
+    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
 }
 
 pub trait StateMachine {
     type Input;
     type Output;
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
+    fn update(&self, input: Self::Input) -> Self::Output;
 }
 
 /// Generates a state machine with convention-based storage.
@@ -212,32 +214,31 @@ macro_rules! define_state {
                 type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
                 type Output = ();
 
-                fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-                    <[<$state Inner>] as $crate::stm::Command>::parse(input)
-                        .map(|cmd| self.inner.do_apply(cmd))
+                fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
+                    let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
+                    self.inner.do_apply(cmd);
+                    Ok(())
                 }
             }
 
-            // TODO: This can be monomorphized by const generics, instead of 
creating an runtime enum
-            // We can use const generics and specialize each of those methods.
             impl $crate::stm::Command for [<$state Inner>] {
                 type Cmd = [<$state Command>];
                 type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
 
-                fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+                fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> 
{
                     use ::iggy_common::BytesSerializable;
                     use ::iggy_common::header::Operation;
 
-                    let body = input.body_bytes();
                     match input.header().operation {
                         $(
                             Operation::$operation => {
-                                Some([<$state Command>]::$operation(
+                                let body = input.body_bytes();
+                                Ok([<$state Command>]::$operation(
                                     $operation::from_bytes(body).unwrap()
                                 ))
                             },
                         )*
-                        _ => None,
+                        _ => Err(input),
                     }
                 }
             }
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 9928169a6..f68d99bf7 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -44,8 +44,8 @@ where
     type Input = T::Input;
     type Output = T::Output;
 
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
-        self.inner.update(input, output);
+    fn update(&self, input: Self::Input) -> Self::Output {
+        self.inner.update(input)
     }
 }
 
@@ -67,12 +67,14 @@ macro_rules! variadic {
 impl StateMachine for () {
     type Input = Message<PrepareHeader>;
     // TODO: Make sure that the `Output` matches to the output type of the 
rest of list.
+    // TODO: Add a trait bound to the output that will allow us to get the 
response in bytes.
     type Output = ();
 
-    fn update(&self, _input: &Self::Input, _output: &mut Vec<Self::Output>) {}
+    fn update(&self, _input: Self::Input) -> Self::Output {}
 }
 
 // Recursive case: process head and recurse on tail
+// No Clone bound needed - ownership passes through via Result
 impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
 where
     S: State<Output = O>,
@@ -81,11 +83,11 @@ where
     type Input = Rest::Input;
     type Output = O;
 
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
-        if let Some(result) = self.0.apply(input) {
-            output.push(result);
+    fn update(&self, input: Self::Input) -> Self::Output {
+        match self.0.apply(input) {
+            Ok(result) => result,
+            Err(input) => self.1.update(input),
         }
-        self.1.update(input, output)
     }
 }
 
@@ -105,8 +107,7 @@ mod tests {
         let mux = MuxStateMachine::new(variadic!(users, streams));
 
         let input = Message::new(std::mem::size_of::<PrepareHeader>());
-        let mut output = Vec::new();
 
-        mux.update(&input, &mut output);
+        mux.update(input);
     }
 }
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 5106d6122..d1a3da212 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -182,6 +182,7 @@ define_state! {
         DeletePartitions
     ]
 }
+
 impl_absorb!(StreamsInner, StreamsCommand);
 
 impl StreamsInner {
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 4e611090c..ad11311b0 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -5,7 +5,9 @@ edition = "2024"
 
 [dependencies]
 bytes = { workspace = true }
+bytemuck = { workspace = true }
 consensus = { path = "../consensus" }
+futures = { workspace = true }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 6fefdc20a..0df3b19fa 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -1,7 +1,8 @@
-use iggy_common::{header::GenericHeader, message::Message, IggyError};
+use iggy_common::{IggyError, header::GenericHeader, message::Message};
 use message_bus::MessageBus;
-use std::cell::RefCell;
 use std::collections::{HashMap, VecDeque};
+use std::ops::Deref;
+use std::sync::{Arc, Mutex};
 
 /// Message envelope for tracking sender/recipient
 #[derive(Debug, Clone)]
@@ -12,41 +13,29 @@ pub struct Envelope {
     pub message: Message<GenericHeader>,
 }
 
-/// In-memory message bus implementing the MessageBus trait
+// TODO: Proper bus with an `Network` component which would simulate sending 
packets.
+// Tigerbeetle handles this by having an list of "buses", and calling 
callbacks for clients when an response is send.
+// This requires self-referntial structs (as message_bus has to store 
collection of other buses), which is overcomplilcated.
+// I think the way we could handle that is by having an dedicated collection 
for client responses (clients_table).
 #[derive(Debug, Default)]
 pub struct MemBus {
-    clients: RefCell<HashMap<u128, ()>>,
-    replicas: RefCell<HashMap<u8, ()>>,
-    pending_messages: RefCell<VecDeque<Envelope>>,
+    clients: Mutex<HashMap<u128, ()>>,
+    replicas: Mutex<HashMap<u8, ()>>,
+    pending_messages: Mutex<VecDeque<Envelope>>,
 }
 
 impl MemBus {
     pub fn new() -> Self {
         Self {
-            clients: RefCell::new(HashMap::new()),
-            replicas: RefCell::new(HashMap::new()),
-            pending_messages: RefCell::new(VecDeque::new()),
+            clients: Mutex::new(HashMap::new()),
+            replicas: Mutex::new(HashMap::new()),
+            pending_messages: Mutex::new(VecDeque::new()),
         }
     }
 
     /// Get the next pending message from the bus
     pub fn receive(&self) -> Option<Envelope> {
-        self.pending_messages.borrow_mut().pop_front()
-    }
-
-    /// Get all pending messages from the bus
-    pub fn receive_all(&self) -> Vec<Envelope> {
-        self.pending_messages.borrow_mut().drain(..).collect()
-    }
-
-    /// Check if there are pending messages
-    pub fn has_pending(&self) -> bool {
-        !self.pending_messages.borrow().is_empty()
-    }
-
-    /// Get the count of pending messages
-    pub fn pending_count(&self) -> usize {
-        self.pending_messages.borrow().len()
+        self.pending_messages.lock().unwrap().pop_front()
     }
 }
 
@@ -57,27 +46,27 @@ impl MessageBus for MemBus {
     type Sender = ();
 
     fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> 
bool {
-        if self.clients.borrow().contains_key(&client) {
+        if self.clients.lock().unwrap().contains_key(&client) {
             return false;
         }
-        self.clients.borrow_mut().insert(client, ());
+        self.clients.lock().unwrap().insert(client, ());
         true
     }
 
     fn remove_client(&mut self, client: Self::Client) -> bool {
-        self.clients.borrow_mut().remove(&client).is_some()
+        self.clients.lock().unwrap().remove(&client).is_some()
     }
 
     fn add_replica(&mut self, replica: Self::Replica) -> bool {
-        if self.replicas.borrow().contains_key(&replica) {
+        if self.replicas.lock().unwrap().contains_key(&replica) {
             return false;
         }
-        self.replicas.borrow_mut().insert(replica, ());
+        self.replicas.lock().unwrap().insert(replica, ());
         true
     }
 
     fn remove_replica(&mut self, replica: Self::Replica) -> bool {
-        self.replicas.borrow_mut().remove(&replica).is_some()
+        self.replicas.lock().unwrap().remove(&replica).is_some()
     }
 
     async fn send_to_client(
@@ -85,11 +74,11 @@ impl MessageBus for MemBus {
         client_id: Self::Client,
         message: Self::Data,
     ) -> Result<(), IggyError> {
-        if !self.clients.borrow().contains_key(&client_id) {
+        if !self.clients.lock().unwrap().contains_key(&client_id) {
             return Err(IggyError::ClientNotFound(client_id as u32));
         }
 
-        self.pending_messages.borrow_mut().push_back(Envelope {
+        self.pending_messages.lock().unwrap().push_back(Envelope {
             from_replica: None,
             to_replica: None,
             to_client: Some(client_id),
@@ -104,11 +93,11 @@ impl MessageBus for MemBus {
         replica: Self::Replica,
         message: Self::Data,
     ) -> Result<(), IggyError> {
-        if !self.replicas.borrow().contains_key(&replica) {
+        if !self.replicas.lock().unwrap().contains_key(&replica) {
             return Err(IggyError::ResourceNotFound(format!("Replica {}", 
replica)));
         }
 
-        self.pending_messages.borrow_mut().push_back(Envelope {
+        self.pending_messages.lock().unwrap().push_back(Envelope {
             from_replica: None,
             to_replica: Some(replica),
             to_client: None,
@@ -118,3 +107,63 @@ impl MessageBus for MemBus {
         Ok(())
     }
 }
+
+/// Newtype wrapper for shared MemBus that implements MessageBus
+#[derive(Debug, Clone)]
+pub struct SharedMemBus(pub Arc<MemBus>);
+
+impl Deref for SharedMemBus {
+    type Target = MemBus;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl MessageBus for SharedMemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool {
+        self.0
+            .clients
+            .lock()
+            .unwrap()
+            .insert(client, sender)
+            .is_none()
+    }
+
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        self.0.clients.lock().unwrap().remove(&client).is_some()
+    }
+
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        self.0
+            .replicas
+            .lock()
+            .unwrap()
+            .insert(replica, ())
+            .is_none()
+    }
+
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        self.0.replicas.lock().unwrap().remove(&replica).is_some()
+    }
+
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        self.0.send_to_client(client_id, message).await
+    }
+
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        self.0.send_to_replica(replica, message).await
+    }
+}
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
new file mode 100644
index 000000000..d3fbfe42b
--- /dev/null
+++ b/core/simulator/src/client.rs
@@ -0,0 +1,83 @@
+use iggy_common::{
+    BytesSerializable, Identifier,
+    create_stream::CreateStream,
+    delete_stream::DeleteStream,
+    header::{Operation, RequestHeader},
+    message::Message,
+};
+use std::cell::Cell;
+
+// TODO: Proper client which implements the full client SDK API
+pub struct SimClient {
+    client_id: u128,
+    request_counter: Cell<u64>,
+}
+
+impl SimClient {
+    pub fn new(client_id: u128) -> Self {
+        Self {
+            client_id,
+            request_counter: Cell::new(0),
+        }
+    }
+
+    fn next_request_number(&self) -> u64 {
+        let current = self.request_counter.get();
+        self.request_counter.set(current + 1);
+        current
+    }
+
+    pub fn create_stream(&self, name: &str) -> Message<RequestHeader> {
+        let create_stream = CreateStream {
+            name: name.to_string(),
+        };
+        let payload = bytes::Bytes::from(create_stream.to_bytes());
+
+        self.build_request(Operation::CreateStream, payload)
+    }
+
+    pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> {
+        let delete_stream = DeleteStream {
+            stream_id: Identifier::named(name).unwrap(),
+        };
+        let payload = bytes::Bytes::from(delete_stream.to_bytes());
+
+        self.build_request(Operation::DeleteStream, payload)
+    }
+
+    fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> 
Message<RequestHeader> {
+        use bytes::Bytes;
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+        let total_size = header_size + payload.len();
+
+        let header = RequestHeader {
+            command: iggy_common::header::Command2::Request,
+            operation,
+            size: total_size as u32,
+            cluster: 0, // TODO: Get from config
+            checksum: 0,
+            checksum_body: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: self.client_id,
+            request_checksum: 0,
+            timestamp: 0, // TODO: Use actual timestamp
+            request: self.next_request_number(),
+            ..Default::default()
+        };
+
+        let header_bytes = bytemuck::bytes_of(&header);
+        let mut buffer = Vec::with_capacity(total_size);
+        buffer.extend_from_slice(header_bytes);
+        buffer.extend_from_slice(&payload);
+
+        let message = Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
+            .expect("failed to build request message");
+        message
+    }
+}
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 5f1790e75..0900456a3 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -1,3 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::SharedMemBus;
 use bytes::Bytes;
 use consensus::VsrConsensus;
 use iggy_common::header::PrepareHeader;
@@ -6,7 +24,7 @@ use journal::{Journal, JournalHandle, Storage};
 use metadata::stm::consumer_group::ConsumerGroups;
 use metadata::stm::stream::Streams;
 use metadata::stm::user::Users;
-use metadata::{variadic, IggyMetadata, MuxStateMachine};
+use metadata::{IggyMetadata, MuxStateMachine, variadic};
 use std::cell::{Cell, RefCell, UnsafeCell};
 use std::collections::HashMap;
 
@@ -37,8 +55,8 @@ impl Storage for MemStorage {
     }
 }
 
+// TODO: Replace with actual Journal, the only thing that we will need to 
change is the `Storage` impl for an in-memory one.
 /// Generic in-memory journal implementation for testing/simulation
-/// Following TigerBeetle's approach: headers in memory, full messages in 
storage
 pub struct SimJournal<S: Storage> {
     storage: S,
     headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
@@ -124,15 +142,17 @@ impl JournalHandle for SimJournal<MemStorage> {
     }
 }
 
-/// Placeholder snapshot implementation
 #[derive(Debug, Default)]
 pub struct SimSnapshot {}
 
 /// Type aliases for simulator metadata
 pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
-pub type SimMetadata =
-    IggyMetadata<VsrConsensus, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
+pub type SimMetadata = IggyMetadata<
+    VsrConsensus<SharedMemBus>,
+    SimJournal<MemStorage>,
+    SimSnapshot,
+    SimMuxStateMachine,
+>;
 
-/// Placeholder for replica partitions
 #[derive(Debug, Default)]
 pub struct ReplicaPartitions {}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index fdd296f94..3bc17882a 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -1,43 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 pub mod bus;
+pub mod client;
 pub mod deps;
 pub mod replica;
 
 use bus::MemBus;
-use iggy_common::header::{PrepareHeader, PrepareOkHeader, RequestHeader};
+use iggy_common::header::{GenericHeader, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
-use iggy_common::{
-    ClientInfo, ClientInfoDetails, ClusterMetadata, CompressionAlgorithm, 
Consumer,
-    ConsumerGroup, ConsumerGroupDetails, ConsumerOffsetInfo, Identifier, 
IdentityInfo,
-    IggyDuration, IggyError, IggyExpiry, IggyMessage, MaxTopicSize, 
Partitioning, Permissions,
-    PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, 
PollingStrategy,
-    RawPersonalAccessToken, Snapshot, SnapshotCompression, Stats, Stream, 
StreamDetails,
-    SystemSnapshotType, Topic, TopicDetails, UserInfo, UserInfoDetails, 
UserStatus,
-};
 use message_bus::MessageBus;
 use metadata::Metadata;
 use replica::Replica;
-use std::rc::Rc;
+use std::sync::Arc;
 
-/// The main simulator struct that manages all replicas and exposes the SDK API
 #[derive(Debug)]
 pub struct Simulator {
     pub replicas: Vec<Replica>,
-    pub message_bus: Rc<MemBus>,
+    pub message_bus: Arc<MemBus>,
 }
 
 impl Simulator {
-    pub fn new(replica_count: usize) -> Self {
-        // Create message bus and preseed all replica connections
+    pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> 
Self {
         let mut message_bus = MemBus::new();
+        for client in clients {
+            message_bus.add_client(client, ());
+        }
 
-        // Register all replicas with the message bus
         for i in 0..replica_count as u8 {
             message_bus.add_replica(i);
         }
 
-        let message_bus = Rc::new(message_bus);
+        let message_bus = Arc::new(message_bus);
         let replicas = (0..replica_count)
-            .map(|i| Replica::new(i as u8, format!("replica-{}", i), 
Rc::clone(&message_bus)))
+            .map(|i| {
+                Replica::new(
+                    i as u8,
+                    format!("replica-{}", i),
+                    Arc::clone(&message_bus),
+                    replica_count as u8,
+                )
+            })
             .collect();
 
         Self {
@@ -47,14 +64,20 @@ impl Simulator {
     }
 
     pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> 
Self {
-        // Preseed replica connections
         for i in 0..replica_count as u8 {
             message_bus.add_replica(i);
         }
 
-        let message_bus = Rc::new(message_bus);
+        let message_bus = Arc::new(message_bus);
         let replicas = (0..replica_count)
-            .map(|i| Replica::new(i as u8, format!("replica-{}", i), 
Rc::clone(&message_bus)))
+            .map(|i| {
+                Replica::new(
+                    i as u8,
+                    format!("replica-{}", i),
+                    Arc::clone(&message_bus),
+                    replica_count as u8,
+                )
+            })
             .collect();
 
         Self {
@@ -64,521 +87,79 @@ impl Simulator {
     }
 }
 
-// 
=============================================================================
-// Internal message dispatch - Implementation details
-// 
=============================================================================
-
 impl Simulator {
-    /// Dispatch a message to the appropriate subsystem
-    /// Private - called by public client methods after building the message
-    ///
-    /// Routes based on message type and operation:
-    /// - Request messages → check operation type
-    ///   - Metadata operations (1-99) → metadata shard
-    ///   - Partition operations (100+) → partition shard
-    /// - Other message types → routed directly
-    fn dispatch(&self, message: MessageBag) {
-        match message {
-            MessageBag::Request(request) => {
-                // Route request based on operation type
-                let operation_value = request.header().operation as u32;
+    pub async fn step(&self) -> Option<Message<ReplyHeader>> {
+        if let Some(envelope) = self.message_bus.receive() {
+            if let Some(_client_id) = envelope.to_client {
+                let reply: Message<ReplyHeader> = envelope
+                    .message
+                    .try_into_typed()
+                    .expect("invalid message, wrong command type for an client 
response");
+                return Some(reply);
+            }
 
-                if operation_value < 100 {
-                    // Metadata operation
-                    self.dispatch_to_metadata(request);
-                } else {
-                    // Partition operation
-                    self.dispatch_to_partition(request);
+            if let Some(replica_id) = envelope.to_replica {
+                if let Some(replica) = self.replicas.get(replica_id as usize) {
+                    self.dispatch_to_replica(replica, envelope.message).await;
                 }
             }
-            MessageBag::Prepare(prepare) => {
-                // TODO: Handle prepare messages (chain replication)
-                todo!("handle prepare: op={}", prepare.header().op);
-            }
-            MessageBag::PrepareOk(prepare_ok) => {
-                // TODO: Handle acknowledgments
-                todo!("handle prepare_ok: op={}", prepare_ok.header().op);
-            }
-            MessageBag::Reply(_reply) => {
-                // TODO: Handle replies back to client
-                todo!("handle reply message");
-            }
-            MessageBag::Commit(_commit) => {
-                // TODO: Handle commit messages
-                todo!("handle commit message");
-            }
-            MessageBag::Generic(_generic) => {
-                // TODO: Handle generic messages
-                todo!("handle generic message");
-            }
         }
-    }
-
-    /// Dispatch metadata operation to metadata shard
-    fn dispatch_to_metadata(&self, request: Message<RequestHeader>) {
-        // TODO: Determine which replica is primary (for now, use replica 0)
-        let primary_id = 0;
-        let primary = &self.replicas[primary_id];
-
-        // Route to metadata's on_request handler
-        primary.metadata.on_request(request);
-    }
-
-    /// Dispatch partition operation to partition shard
-    fn dispatch_to_partition(&self, request: Message<RequestHeader>) {
-        // TODO: Determine which partition/replica handles this request
-        // For now, just route to replica 0's partitions
-        let replica_id = 0;
-        let _replica = &self.replicas[replica_id];
-
-        // TODO: Route to replica.partitions.on_request(request)
-        todo!(
-            "dispatch partition operation to replica {}: operation={:?}",
-            replica_id,
-            request.header().operation
-        );
-    }
-
-}
-
-// 
=============================================================================
-// Client methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn connect(&self) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn disconnect(&self) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn shutdown(&self) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// System methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_stats(&self) -> Result<Stats, IggyError> {
-        todo!()
-    }
-
-    pub fn get_me(&self) -> Result<ClientInfoDetails, IggyError> {
-        todo!()
-    }
-
-    pub fn get_client(&self, _client_id: u32) -> 
Result<Option<ClientInfoDetails>, IggyError> {
-        todo!()
-    }
-
-    pub fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> {
-        todo!()
-    }
-
-    pub fn ping(&self) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn heartbeat_interval(&self) -> IggyDuration {
-        todo!()
-    }
-
-    pub fn snapshot(
-        &self,
-        _compression: SnapshotCompression,
-        _snapshot_types: Vec<SystemSnapshotType>,
-    ) -> Result<Snapshot, IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// User methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_user(&self, _user_id: &Identifier) -> 
Result<Option<UserInfoDetails>, IggyError> {
-        todo!()
-    }
-
-    pub fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> {
-        todo!()
-    }
-
-    pub fn create_user(
-        &self,
-        _username: &str,
-        _password: &str,
-        _status: UserStatus,
-        _permissions: Option<Permissions>,
-    ) -> Result<UserInfoDetails, IggyError> {
-        todo!()
-    }
-
-    pub fn delete_user(&self, _user_id: &Identifier) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn update_user(
-        &self,
-        _user_id: &Identifier,
-        _username: Option<&str>,
-        _status: Option<UserStatus>,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn update_permissions(
-        &self,
-        _user_id: &Identifier,
-        _permissions: Option<Permissions>,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn change_password(
-        &self,
-        _user_id: &Identifier,
-        _current_password: &str,
-        _new_password: &str,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn login_user(&self, _username: &str, _password: &str) -> 
Result<IdentityInfo, IggyError> {
-        todo!()
-    }
-
-    pub fn logout_user(&self) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Personal access token methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_personal_access_tokens(&self) -> 
Result<Vec<PersonalAccessTokenInfo>, IggyError> {
-        todo!()
-    }
-
-    pub fn create_personal_access_token(
-        &self,
-        _name: &str,
-        _expiry: PersonalAccessTokenExpiry,
-    ) -> Result<RawPersonalAccessToken, IggyError> {
-        todo!()
-    }
-
-    pub fn delete_personal_access_token(&self, _name: &str) -> Result<(), 
IggyError> {
-        todo!()
-    }
-
-    pub fn login_with_personal_access_token(
-        &self,
-        _token: &str,
-    ) -> Result<IdentityInfo, IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Stream methods - Public API
-// 
=============================================================================
-//
-// Pattern for all client methods:
-// 1. Build Message<RequestHeader> from parameters
-// 2. Convert to MessageBag::Request(message)
-// 3. Call self.dispatch(bag) - private method
-// 4. Wait for reply from primary
-// 5. Parse and return result
-
-impl Simulator {
-    pub fn get_stream(&self, _stream_id: &Identifier) -> 
Result<Option<StreamDetails>, IggyError> {
-        // TODO: Build message with Operation::GetStream (not yet in enum)
-        // self.dispatch_request(message);
-        // Wait for reply and parse
-        todo!()
-    }
-
-    pub fn get_streams(&self) -> Result<Vec<Stream>, IggyError> {
-        // TODO: Build message with Operation::ListStreams (not yet in enum)
-        // self.dispatch_request(message);
-        // Wait for reply and parse
-        todo!()
-    }
-
-    pub fn create_stream(&self, name: &str) -> Result<StreamDetails, 
IggyError> {
-        // TODO: Build Message<RequestHeader> with:
-        //    - header.operation = Operation::CreateStream
-        //    - body containing serialized stream name
-        // Convert to MessageBag (using From impl) and dispatch:
-        //   let message: Message<RequestHeader> = build_request(name);
-        //   self.dispatch(message.into());  // ← .into() converts to 
MessageBag
-        // Wait for reply and parse StreamDetails
-        todo!("build message for: {}", name)
-    }
-
-    pub fn update_stream(&self, _stream_id: &Identifier, _name: &str) -> 
Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn delete_stream(&self, _stream_id: &Identifier) -> Result<(), 
IggyError> {
-        todo!()
-    }
-
-    pub fn purge_stream(&self, _stream_id: &Identifier) -> Result<(), 
IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Topic methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_topic(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-    ) -> Result<Option<TopicDetails>, IggyError> {
-        todo!()
-    }
 
-    pub fn get_topics(&self, _stream_id: &Identifier) -> Result<Vec<Topic>, 
IggyError> {
-        todo!()
+        None
     }
 
-    pub fn create_topic(
-        &self,
-        _stream_id: &Identifier,
-        _name: &str,
-        _partitions_count: u32,
-        _compression_algorithm: CompressionAlgorithm,
-        _replication_factor: Option<u8>,
-        _message_expiry: IggyExpiry,
-        _max_topic_size: MaxTopicSize,
-    ) -> Result<TopicDetails, IggyError> {
-        todo!()
-    }
-
-    pub fn update_topic(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _name: &str,
-        _compression_algorithm: CompressionAlgorithm,
-        _replication_factor: Option<u8>,
-        _message_expiry: IggyExpiry,
-        _max_topic_size: MaxTopicSize,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn delete_topic(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn purge_topic(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Partition methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn create_partitions(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partitions_count: u32,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn delete_partitions(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partitions_count: u32,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Segment methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn delete_segments(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: u32,
-        _segments_count: u32,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Message methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn poll_messages(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: Option<u32>,
-        _consumer: &Consumer,
-        _strategy: &PollingStrategy,
-        _count: u32,
-        _auto_commit: bool,
-    ) -> Result<PolledMessages, IggyError> {
-        todo!()
-    }
-
-    pub fn send_messages(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partitioning: &Partitioning,
-        _messages: &mut [IggyMessage],
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
+    async fn dispatch_to_replica(&self, replica: &Replica, message: 
Message<GenericHeader>) {
+        let message: MessageBag = message.into();
+        let operation = match &message {
+            MessageBag::Request(message) => message.header().operation,
+            MessageBag::Prepare(message) => message.header().operation,
+            MessageBag::PrepareOk(message) => message.header().operation,
+        } as u8;
 
-    pub fn flush_unsaved_buffer(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: u32,
-        _fsync: bool,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Consumer offset methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn store_consumer_offset(
-        &self,
-        _consumer: &Consumer,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: Option<u32>,
-        _offset: u64,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn get_consumer_offset(
-        &self,
-        _consumer: &Consumer,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: Option<u32>,
-    ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
-        todo!()
-    }
-
-    pub fn delete_consumer_offset(
-        &self,
-        _consumer: &Consumer,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _partition_id: Option<u32>,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-}
-
-// 
=============================================================================
-// Consumer group methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_consumer_group(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _group_id: &Identifier,
-    ) -> Result<Option<ConsumerGroupDetails>, IggyError> {
-        todo!()
-    }
-
-    pub fn get_consumer_groups(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-    ) -> Result<Vec<ConsumerGroup>, IggyError> {
-        todo!()
-    }
-
-    pub fn create_consumer_group(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _name: &str,
-    ) -> Result<ConsumerGroupDetails, IggyError> {
-        todo!()
-    }
-
-    pub fn delete_consumer_group(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _group_id: &Identifier,
-    ) -> Result<(), IggyError> {
-        todo!()
-    }
-
-    pub fn join_consumer_group(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _group_id: &Identifier,
-    ) -> Result<(), IggyError> {
-        todo!()
+        if operation < 200 {
+            self.dispatch_to_metadata_on_replica(replica, message).await;
+        } else {
+            self.dispatch_to_partition_on_replica(replica, message);
+        }
     }
 
-    pub fn leave_consumer_group(
-        &self,
-        _stream_id: &Identifier,
-        _topic_id: &Identifier,
-        _group_id: &Identifier,
-    ) -> Result<(), IggyError> {
-        todo!()
+    async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, 
message: MessageBag) {
+        match message {
+            MessageBag::Request(request) => {
+                replica.metadata.on_request(request).await;
+            }
+            MessageBag::Prepare(prepare) => {
+                replica.metadata.on_replicate(prepare).await;
+            }
+            MessageBag::PrepareOk(prepare_ok) => {
+                replica.metadata.on_ack(prepare_ok).await;
+            }
+        }
     }
-}
 
-// 
=============================================================================
-// Cluster methods
-// 
=============================================================================
-
-impl Simulator {
-    pub fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> {
-        todo!()
+    fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: 
MessageBag) {
+        match message {
+            MessageBag::Request(request) => {
+                todo!(
+                    "dispatch request to partition replica {}: operation={:?}",
+                    replica.id,
+                    request.header().operation
+                );
+            }
+            MessageBag::Prepare(prepare) => {
+                todo!(
+                    "dispatch prepare to partition replica {}: operation={:?}",
+                    replica.id,
+                    prepare.header().operation
+                );
+            }
+            MessageBag::PrepareOk(prepare_ok) => {
+                todo!(
+                    "dispatch prepare_ok to partition replica {}: op={}",
+                    replica.id,
+                    prepare_ok.header().op
+                );
+            }
+        }
     }
 }
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 0790f62d5..995c717a3 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -1,9 +1,100 @@
-use simulator::Simulator;
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 
-fn main() {
-    let sim = Simulator::new(3);
-    println!("Created simulator with {} replicas", sim.replicas.len());
-    for replica in &sim.replicas {
-        println!("  - {} (id: {})", replica.name, replica.id);
+use iggy_common::header::ReplyHeader;
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use simulator::{Simulator, client::SimClient};
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+
+/// Shared response queue for client replies
+#[derive(Default)]
+pub struct Responses {
+    queue: VecDeque<Message<ReplyHeader>>,
+}
+
+impl Responses {
+    pub fn push(&mut self, msg: Message<ReplyHeader>) {
+        self.queue.push_back(msg);
     }
+
+    pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
+        self.queue.pop_front()
+    }
+}
+
+fn main() {
+    let client_id: u128 = 1;
+    let leader: u8 = 0;
+    let sim = Simulator::new(3, std::iter::once(client_id));
+    let bus = sim.message_bus.clone();
+
+    // Responses queue
+    let responses = Arc::new(Mutex::new(Responses::default()));
+    let responses_clone = responses.clone();
+
+    // TODO: Scuffed client/simulator setup.
+    // We need a better interface on simulator
+    let client_handle = std::thread::spawn(move || {
+        futures::executor::block_on(async {
+            let client = SimClient::new(client_id);
+
+            let create_msg = client.create_stream("test-stream");
+            bus.send_to_replica(leader, create_msg.into_generic())
+                .await
+                .expect("failed to send create_stream");
+
+            loop {
+                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                    println!("[client] Got create_stream reply: {:?}", 
reply.header());
+                    break;
+                }
+                std::thread::sleep(std::time::Duration::from_millis(1));
+            }
+
+            let delete_msg = client.delete_stream("test-stream");
+            bus.send_to_replica(leader, delete_msg.into_generic())
+                .await
+                .expect("failed to send delete_stream");
+
+            loop {
+                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                    println!("[client] Got delete_stream reply: {:?}", 
reply.header());
+                    break;
+                }
+                std::thread::sleep(std::time::Duration::from_millis(1));
+            }
+        });
+    });
+
+    println!("[sim] Starting simulator loop");
+    futures::executor::block_on(async {
+        loop {
+            if let Some(reply) = sim.step().await {
+                responses.lock().unwrap().push(reply);
+            }
+
+            if client_handle.is_finished() {
+                break;
+            }
+        }
+    });
+
+    client_handle.join().expect("client thread panicked");
+    println!("[sim] Simulator loop ended");
 }
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 1e93b9640..74c26b72f 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -1,12 +1,30 @@
-use crate::bus::MemBus;
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
     MemStorage, ReplicaPartitions, SimJournal, SimMetadata, 
SimMuxStateMachine, SimSnapshot,
 };
+use consensus::VsrConsensus;
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
-use metadata::{variadic, IggyMetadata};
-use std::rc::Rc;
+use metadata::{IggyMetadata, variadic};
+use std::sync::Arc;
 
 #[derive(Debug)]
 pub struct Replica {
@@ -14,22 +32,30 @@ pub struct Replica {
     pub name: String,
     pub metadata: SimMetadata,
     pub partitions: ReplicaPartitions,
-    pub bus: Rc<MemBus>,
+    pub bus: Arc<MemBus>,
 }
 
 impl Replica {
-    pub fn new(id: u8, name: String, bus: Rc<MemBus>) -> Self {
-        // Create the mux state machine with all state machines
+    pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) -> 
Self {
         let users: Users = UsersInner::new().into();
         let streams: Streams = StreamsInner::new().into();
         let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
         let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
 
+        let cluster_id: u128 = 1; // TODO: Make configurable
+        let consensus = VsrConsensus::new(
+            cluster_id,
+            id,
+            replica_count,
+            SharedMemBus(Arc::clone(&bus)),
+        );
+        consensus.init();
+
         Self {
             id,
             name,
             metadata: IggyMetadata {
-                consensus: None, // TODO: Init consensus
+                consensus: Some(consensus),
                 journal: Some(SimJournal::<MemStorage>::default()),
                 snapshot: Some(SimSnapshot::default()),
                 mux_stm: mux,

Reply via email to