This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3746448 feat(simulator): implement bare bone version of simulator 
(#2688)
cc3746448 is described below

commit cc37464487321d1b1b44952db6c6d1106b978cde
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Feb 9 11:18:10 2026 +0100

    feat(simulator): implement bare bone version of simulator (#2688)
    
    Bare bone version of simulator, further progress is blocked by #2687
---
 .github/workflows/_common.yml              |   1 +
 Cargo.lock                                 |  14 +++
 Cargo.toml                                 |   1 +
 DEPENDENCIES.md                            |   1 +
 core/common/src/sender/mod.rs              |   1 +
 core/common/src/types/consensus/header.rs  |  28 ++++-
 core/common/src/types/consensus/message.rs |  71 ++++-------
 core/consensus/src/impls.rs                | 102 +++++++++-------
 core/consensus/src/lib.rs                  |  18 +--
 core/consensus/src/vsr_timeout.rs          |   1 +
 core/journal/src/lib.rs                    |  12 +-
 core/message_bus/src/cache/connection.rs   |   5 +-
 core/message_bus/src/lib.rs                |   7 +-
 core/metadata/src/impls/metadata.rs        | 184 +++++++++++++++++-----------
 core/metadata/src/lib.rs                   |   8 +-
 core/metadata/src/stm/mod.rs               |  34 ++++--
 core/metadata/src/stm/mux.rs               |  20 ++--
 core/metadata/src/stm/stream.rs            |   1 +
 core/simulator/Cargo.toml                  |  31 +++++
 core/simulator/src/bus.rs                  | 186 +++++++++++++++++++++++++++++
 core/simulator/src/client.rs               |  99 +++++++++++++++
 core/simulator/src/deps.rs                 | 155 ++++++++++++++++++++++++
 core/simulator/src/lib.rs                  | 165 +++++++++++++++++++++++++
 core/simulator/src/main.rs                 | 100 ++++++++++++++++
 core/simulator/src/replica.rs              |  67 +++++++++++
 25 files changed, 1113 insertions(+), 199 deletions(-)

diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index 3e2cc20cf..2a1a3aa7e 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -107,6 +107,7 @@ jobs:
             metadata
             message_bus
             storage
+            simulator
             configs
 
   license-headers:
diff --git a/Cargo.lock b/Cargo.lock
index 19ff17599..13a4dd133 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8433,6 +8433,20 @@ dependencies = [
  "time",
 ]
 
+[[package]]
+name = "simulator"
+version = "0.1.0"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "consensus",
+ "futures",
+ "iggy_common",
+ "journal",
+ "message_bus",
+ "metadata",
+]
+
 [[package]]
 name = "siphasher"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 115b034c2..3566333b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ members = [
     "core/partitions",
     "core/sdk",
     "core/server",
+    "core/simulator",
     "core/tools",
     "examples/rust",
 ]
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 673532693..be466b0d5 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -730,6 +730,7 @@ simd-adler32: 0.3.8, "MIT",
 simd-json: 0.17.0, "Apache-2.0 OR MIT",
 simdutf8: 0.1.5, "Apache-2.0 OR MIT",
 simple_asn1: 0.6.3, "ISC",
+simulator: 0.1.0, "N/A",
 siphasher: 1.0.1, "Apache-2.0 OR MIT",
 slab: 0.4.11, "MIT",
 smallvec: 1.15.1, "Apache-2.0 OR MIT",
diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs
index 11c13bc70..50b426460 100644
--- a/core/common/src/sender/mod.rs
+++ b/core/common/src/sender/mod.rs
@@ -86,6 +86,7 @@ pub trait Sender {
 }
 
 #[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
 pub enum SenderKind {
     Tcp(TcpSender),
     TcpTls(TcpTlsSender),
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index 376b6be05..8f9538557 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -114,6 +114,8 @@ pub enum Operation {
     UpdatePermissions = 145,
     CreatePersonalAccessToken = 146,
     DeletePersonalAccessToken = 147,
+
+    Reserved = 200,
 }
 
 #[repr(C)]
@@ -172,6 +174,30 @@ pub struct RequestHeader {
     pub reserved: [u8; 95],
 }
 
+// NOTE(review): presumably hand-written because `Default` cannot be derived
+// with a `[u8; 95]` field (std implements it only for arrays of up to 32
+// elements) — confirm.
+impl Default for RequestHeader {
+    /// Returns a header whose integer fields and reserved byte arrays are all
+    /// zero, with `command` and `operation` taken from their own `Default`.
+    fn default() -> Self {
+        let command = Default::default();
+        let operation = Default::default();
+
+        Self {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            command,
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: 0,
+            request_checksum: 0,
+            timestamp: 0,
+            request: 0,
+            operation,
+            reserved: [0; 95],
+        }
+    }
+}
+
 unsafe impl Pod for RequestHeader {}
 unsafe impl Zeroable for RequestHeader {}
 
@@ -348,7 +374,7 @@ impl ConsensusHeader for CommitHeader {
 }
 
 #[repr(C)]
-#[derive(Debug, Clone, Copy)]
+#[derive(Default, Debug, Clone, Copy)]
 pub struct ReplyHeader {
     pub checksum: u128,
     pub checksum_body: u128,
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index ac32cc3c8..5fe7e0843 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::types::consensus::header::{
-    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, 
PrepareOkHeader, ReplyHeader,
+use crate::{
+    header::RequestHeader,
+    types::consensus::header::{self, ConsensusHeader, PrepareHeader, 
PrepareOkHeader},
 };
 use bytes::Bytes;
 use std::marker::PhantomData;
@@ -278,33 +279,36 @@ where
 #[derive(Debug)]
 #[allow(unused)]
 pub enum MessageBag {
-    Generic(Message<GenericHeader>),
+    Request(Message<RequestHeader>),
     Prepare(Message<PrepareHeader>),
     PrepareOk(Message<PrepareOkHeader>),
-    Commit(Message<CommitHeader>),
-    Reply(Message<ReplyHeader>),
 }
 
 impl MessageBag {
     #[allow(unused)]
     pub fn command(&self) -> header::Command2 {
         match self {
-            MessageBag::Generic(message) => message.header().command,
+            MessageBag::Request(message) => message.header().command,
             MessageBag::Prepare(message) => message.header().command,
             MessageBag::PrepareOk(message) => message.header().command,
-            MessageBag::Commit(message) => message.header().command,
-            MessageBag::Reply(message) => message.header().command,
         }
     }
 
     #[allow(unused)]
     pub fn size(&self) -> u32 {
         match self {
-            MessageBag::Generic(message) => message.header().size(),
+            MessageBag::Request(message) => message.header().size(),
             MessageBag::Prepare(message) => message.header().size(),
             MessageBag::PrepareOk(message) => message.header().size(),
-            MessageBag::Commit(message) => message.header().size(),
-            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    /// Returns the `operation` field from the header of whichever message
+    /// this bag currently holds.
+    #[allow(unused)]
+    pub fn operation(&self) -> header::Operation {
+        match self {
+            Self::Request(msg) => msg.header().operation,
+            Self::Prepare(msg) => msg.header().operation,
+            Self::PrepareOk(msg) => msg.header().operation,
         }
     }
 }
@@ -326,17 +330,17 @@ where
                     unsafe { 
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Prepare(msg)
             }
-            header::Command2::Commit => {
-                let msg = unsafe { 
Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Commit(msg)
+            header::Command2::Request => {
+                let msg =
+                    unsafe { 
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
+                MessageBag::Request(msg)
             }
-            header::Command2::Reply => {
-                let msg = unsafe { 
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Reply(msg)
+            header::Command2::PrepareOk => {
+                let msg =
+                    unsafe { 
Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
+                MessageBag::PrepareOk(msg)
             }
-            _ => unreachable!(
-                "For now we only support Prepare, Commit, and Reply. In the 
future we will support more commands. Command2: {command:?}"
-            ),
+            _ => unreachable!(),
         }
     }
 }
@@ -500,33 +504,6 @@ mod tests {
 
         assert_eq!(bag.command(), header::Command2::Prepare);
         assert!(matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
-    }
-
-    #[test]
-    fn test_message_bag_from_commit() {
-        let commit = header::CommitHeader::create_test();
-        let bag = MessageBag::from(commit);
-
-        assert_eq!(bag.command(), header::Command2::Commit);
-        assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
-    }
-
-    #[test]
-    fn test_message_bag_from_reply() {
-        let reply = header::ReplyHeader::create_test();
-        let bag = MessageBag::from(reply);
-
-        assert_eq!(bag.command(), header::Command2::Reply);
-        assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
     }
 }
 
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 154a6c3a4..36b595163 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -27,6 +27,7 @@ use iggy_common::header::{
 };
 use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
+use message_bus::MessageBus;
 use std::cell::{Cell, RefCell};
 use std::collections::VecDeque;
 
@@ -43,6 +44,7 @@ pub trait Sequencer {
     fn set_sequence(&self, sequence: Self::Sequence);
 }
 
+#[derive(Debug)]
 pub struct LocalSequencer {
     op: Cell<u64>,
 }
@@ -82,6 +84,7 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
 /// Maximum number of replicas in a cluster.
 pub const REPLICAS_MAX: usize = 32;
 
+#[derive(Debug)]
 pub struct PipelineEntry {
     pub message: Message<PrepareHeader>,
     /// Bitmap of replicas that have acknowledged this prepare.
@@ -133,6 +136,7 @@ impl RequestEntry {
     }
 }
 
+#[derive(Debug)]
 pub struct LocalPipeline {
     /// Messages being prepared (uncommitted and being replicated).
     prepare_queue: VecDeque<PipelineEntry>,
@@ -218,12 +222,8 @@ impl LocalPipeline {
         self.prepare_queue.back()
     }
 
-    /// Find a message by op number and checksum.
-    pub fn message_by_op_and_checksum(
-        &mut self,
-        op: u64,
-        checksum: u128,
-    ) -> Option<&mut PipelineEntry> {
+    /// Find a message by op number and checksum (immutable).
+    pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&PipelineEntry> {
         let head_op = self.prepare_queue.front()?.message.header().op;
         let tail_op = self.prepare_queue.back()?.message.header().op;
 
@@ -239,7 +239,7 @@ impl LocalPipeline {
         }
 
         let index = (op - head_op) as usize;
-        let entry = self.prepare_queue.get_mut(index)?;
+        let entry = self.prepare_queue.get(index)?;
 
         debug_assert_eq!(entry.message.header().op, op);
 
@@ -320,6 +320,20 @@ impl LocalPipeline {
     pub fn clear(&mut self) {
         self.prepare_queue.clear();
     }
+
+    /// Removes the entry for `op` from the prepare queue and returns it.
+    ///
+    /// The position is computed as `op - head_op`, where `head_op` is the op
+    /// of the entry at the front of the queue. Yields `None` when the queue
+    /// is empty, when `op` precedes the head, or when the computed position
+    /// lies past the end of the queue.
+    ///
+    /// NOTE(review): removing a non-head entry shifts the later entries
+    /// forward, so other `op - head_op` lookups on this queue would likely no
+    /// longer line up — confirm callers only ever extract the head.
+    pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
+        let head_op = self.prepare_queue.front()?.message.header().op;
+        // `checked_sub` is `None` exactly when `op < head_op`.
+        let offset = op.checked_sub(head_op)? as usize;
+        // `VecDeque::remove` already returns `None` for an out-of-range index.
+        self.prepare_queue.remove(offset)
+    }
 }
 
 impl Pipeline for LocalPipeline {
@@ -334,15 +348,23 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::pop_message(self)
     }
 
+    /// Forwards to `LocalPipeline::extract_by_op`, named explicitly by path.
+    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
+        LocalPipeline::extract_by_op(self, op)
+    }
+
     fn clear(&mut self) {
         LocalPipeline::clear(self)
     }
 
+    /// Forwards to `LocalPipeline::message_by_op`, named explicitly by path.
+    fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
+        LocalPipeline::message_by_op(self, op)
+    }
+
     fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
         LocalPipeline::message_by_op_mut(self, op)
     }
 
-    fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> 
Option<&mut Self::Entry> {
+    fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry> {
         LocalPipeline::message_by_op_and_checksum(self, op, checksum)
     }
 
@@ -386,7 +408,11 @@ pub enum VsrAction {
 }
 
 #[allow(unused)]
-pub struct VsrConsensus {
+#[derive(Debug)]
+pub struct VsrConsensus<B = IggyMessageBus>
+where
+    B: MessageBus,
+{
     cluster: u128,
     replica: u8,
     replica_count: u8,
@@ -412,7 +438,7 @@ pub struct VsrConsensus {
 
     pipeline: RefCell<LocalPipeline>,
 
-    message_bus: IggyMessageBus,
+    message_bus: B,
     // TODO: Add loopback_queue for messages to self
     /// Tracks start view change messages received from all replicas 
(including self)
     start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
@@ -430,8 +456,8 @@ pub struct VsrConsensus {
     timeouts: RefCell<TimeoutManager>,
 }
 
-impl VsrConsensus {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self {
+impl<B: MessageBus> VsrConsensus<B> {
+    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B) 
-> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
@@ -449,7 +475,7 @@ impl VsrConsensus {
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(LocalPipeline::new()),
-            message_bus: IggyMessageBus::new(replica_count as usize, replica 
as u16, 0),
+            message_bus,
             start_view_change_from_all_replicas: 
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
             do_view_change_from_all_replicas: 
RefCell::new(dvc_quorum_array_empty()),
             do_view_change_quorum: Cell::new(false),
@@ -459,6 +485,11 @@ impl VsrConsensus {
         }
     }
 
+    /// Sets this replica's status to `Status::Normal`.
+    // TODO: More init logic.
+    pub fn init(&self) {
+        self.status.set(Status::Normal);
+    }
+
     pub fn primary_index(&self, view: u32) -> u8 {
         view as u8 % self.replica_count
     }
@@ -995,9 +1026,9 @@ impl VsrConsensus {
     /// Called on the primary when a follower acknowledges a prepare.
     ///
     /// Returns true if quorum was just reached for this op.
-    pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool 
{
-        let header = message.header();
-
+    ///
+    /// Note: the caller (`on_ack`) should validate `is_primary` and status
+    /// before calling.
+    pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
         assert_eq!(header.command, Command2::PrepareOk);
         assert!(
             header.replica < self.replica_count,
@@ -1005,26 +1036,16 @@ impl VsrConsensus {
             header.replica
         );
 
-        // Ignore if not in normal status
-        if self.status() != Status::Normal {
-            return false;
-        }
-
         // Ignore if from older view
         if header.view < self.view() {
             return false;
         }
 
-        // Ignore if from newer view. This shouldn't happen if we're primary
+        // Ignore if from newer view
         if header.view > self.view() {
             return false;
         }
 
-        // We must be primary to process prepare_ok
-        if !self.is_primary() {
-            return false;
-        }
-
         // Ignore if syncing
         if self.is_syncing() {
             return false;
@@ -1043,9 +1064,6 @@ impl VsrConsensus {
             return false;
         }
 
-        // Verify the prepare is for a valid op range
-        let _commit = self.commit();
-
         // Check for duplicate ack
         if entry.has_ack(header.replica) {
             return false;
@@ -1058,22 +1076,19 @@ impl VsrConsensus {
         // Check if we've reached quorum
         if ack_count >= quorum && !entry.ok_quorum_received {
             entry.ok_quorum_received = true;
-
             return true;
         }
 
         false
     }
 
-    pub fn message_bus(&self) -> &IggyMessageBus {
+    pub fn message_bus(&self) -> &B {
         &self.message_bus
     }
 }
 
-impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
-    type Consensus = VsrConsensus;
-
-    fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
+impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for 
Message<RequestHeader> {
+    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
         let op = consensus.sequencer.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
@@ -1085,6 +1100,7 @@ impl Project<Message<PrepareHeader>> for 
Message<RequestHeader> {
                 release: old.release,
                 command: Command2::Prepare,
                 replica: consensus.replica,
+                client: old.client,
                 parent: 0, // TODO: Get parent checksum from the previous 
entry in the journal (figure out how to pass that ctx here)
                 request_checksum: old.request_checksum,
                 request: old.request,
@@ -1098,10 +1114,8 @@ impl Project<Message<PrepareHeader>> for 
Message<RequestHeader> {
     }
 }
 
-impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
-    type Consensus = VsrConsensus;
-
-    fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
+impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for 
Message<PrepareHeader> {
+    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
@@ -1124,8 +1138,8 @@ impl Project<Message<PrepareOkHeader>> for 
Message<PrepareHeader> {
     }
 }
 
-impl Consensus for VsrConsensus {
-    type MessageBus = IggyMessageBus;
+impl<B: MessageBus> Consensus for VsrConsensus<B> {
+    type MessageBus = B;
 
     type RequestMessage = Message<RequestHeader>;
     type ReplicateMessage = Message<PrepareHeader>;
@@ -1162,9 +1176,9 @@ impl Consensus for VsrConsensus {
         // verify op is sequential
         assert_eq!(
             header.op,
-            self.sequencer.current_sequence() + 1,
+            self.sequencer.current_sequence(),
             "op must be sequential: expected {}, got {}",
-            self.sequencer.current_sequence() + 1,
+            self.sequencer.current_sequence(),
             header.op
         );
 
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 1cf323d94..5c3e152e6 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -17,9 +17,8 @@
 
 use message_bus::MessageBus;
 
-pub trait Project<T> {
-    type Consensus: Consensus;
-    fn project(self, consensus: &Self::Consensus) -> T;
+/// Conversion of `self` into a `T`, given a reference to a consensus
+/// implementation `C`.
+pub trait Project<T, C: Consensus> {
+    /// Consumes `self` and returns a `T`, with `consensus` available by reference.
+    fn project(self, consensus: &C) -> T;
 }
 
 pub trait Pipeline {
@@ -30,11 +29,16 @@ pub trait Pipeline {
 
     fn pop_message(&mut self) -> Option<Self::Entry>;
 
+    /// Extract and remove a message by op number.
+    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
+
     fn clear(&mut self);
 
+    fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
+
     fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry>;
 
-    fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> 
Option<&mut Self::Entry>;
+    fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry>;
 
     fn is_full(&self) -> bool;
 
@@ -43,11 +47,11 @@ pub trait Pipeline {
     fn verify(&self);
 }
 
-pub trait Consensus {
+pub trait Consensus: Sized {
     type MessageBus: MessageBus;
     // I am wondering, whether we should create a dedicated trait for cloning, 
so it's explicit that we do ref counting.
-    type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> + 
Clone;
-    type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone;
+    type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
+    type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
     type AckMessage;
     type Sequencer: Sequencer;
     type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
diff --git a/core/consensus/src/vsr_timeout.rs 
b/core/consensus/src/vsr_timeout.rs
index 140d40f02..d5cda482e 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -109,6 +109,7 @@ pub enum TimeoutKind {
 
 /// Manager for all VSR timeouts of a replica.
 #[allow(unused)]
+#[derive(Debug)]
 pub struct TimeoutManager {
     ping: Timeout,
     prepare: Timeout,
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index b091bcd3d..07056416f 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -24,21 +24,19 @@ where
     type Header;
     type Entry;
 
-    fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>;
-
-    fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
-
-    fn set_header_as_dirty(&self, header: &Self::Header);
+    fn header(&self, idx: usize) -> Option<&Self::Header>;
+    fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header>;
 
     fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+    fn entry(&self, header: &Self::Header) -> impl Future<Output = 
Option<Self::Entry>>;
 }
 
 // TODO: Move to other crate.
 pub trait Storage {
     type Buffer;
 
-    fn write(&self, buf: Self::Buffer) -> usize;
-    fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer;
+    fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
+    fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output 
= Self::Buffer>;
 }
 
 pub trait JournalHandle {
diff --git a/core/message_bus/src/cache/connection.rs 
b/core/message_bus/src/cache/connection.rs
index d07ac66ec..633dc2280 100644
--- a/core/message_bus/src/cache/connection.rs
+++ b/core/message_bus/src/cache/connection.rs
@@ -34,6 +34,7 @@ pub trait ShardedState {
 }
 
 /// Least-loaded allocation strategy for connections
+#[derive(Debug)]
 pub struct LeastLoadedStrategy {
     total_shards: usize,
     connections_per_shard: RefCell<Vec<(u16, usize)>>,
@@ -193,6 +194,7 @@ impl AllocationStrategy<ConnectionCache> for 
LeastLoadedStrategy {
 }
 
 /// Coordinator that wraps a strategy for a specific sharded state type
+#[derive(Debug)]
 pub struct Coordinator<A, SS>
 where
     SS: ShardedState,
@@ -223,6 +225,7 @@ where
     }
 }
 
+#[derive(Debug)]
 pub struct ShardedConnections<A, SS>
 where
     SS: ShardedState,
@@ -259,7 +262,7 @@ where
 }
 
 /// Cache for connection state per shard
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct ConnectionCache {
     pub shard_id: u16,
     pub connections: HashMap<u8, Option<Rc<TcpSender>>>,
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index c578a1fb0..027fe9f56 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -27,8 +27,9 @@ pub trait MessageBus {
     type Client;
     type Replica;
     type Data;
+    type Sender;
 
-    fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool;
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool;
     fn remove_client(&mut self, client: Self::Client) -> bool;
 
     fn add_replica(&mut self, replica: Self::Replica) -> bool;
@@ -48,6 +49,7 @@ pub trait MessageBus {
 }
 
 // TODO: explore generics for Strategy
+#[derive(Debug)]
 pub struct IggyMessageBus {
     clients: HashMap<u128, SenderKind>,
     replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
@@ -83,8 +85,9 @@ impl MessageBus for IggyMessageBus {
     type Client = u128;
     type Replica = u8;
     type Data = Message<GenericHeader>;
+    type Sender = SenderKind;
 
-    fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool 
{
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool {
         if self.clients.contains_key(&client) {
             return false;
         }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index c8c7fdf90..c4e4b3da3 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,61 +14,63 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use crate::stm::StateMachine;
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
-    header::{Command2, PrepareHeader, PrepareOkHeader},
+    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use tracing::{debug, warn};
 
-#[expect(unused)]
 pub trait Metadata<C>
 where
     C: Consensus,
 {
     /// Handle a request message.
-    fn on_request(&self, message: C::RequestMessage);
+    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
 
     /// Handle a replicate message (Prepare in VSR).
     fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
 
     /// Handle an ack message (PrepareOk in VSR).
-    fn on_ack(&self, message: C::AckMessage);
+    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
 }
 
-#[expect(unused)]
-struct IggyMetadata<C, J, S, M> {
+#[derive(Debug)]
+pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
-    consensus: Option<C>,
+    pub consensus: Option<C>,
     /// Some on shard0, None on other shards
-    journal: Option<J>,
+    pub journal: Option<J>,
     /// Some on shard0, None on other shards
-    snapshot: Option<S>,
+    pub snapshot: Option<S>,
     /// State machine - lives on all shards
-    mux_stm: M,
+    pub mux_stm: M,
 }
 
-impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, 
J, S, M>
 where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
+    M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) 
{
+    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::RequestMessage) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
         debug!("handling metadata request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare);
+        self.pipeline_prepare(prepare).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
@@ -93,20 +95,6 @@ where
 
         let current_op = consensus.sequencer().current_sequence();
 
-        // Old message (handle as repair). Not replicating.
-        if header.view < consensus.view()
-            || (consensus.status() == Status::Normal
-                && header.view == consensus.view()
-                && header.op <= current_op)
-        {
-            debug!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (repair)"
-            );
-            self.on_repair(message);
-            return;
-        }
-
         // If status is not normal, ignore the replicate.
         if consensus.status() != Status::Normal {
             warn!(
@@ -137,14 +125,13 @@ where
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
-        if let Some(previous) = journal.handle().previous_entry(header) {
-            self.panic_if_hash_chain_would_break_in_same_view(&previous, 
header);
+        if let Some(previous) = journal.handle().previous_header(header) {
+            self.panic_if_hash_chain_would_break_in_same_view(previous, 
header);
         }
 
         assert_eq!(header.op, current_op + 1);
 
         consensus.sequencer().set_sequence(header.op);
-        journal.handle().set_header_as_dirty(header);
 
         // Append to journal.
         journal.handle().append(message.clone()).await;
@@ -158,7 +145,7 @@ where
         }
     }
 
-    fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::AckMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
@@ -172,63 +159,117 @@ where
             return;
         }
 
-        // Find the prepare in pipeline
-        let mut pipeline = consensus.pipeline().borrow_mut();
-        let Some(entry) = pipeline.message_by_op_and_checksum(header.op, 
header.prepare_checksum)
-        else {
-            debug!("on_ack: prepare not in pipeline op={}", header.op);
-            return;
-        };
-
-        // Verify checksum matches
-        if entry.message.header().checksum != header.prepare_checksum {
-            warn!("on_ack: checksum mismatch");
-            return;
+        // Verify checksum by checking pipeline entry exists
+        {
+            let pipeline = consensus.pipeline().borrow();
+            let Some(entry) =
+                pipeline.message_by_op_and_checksum(header.op, 
header.prepare_checksum)
+            else {
+                debug!("on_ack: prepare not in pipeline op={}", header.op);
+                return;
+            };
+
+            if entry.message.header().checksum != header.prepare_checksum {
+                warn!("on_ack: checksum mismatch");
+                return;
+            }
         }
 
-        // Record ack
-        let count = entry.add_ack(header.replica);
-
-        // Check quorum
-        if count >= consensus.quorum() && !entry.ok_quorum_received {
-            entry.ok_quorum_received = true;
+        // Let consensus handle the ack increment and quorum check
+        if consensus.handle_prepare_ok(header) {
             debug!("on_ack: quorum received for op={}", header.op);
-
-            // Advance commit number and trigger commit journal
             consensus.advance_commit_number(header.op);
-            self.commit_journal();
+
+            // Extract the prepare message from the pipeline by op
+            // TODO: Commit from the head. ALWAYS
+            let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
+            let Some(entry) = entry else {
+                warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
+                return;
+            };
+
+            let prepare = entry.message;
+            let prepare_header = *prepare.header();
+
+            // Apply the state (consumes prepare)
+            // TODO: Handle appending result to response
+            let _result = self.mux_stm.update(prepare);
+            debug!("on_ack: state applied for op={}", prepare_header.op);
+
+            // TODO: Figure out better infra for this, it's messy.
+            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
+                .transmute_header(|_, new| {
+                    *new = ReplyHeader {
+                        checksum: 0,
+                        checksum_body: 0,
+                        cluster: consensus.cluster(),
+                        size: std::mem::size_of::<ReplyHeader>() as u32,
+                        epoch: prepare_header.epoch,
+                        view: consensus.view(),
+                        release: 0,
+                        protocol: 0,
+                        command: Command2::Reply,
+                        replica: consensus.replica(),
+                        reserved_frame: [0; 12],
+                        request_checksum: prepare_header.request_checksum,
+                        request_checksum_padding: 0,
+                        context: 0,
+                        context_padding: 0,
+                        op: prepare_header.op,
+                        commit: consensus.commit(),
+                        timestamp: prepare_header.timestamp,
+                        request: prepare_header.request,
+                        operation: prepare_header.operation,
+                        ..Default::default()
+                    };
+                });
+
+            // Send reply to client
+            let generic_reply = reply.into_generic();
+            debug!(
+                "on_ack: sending reply to client={} for op={}",
+                prepare_header.client, prepare_header.op
+            );
+
+            // TODO: Error handling
+            consensus
+                .message_bus()
+                .send_to_client(prepare_header.client, generic_reply)
+                .await
+                .unwrap()
         }
     }
 }
 
-impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
 where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
+    M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    #[expect(unused)]
-    fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
+    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         debug!("inserting prepare into metadata pipeline");
         consensus.verify_pipeline();
         consensus.pipeline_message(prepare.clone());
 
-        self.on_replicate(prepare.clone());
+        self.on_replicate(prepare.clone()).await;
         consensus.post_replicate_verify(&prepare);
     }
 
     fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let (Some(consensus), Some(journal)) = (&self.consensus, 
&self.journal) else {
-            todo!("dispatch fence_old_prepare to shard0");
-        };
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
 
         let header = prepare.header();
-        header.op <= consensus.commit() || 
journal.handle().entry(header).is_some()
+        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
+        header.op <= consensus.commit() || journal.handle().header(header.op 
as usize).is_some()
     }
 
     /// Replicate a prepare message to the next replica in the chain.
@@ -243,9 +284,11 @@ where
 
         let header = message.header();
 
+        // TODO: calculate the index;
+        let idx = header.op as usize;
         assert_eq!(header.command, Command2::Prepare);
         assert!(
-            journal.handle().entry(header).is_none(),
+            journal.handle().header(idx).is_none(),
             "replicate: must not already have prepare"
         );
         assert!(header.op > consensus.commit());
@@ -279,10 +322,6 @@ where
             .unwrap();
     }
 
-    fn on_repair(&self, _message: Message<PrepareHeader>) {
-        todo!()
-    }
-
     /// Verify hash chain would not break if we add this header.
     fn panic_if_hash_chain_would_break_in_same_view(
         &self,
@@ -306,7 +345,6 @@ where
         // TODO: Implement commit logic
         // Walk through journal from last committed to current commit number
         // Apply each entry to the state machine
-        todo!()
     }
 
     /// Send a prepare_ok message to the primary.
@@ -335,7 +373,7 @@ where
         }
 
         // Verify we have the prepare and it's persisted (not dirty).
-        if journal.handle().entry(header).is_none() {
+        if journal.handle().header(header.op as usize).is_none() {
             debug!(
                 replica = consensus.replica(),
                 op = header.op,
@@ -398,6 +436,12 @@ where
                 "send_prepare_ok: loopback to self"
             );
             // TODO: Queue for self-processing or call handle_prepare_ok 
directly
+            // TODO: This is temporary, to test simulator, but we should send 
message to ourselves properly.
+            consensus
+                .message_bus()
+                .send_to_replica(primary, generic_message)
+                .await
+                .unwrap();
         } else {
             debug!(
                 replica = consensus.replica(),
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 824cf614f..08c137fff 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -17,8 +17,14 @@
 
 //! Iggy metadata module
 
-mod impls;
+pub mod impls;
 pub mod permissioner;
 pub mod stm;
 
 mod stats;
+
+// Re-export IggyMetadata and Metadata trait for use in other modules
+pub use impls::metadata::{IggyMetadata, Metadata};
+
+// Re-export MuxStateMachine for use in other modules
+pub use stm::mux::MuxStateMachine;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 9dfb44852..c92b0ca10 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -31,6 +31,15 @@ where
     inner: UnsafeCell<WriteHandle<T, O>>,
 }
 
+impl<T, O> std::fmt::Debug for WriteCell<T, O>
+where
+    T: Absorb<O>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WriteCell").finish_non_exhaustive()
+    }
+}
+
 impl<T, O> WriteCell<T, O>
 where
     T: Absorb<O>,
@@ -53,11 +62,12 @@ where
 }
 
 /// Parses type-erased input into a command. Macro-generated.
+/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
 pub trait Command {
     type Cmd;
     type Input;
 
-    fn parse(input: &Self::Input) -> Option<Self::Cmd>;
+    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
 }
 
 /// Handles commands. User-implemented business logic.
@@ -65,6 +75,7 @@ pub trait Handler: Command {
     fn handle(&mut self, cmd: &Self::Cmd);
 }
 
+#[derive(Debug)]
 pub struct LeftRight<T, C>
 where
     T: Absorb<C>,
@@ -100,17 +111,18 @@ where
 }
 
 /// Public interface for state machines.
+/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
 pub trait State {
     type Output;
     type Input;
 
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
+    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
 }
 
 pub trait StateMachine {
     type Input;
     type Output;
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
+    fn update(&self, input: Self::Input) -> Self::Output;
 }
 
 /// Generates a state machine with convention-based storage.
@@ -180,6 +192,7 @@ macro_rules! define_state {
                 )*
             }
 
+            #[derive(Debug)]
             pub struct $state {
                 inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
             }
@@ -201,9 +214,10 @@ macro_rules! define_state {
                 type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
                 type Output = ();
 
-                fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-                    <[<$state Inner>] as $crate::stm::Command>::parse(input)
-                        .map(|cmd| self.inner.do_apply(cmd))
+                fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
+                    let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
+                    self.inner.do_apply(cmd);
+                    Ok(())
                 }
             }
 
@@ -211,20 +225,20 @@ macro_rules! define_state {
                 type Cmd = [<$state Command>];
                 type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
 
-                fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+                fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> 
{
                     use ::iggy_common::BytesSerializable;
                     use ::iggy_common::header::Operation;
 
-                    let body = input.body_bytes();
                     match input.header().operation {
                         $(
                             Operation::$operation => {
-                                Some([<$state Command>]::$operation(
+                                let body = input.body_bytes();
+                                Ok([<$state Command>]::$operation(
                                     $operation::from_bytes(body).unwrap()
                                 ))
                             },
                         )*
-                        _ => None,
+                        _ => Err(input),
                     }
                 }
             }
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 979ff3ccf..f68d99bf7 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -20,6 +20,7 @@ use iggy_common::{header::PrepareHeader, message::Message};
 use crate::stm::{State, StateMachine};
 
 // MuxStateMachine that proxies to an tuple of variadic state machines
+#[derive(Debug)]
 pub struct MuxStateMachine<T>
 where
     T: StateMachine,
@@ -43,8 +44,8 @@ where
     type Input = T::Input;
     type Output = T::Output;
 
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
-        self.inner.update(input, output);
+    fn update(&self, input: Self::Input) -> Self::Output {
+        self.inner.update(input)
     }
 }
 
@@ -66,12 +67,14 @@ macro_rules! variadic {
 impl StateMachine for () {
     type Input = Message<PrepareHeader>;
     // TODO: Make sure that the `Output` matches to the output type of the 
rest of list.
+    // TODO: Add a trait bound to the output that will allow us to get the 
response in bytes.
     type Output = ();
 
-    fn update(&self, _input: &Self::Input, _output: &mut Vec<Self::Output>) {}
+    fn update(&self, _input: Self::Input) -> Self::Output {}
 }
 
 // Recursive case: process head and recurse on tail
+// No Clone bound needed - ownership passes through via Result
 impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
 where
     S: State<Output = O>,
@@ -80,11 +83,11 @@ where
     type Input = Rest::Input;
     type Output = O;
 
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
-        if let Some(result) = self.0.apply(input) {
-            output.push(result);
+    fn update(&self, input: Self::Input) -> Self::Output {
+        match self.0.apply(input) {
+            Ok(result) => result,
+            Err(input) => self.1.update(input),
         }
-        self.1.update(input, output)
     }
 }
 
@@ -104,8 +107,7 @@ mod tests {
         let mux = MuxStateMachine::new(variadic!(users, streams));
 
         let input = Message::new(std::mem::size_of::<PrepareHeader>());
-        let mut output = Vec::new();
 
-        mux.update(&input, &mut output);
+        mux.update(input);
     }
 }
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 5106d6122..d1a3da212 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -182,6 +182,7 @@ define_state! {
         DeletePartitions
     ]
 }
+
 impl_absorb!(StreamsInner, StreamsCommand);
 
 impl StreamsInner {
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
new file mode 100644
index 000000000..e75769201
--- /dev/null
+++ b/core/simulator/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "simulator"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+consensus = { path = "../consensus" }
+futures = { workspace = true }
+iggy_common = { path = "../common" }
+journal = { path = "../journal" }
+message_bus = { path = "../message_bus" }
+metadata = { path = "../metadata" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
new file mode 100644
index 000000000..b4dc1e52a
--- /dev/null
+++ b/core/simulator/src/bus.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::{IggyError, header::GenericHeader, message::Message};
+use message_bus::MessageBus;
+use std::collections::{HashMap, VecDeque};
+use std::ops::Deref;
+use std::sync::{Arc, Mutex};
+
+/// Message envelope for tracking sender/recipient
+#[derive(Debug, Clone)]
+pub struct Envelope {
+    pub from_replica: Option<u8>,
+    pub to_replica: Option<u8>,
+    pub to_client: Option<u128>,
+    pub message: Message<GenericHeader>,
+}
+
+// TODO: Proper bus with a `Network` component which would simulate sending 
packets.
+// Tigerbeetle handles this by having a list of "buses", and calling 
callbacks for clients when a response is sent.
+// This requires self-referential structs (as message_bus has to store a 
collection of other buses), which is overcomplicated.
+// I think the way we could handle that is by having a dedicated collection 
for client responses (clients_table).
+#[derive(Debug, Default)]
+pub struct MemBus {
+    clients: Mutex<HashMap<u128, ()>>,
+    replicas: Mutex<HashMap<u8, ()>>,
+    pending_messages: Mutex<VecDeque<Envelope>>,
+}
+
+impl MemBus {
+    pub fn new() -> Self {
+        Self {
+            clients: Mutex::new(HashMap::new()),
+            replicas: Mutex::new(HashMap::new()),
+            pending_messages: Mutex::new(VecDeque::new()),
+        }
+    }
+
+    /// Get the next pending message from the bus
+    pub fn receive(&self) -> Option<Envelope> {
+        self.pending_messages.lock().unwrap().pop_front()
+    }
+}
+
+impl MessageBus for MemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> 
bool {
+        if self.clients.lock().unwrap().contains_key(&client) {
+            return false;
+        }
+        self.clients.lock().unwrap().insert(client, ());
+        true
+    }
+
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        self.clients.lock().unwrap().remove(&client).is_some()
+    }
+
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        if self.replicas.lock().unwrap().contains_key(&replica) {
+            return false;
+        }
+        self.replicas.lock().unwrap().insert(replica, ());
+        true
+    }
+
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        self.replicas.lock().unwrap().remove(&replica).is_some()
+    }
+
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        if !self.clients.lock().unwrap().contains_key(&client_id) {
+            return Err(IggyError::ClientNotFound(client_id as u32));
+        }
+
+        self.pending_messages.lock().unwrap().push_back(Envelope {
+            from_replica: None,
+            to_replica: None,
+            to_client: Some(client_id),
+            message,
+        });
+
+        Ok(())
+    }
+
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        if !self.replicas.lock().unwrap().contains_key(&replica) {
+            return Err(IggyError::ResourceNotFound(format!("Replica {}", 
replica)));
+        }
+
+        self.pending_messages.lock().unwrap().push_back(Envelope {
+            from_replica: None,
+            to_replica: Some(replica),
+            to_client: None,
+            message,
+        });
+
+        Ok(())
+    }
+}
+
+/// Newtype wrapper for shared MemBus that implements MessageBus
+#[derive(Debug, Clone)]
+pub struct SharedMemBus(pub Arc<MemBus>);
+
+impl Deref for SharedMemBus {
+    type Target = MemBus;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl MessageBus for SharedMemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool {
+        self.0
+            .clients
+            .lock()
+            .unwrap()
+            .insert(client, sender)
+            .is_none()
+    }
+
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        self.0.clients.lock().unwrap().remove(&client).is_some()
+    }
+
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        self.0
+            .replicas
+            .lock()
+            .unwrap()
+            .insert(replica, ())
+            .is_none()
+    }
+
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        self.0.replicas.lock().unwrap().remove(&replica).is_some()
+    }
+
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        self.0.send_to_client(client_id, message).await
+    }
+
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        self.0.send_to_replica(replica, message).await
+    }
+}
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
new file mode 100644
index 000000000..b2f57c152
--- /dev/null
+++ b/core/simulator/src/client.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::{
+    BytesSerializable, Identifier,
+    create_stream::CreateStream,
+    delete_stream::DeleteStream,
+    header::{Operation, RequestHeader},
+    message::Message,
+};
+use std::cell::Cell;
+
+// TODO: Proper client which implements the full client SDK API
+pub struct SimClient {
+    client_id: u128,
+    request_counter: Cell<u64>,
+}
+
+impl SimClient {
+    pub fn new(client_id: u128) -> Self {
+        Self {
+            client_id,
+            request_counter: Cell::new(0),
+        }
+    }
+
+    fn next_request_number(&self) -> u64 {
+        let current = self.request_counter.get();
+        self.request_counter.set(current + 1);
+        current
+    }
+
+    pub fn create_stream(&self, name: &str) -> Message<RequestHeader> {
+        let create_stream = CreateStream {
+            name: name.to_string(),
+        };
+        let payload = create_stream.to_bytes();
+
+        self.build_request(Operation::CreateStream, payload)
+    }
+
+    pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> {
+        let delete_stream = DeleteStream {
+            stream_id: Identifier::named(name).unwrap(),
+        };
+        let payload = delete_stream.to_bytes();
+
+        self.build_request(Operation::DeleteStream, payload)
+    }
+
+    fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> 
Message<RequestHeader> {
+        use bytes::Bytes;
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+        let total_size = header_size + payload.len();
+
+        let header = RequestHeader {
+            command: iggy_common::header::Command2::Request,
+            operation,
+            size: total_size as u32,
+            cluster: 0, // TODO: Get from config
+            checksum: 0,
+            checksum_body: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: self.client_id,
+            request_checksum: 0,
+            timestamp: 0, // TODO: Use actual timestamp
+            request: self.next_request_number(),
+            ..Default::default()
+        };
+
+        let header_bytes = bytemuck::bytes_of(&header);
+        let mut buffer = Vec::with_capacity(total_size);
+        buffer.extend_from_slice(header_bytes);
+        buffer.extend_from_slice(&payload);
+
+        Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
+            .expect("failed to build request message")
+    }
+}
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
new file mode 100644
index 000000000..3dbdac54f
--- /dev/null
+++ b/core/simulator/src/deps.rs
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::SharedMemBus;
+use bytes::Bytes;
+use consensus::VsrConsensus;
+use iggy_common::header::PrepareHeader;
+use iggy_common::message::Message;
+use journal::{Journal, JournalHandle, Storage};
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::stream::Streams;
+use metadata::stm::user::Users;
+use metadata::{IggyMetadata, MuxStateMachine, variadic};
+use std::cell::{Cell, RefCell, UnsafeCell};
+use std::collections::HashMap;
+
+/// In-memory storage backend for testing/simulation
+#[derive(Debug, Default)]
+pub struct MemStorage {
+    data: RefCell<Vec<u8>>,
+    offset: Cell<u64>,
+}
+
+impl Storage for MemStorage {
+    type Buffer = Vec<u8>;
+
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let len = buf.len();
+        self.data.borrow_mut().extend_from_slice(&buf);
+        self.offset.set(self.offset.get() + len as u64);
+        len
+    }
+
+    async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> 
Self::Buffer {
+        let data = self.data.borrow();
+        let end = offset + buffer.len();
+        if offset < data.len() && end <= data.len() {
+            buffer[..].copy_from_slice(&data[offset..end]);
+        }
+        buffer
+    }
+}
+
+// TODO: Replace with actual Journal, the only thing that we will need to 
change is the `Storage` impl for an in-memory one.
+/// Generic in-memory journal implementation for testing/simulation
+pub struct SimJournal<S: Storage> {
+    storage: S,
+    headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
+    offsets: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl<S: Storage + Default> Default for SimJournal<S> {
+    fn default() -> Self {
+        Self {
+            storage: S::default(),
+            headers: UnsafeCell::new(HashMap::new()),
+            offsets: UnsafeCell::new(HashMap::new()),
+        }
+    }
+}
+
+impl<S: Storage> std::fmt::Debug for SimJournal<S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SimJournal")
+            .field("storage", &"<Storage>")
+            .field("headers", &"<UnsafeCell>")
+            .field("offsets", &"<UnsafeCell>")
+            .finish()
+    }
+}
+
+impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
+    type Header = PrepareHeader;
+    type Entry = Message<PrepareHeader>;
+
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        let headers = unsafe { &*self.headers.get() };
+        let offsets = unsafe { &*self.offsets.get() };
+
+        let header = headers.get(&header.op)?;
+        let offset = *offsets.get(&header.op)?;
+
+        let buffer = vec![0; header.size as usize];
+        let buffer = self.storage.read(offset, buffer).await;
+        let message =
+            Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes 
should be valid");
+        Some(message)
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header> {
+        if header.op == 0 {
+            return None;
+        }
+        unsafe { &*self.headers.get() }.get(&(header.op - 1))
+    }
+
+    async fn append(&self, entry: Self::Entry) {
+        let header = *entry.header();
+        let message_bytes = entry.as_bytes();
+
+        let bytes_written = self.storage.write(message_bytes.to_vec()).await;
+
+        let current_offset = unsafe { &mut *self.offsets.get() }
+            .values()
+            .last()
+            .cloned()
+            .unwrap_or_default();
+
+        unsafe { &mut *self.headers.get() }.insert(header.op, header);
+        unsafe { &mut *self.offsets.get() }.insert(header.op, current_offset + 
bytes_written);
+    }
+
+    fn header(&self, idx: usize) -> Option<&Self::Header> {
+        let headers = unsafe { &*self.headers.get() };
+        headers.get(&(idx as u64))
+    }
+}
+
+impl JournalHandle for SimJournal<MemStorage> {
+    type Storage = MemStorage;
+    type Target = SimJournal<MemStorage>;
+
+    fn handle(&self) -> &Self::Target {
+        self
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct SimSnapshot {}
+
+/// Type aliases for simulator metadata
+pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+pub type SimMetadata = IggyMetadata<
+    VsrConsensus<SharedMemBus>,
+    SimJournal<MemStorage>,
+    SimSnapshot,
+    SimMuxStateMachine,
+>;
+
+#[derive(Debug, Default)]
+pub struct ReplicaPartitions {}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
new file mode 100644
index 000000000..d1f64fc09
--- /dev/null
+++ b/core/simulator/src/lib.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod bus;
+pub mod client;
+pub mod deps;
+pub mod replica;
+
+use bus::MemBus;
+use iggy_common::header::{GenericHeader, ReplyHeader};
+use iggy_common::message::{Message, MessageBag};
+use message_bus::MessageBus;
+use metadata::Metadata;
+use replica::Replica;
+use std::sync::Arc;
+
+/// A set of simulated replicas that all share one in-memory message bus.
+#[derive(Debug)]
+pub struct Simulator {
+    /// The simulated replicas; `step` looks a replica up by using the
+    /// envelope's target replica id as an index into this vector.
+    pub replicas: Vec<Replica>,
+    /// Bus shared by every replica; `step` pulls envelopes from it.
+    pub message_bus: Arc<MemBus>,
+}
+
+impl Simulator {
+    /// Builds a simulator with `replica_count` replicas on a fresh [`MemBus`].
+    ///
+    /// Every id yielded by `clients` is registered on the bus before the
+    /// replicas are added.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `replica_count` does not fit in a `u8` (replica ids are `u8`).
+    pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> Self {
+        let mut message_bus = MemBus::new();
+        for client in clients {
+            message_bus.add_client(client, ());
+        }
+
+        // Replica registration and construction is shared with
+        // `with_message_bus` so the two constructors cannot drift apart.
+        Self::with_message_bus(replica_count, message_bus)
+    }
+
+    /// Builds a simulator with `replica_count` replicas on a caller-provided
+    /// bus.
+    ///
+    /// Replica ids `0..replica_count` are registered on `message_bus`, which is
+    /// then shared (via `Arc`) by every created [`Replica`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if `replica_count` does not fit in a `u8` (replica ids are `u8`).
+    pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> Self {
+        // A plain `as u8` cast would silently wrap for counts above 255, leaving
+        // the bus and the replica list with inconsistent ids. Fail loudly
+        // instead.
+        let replica_count =
+            u8::try_from(replica_count).expect("simulator: replica_count must fit in u8");
+
+        for id in 0..replica_count {
+            message_bus.add_replica(id);
+        }
+
+        let message_bus = Arc::new(message_bus);
+        let replicas = (0..replica_count)
+            .map(|id| {
+                Replica::new(
+                    id,
+                    format!("replica-{}", id),
+                    Arc::clone(&message_bus),
+                    replica_count,
+                )
+            })
+            .collect();
+
+        Self {
+            replicas,
+            message_bus,
+        }
+    }
+}
+
+impl Simulator {
+    /// Processes at most one envelope from the message bus.
+    ///
+    /// Returns the reply when the envelope is addressed to a client. An
+    /// envelope addressed to a known replica is dispatched to it and `None` is
+    /// returned. `None` is also returned when the bus has nothing to deliver or
+    /// when the target replica id is not a valid index into `replicas`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a client-addressed message cannot be converted into a
+    /// `Message<ReplyHeader>`.
+    pub async fn step(&self) -> Option<Message<ReplyHeader>> {
+        // Nothing queued on the bus: nothing to do for this step.
+        let envelope = self.message_bus.receive()?;
+
+        // Client-bound envelopes take priority over replica-bound ones.
+        if envelope.to_client.is_some() {
+            let reply: Message<ReplyHeader> = envelope
+                .message
+                .try_into_typed()
+                .expect("invalid message, wrong command type for an client response");
+            return Some(reply);
+        }
+
+        let target = envelope
+            .to_replica
+            .and_then(|replica_id| self.replicas.get(replica_id as usize));
+        if let Some(replica) = target {
+            self.dispatch_to_replica(replica, envelope.message).await;
+        }
+
+        None
+    }
+
+    /// Routes `message` to the metadata or the partition handler of `replica`.
+    ///
+    /// The operation is read from the message header and cast to `u8`: values
+    /// below 200 take the metadata path, everything else the partition path.
+    async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
+        let bag: MessageBag = message.into();
+        let operation_code = match &bag {
+            MessageBag::Request(request) => request.header().operation as u8,
+            MessageBag::Prepare(prepare) => prepare.header().operation as u8,
+            MessageBag::PrepareOk(prepare_ok) => prepare_ok.header().operation as u8,
+        };
+
+        if operation_code >= 200 {
+            self.dispatch_to_partition_on_replica(replica, bag);
+            return;
+        }
+
+        self.dispatch_to_metadata_on_replica(replica, bag).await;
+    }
+
+    /// Hands `message` to the metadata component of `replica`, picking the
+    /// handler by message kind: `on_request`, `on_replicate` or `on_ack`.
+    async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, message: MessageBag) {
+        let metadata = &replica.metadata;
+        match message {
+            MessageBag::Request(msg) => {
+                metadata.on_request(msg).await;
+            }
+            MessageBag::Prepare(msg) => {
+                metadata.on_replicate(msg).await;
+            }
+            MessageBag::PrepareOk(msg) => {
+                metadata.on_ack(msg).await;
+            }
+        }
+    }
+
+    /// Placeholder for partition dispatch; not implemented yet.
+    ///
+    /// # Panics
+    ///
+    /// Always panics via `todo!`, naming the replica id and the operation (or
+    /// op number for `PrepareOk`) of the message that could not be dispatched.
+    fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) {
+        let replica_id = replica.id;
+        match message {
+            MessageBag::Request(msg) => todo!(
+                "dispatch request to partition replica {}: operation={:?}",
+                replica_id,
+                msg.header().operation
+            ),
+            MessageBag::Prepare(msg) => todo!(
+                "dispatch prepare to partition replica {}: operation={:?}",
+                replica_id,
+                msg.header().operation
+            ),
+            MessageBag::PrepareOk(msg) => todo!(
+                "dispatch prepare_ok to partition replica {}: op={}",
+                replica_id,
+                msg.header().op
+            ),
+        }
+    }
+}
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
new file mode 100644
index 000000000..995c717a3
--- /dev/null
+++ b/core/simulator/src/main.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::header::ReplyHeader;
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use simulator::{Simulator, client::SimClient};
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+
+/// FIFO queue of client replies, shared between the simulator loop (which
+/// pushes) and the client thread (which pops).
+#[derive(Default)]
+pub struct Responses {
+    // Replies in arrival order; oldest at the front.
+    queue: VecDeque<Message<ReplyHeader>>,
+}
+
+impl Responses {
+    /// Enqueues `msg` at the back of the queue.
+    pub fn push(&mut self, msg: Message<ReplyHeader>) {
+        self.queue.push_back(msg);
+    }
+
+    /// Removes and returns the oldest queued reply, or `None` if the queue is
+    /// empty.
+    pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
+        self.queue.pop_front()
+    }
+}
+
+/// Blocks the calling thread until a reply is available in `responses` and
+/// returns it, polling roughly once per millisecond.
+///
+/// # Panics
+///
+/// Panics if the mutex is poisoned.
+fn wait_for_reply(responses: &Mutex<Responses>) -> Message<ReplyHeader> {
+    loop {
+        // The guard is a temporary of the `if let`, so the lock is released
+        // before sleeping and the simulator loop can push in between polls.
+        if let Some(reply) = responses.lock().unwrap().pop() {
+            return reply;
+        }
+        std::thread::sleep(std::time::Duration::from_millis(1));
+    }
+}
+
+/// Runs a three-replica simulation in which a single client creates and then
+/// deletes a stream through replica 0.
+///
+/// The client runs on its own thread and waits for each reply; the main thread
+/// drives `Simulator::step` and forwards every reply into the shared queue
+/// until the client thread finishes.
+fn main() {
+    let client_id: u128 = 1;
+    let leader: u8 = 0;
+    let sim = Simulator::new(3, std::iter::once(client_id));
+    let bus = sim.message_bus.clone();
+
+    // Responses queue
+    let responses = Arc::new(Mutex::new(Responses::default()));
+    let responses_clone = responses.clone();
+
+    // TODO: Scuffed client/simulator setup.
+    // We need a better interface on simulator
+    let client_handle = std::thread::spawn(move || {
+        futures::executor::block_on(async {
+            let client = SimClient::new(client_id);
+
+            let create_msg = client.create_stream("test-stream");
+            bus.send_to_replica(leader, create_msg.into_generic())
+                .await
+                .expect("failed to send create_stream");
+
+            let reply = wait_for_reply(&responses_clone);
+            println!("[client] Got create_stream reply: {:?}", reply.header());
+
+            let delete_msg = client.delete_stream("test-stream");
+            bus.send_to_replica(leader, delete_msg.into_generic())
+                .await
+                .expect("failed to send delete_stream");
+
+            let reply = wait_for_reply(&responses_clone);
+            println!("[client] Got delete_stream reply: {:?}", reply.header());
+        });
+    });
+
+    println!("[sim] Starting simulator loop");
+    futures::executor::block_on(async {
+        loop {
+            if let Some(reply) = sim.step().await {
+                responses.lock().unwrap().push(reply);
+            }
+
+            // The client thread exits once it has seen both replies.
+            if client_handle.is_finished() {
+                break;
+            }
+        }
+    });
+
+    client_handle.join().expect("client thread panicked");
+    println!("[sim] Simulator loop ended");
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
new file mode 100644
index 000000000..74c26b72f
--- /dev/null
+++ b/core/simulator/src/replica.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::{MemBus, SharedMemBus};
+use crate::deps::{
+    MemStorage, ReplicaPartitions, SimJournal, SimMetadata, 
SimMuxStateMachine, SimSnapshot,
+};
+use consensus::VsrConsensus;
+use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
+use metadata::stm::stream::{Streams, StreamsInner};
+use metadata::stm::user::{Users, UsersInner};
+use metadata::{IggyMetadata, variadic};
+use std::sync::Arc;
+
+/// A single simulated replica: its identity, metadata component, partition
+/// state and a handle to the shared message bus.
+#[derive(Debug)]
+pub struct Replica {
+    /// Replica id, also passed to the consensus instance on construction.
+    pub id: u8,
+    /// Human-readable name of the replica.
+    pub name: String,
+    /// Metadata component (consensus, journal, snapshot and state machines).
+    pub metadata: SimMetadata,
+    /// Partition state of this replica.
+    pub partitions: ReplicaPartitions,
+    /// Shared in-memory message bus.
+    pub bus: Arc<MemBus>,
+}
+
+impl Replica {
+    /// Creates a replica with freshly constructed state machines, an
+    /// initialised consensus instance, and default journal, snapshot and
+    /// partition state.
+    ///
+    /// `bus` is cloned into the consensus instance (wrapped in `SharedMemBus`)
+    /// and also stored on the replica. `replica_count` is forwarded to the
+    /// consensus constructor.
+    pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) -> Self {
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
+        let state_machines = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups));
+
+        // TODO: Make configurable
+        let cluster_id: u128 = 1;
+        let shared_bus = SharedMemBus(Arc::clone(&bus));
+        let consensus = VsrConsensus::new(cluster_id, id, replica_count, shared_bus);
+        consensus.init();
+
+        let metadata = IggyMetadata {
+            consensus: Some(consensus),
+            journal: Some(SimJournal::<MemStorage>::default()),
+            snapshot: Some(SimSnapshot::default()),
+            mux_stm: state_machines,
+        };
+
+        Self {
+            id,
+            name,
+            metadata,
+            partitions: ReplicaPartitions::default(),
+            bus,
+        }
+    }
+}

Reply via email to