This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch simulator
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit ea951fbdaf0cb501f49ec4911a9a091050d61b0a
Author: numinex <[email protected]>
AuthorDate: Tue Feb 3 19:32:33 2026 +0100

    top
---
 Cargo.lock                                 |  12 +
 Cargo.toml                                 |   1 +
 core/common/src/sender/mod.rs              |   1 +
 core/common/src/types/consensus/message.rs |  11 +-
 core/consensus/src/impls.rs                |   4 +
 core/consensus/src/vsr_timeout.rs          |   1 +
 core/journal/src/lib.rs                    |  12 +-
 core/message_bus/src/cache/connection.rs   |   5 +-
 core/message_bus/src/lib.rs                |   7 +-
 core/metadata/src/impls/metadata.rs        |  23 +-
 core/metadata/src/lib.rs                   |   8 +-
 core/metadata/src/stm/mod.rs               |  13 +
 core/metadata/src/stm/mux.rs               |   1 +
 core/simulator/Cargo.toml                  |  12 +
 core/simulator/src/bus.rs                  | 120 ++++++
 core/simulator/src/deps.rs                 | 138 +++++++
 core/simulator/src/lib.rs                  | 584 +++++++++++++++++++++++++++++
 core/simulator/src/main.rs                 |   9 +
 core/simulator/src/replica.rs              |  41 ++
 19 files changed, 980 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f2806dd02..faad05d5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8449,6 +8449,18 @@ dependencies = [
  "time",
 ]
 
+[[package]]
+name = "simulator"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "consensus",
+ "iggy_common",
+ "journal",
+ "message_bus",
+ "metadata",
+]
+
 [[package]]
 name = "siphasher"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 0be7d8a9c..082b02b0e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ members = [
     "core/partitions",
     "core/sdk",
     "core/server",
+    "core/simulator",
     "core/tools",
     "examples/rust",
 ]
diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs
index 27c98d890..07ca24c40 100644
--- a/core/common/src/sender/mod.rs
+++ b/core/common/src/sender/mod.rs
@@ -83,6 +83,7 @@ pub trait Sender {
 }
 
 #[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
 pub enum SenderKind {
     Tcp(TcpSender),
     TcpTls(TcpTlsSender),
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index ac32cc3c8..a9fca6524 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::types::consensus::header::{
+use crate::{header::RequestHeader, types::consensus::header::{
     self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, 
PrepareOkHeader, ReplyHeader,
-};
+}};
 use bytes::Bytes;
 use std::marker::PhantomData;
 
@@ -278,6 +278,7 @@ where
 #[derive(Debug)]
 #[allow(unused)]
 pub enum MessageBag {
+    Request(Message<RequestHeader>),
     Generic(Message<GenericHeader>),
     Prepare(Message<PrepareHeader>),
     PrepareOk(Message<PrepareOkHeader>),
@@ -289,6 +290,7 @@ impl MessageBag {
     #[allow(unused)]
     pub fn command(&self) -> header::Command2 {
         match self {
+            MessageBag::Request(message) => message.header().command,
             MessageBag::Generic(message) => message.header().command,
             MessageBag::Prepare(message) => message.header().command,
             MessageBag::PrepareOk(message) => message.header().command,
@@ -300,6 +302,7 @@ impl MessageBag {
     #[allow(unused)]
     pub fn size(&self) -> u32 {
         match self {
+            MessageBag::Request(message) => message.header().size(),
             MessageBag::Generic(message) => message.header().size(),
             MessageBag::Prepare(message) => message.header().size(),
             MessageBag::PrepareOk(message) => message.header().size(),
@@ -333,6 +336,10 @@ where
             header::Command2::Reply => {
                 let msg = unsafe { 
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Reply(msg)
+            },
+            header::Command2::Request => {
+                let msg = unsafe { 
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
+                MessageBag::Request(msg)
             }
             _ => unreachable!(
                 "For now we only support Prepare, Commit, and Reply. In the 
future we will support more commands. Command2: {command:?}"
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 154a6c3a4..34b08e654 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -43,6 +43,7 @@ pub trait Sequencer {
     fn set_sequence(&self, sequence: Self::Sequence);
 }
 
+#[derive(Debug)]
 pub struct LocalSequencer {
     op: Cell<u64>,
 }
@@ -82,6 +83,7 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
 /// Maximum number of replicas in a cluster.
 pub const REPLICAS_MAX: usize = 32;
 
+#[derive(Debug)]
 pub struct PipelineEntry {
     pub message: Message<PrepareHeader>,
     /// Bitmap of replicas that have acknowledged this prepare.
@@ -133,6 +135,7 @@ impl RequestEntry {
     }
 }
 
+#[derive(Debug)]
 pub struct LocalPipeline {
     /// Messages being prepared (uncommitted and being replicated).
     prepare_queue: VecDeque<PipelineEntry>,
@@ -386,6 +389,7 @@ pub enum VsrAction {
 }
 
 #[allow(unused)]
+#[derive(Debug)]
 pub struct VsrConsensus {
     cluster: u128,
     replica: u8,
diff --git a/core/consensus/src/vsr_timeout.rs 
b/core/consensus/src/vsr_timeout.rs
index 140d40f02..d5cda482e 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -109,6 +109,7 @@ pub enum TimeoutKind {
 
 /// Manager for all VSR timeouts of a replica.
 #[allow(unused)]
+#[derive(Debug)]
 pub struct TimeoutManager {
     ping: Timeout,
     prepare: Timeout,
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index b091bcd3d..07056416f 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -24,21 +24,19 @@ where
     type Header;
     type Entry;
 
-    fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>;
-
-    fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
-
-    fn set_header_as_dirty(&self, header: &Self::Header);
+    fn header(&self, idx: usize) -> Option<&Self::Header>;
+    fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header>;
 
     fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+    fn entry(&self, header: &Self::Header) -> impl Future<Output = 
Option<Self::Entry>>;
 }
 
 // TODO: Move to other crate.
 pub trait Storage {
     type Buffer;
 
-    fn write(&self, buf: Self::Buffer) -> usize;
-    fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer;
+    fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
+    fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output 
= Self::Buffer>;
 }
 
 pub trait JournalHandle {
diff --git a/core/message_bus/src/cache/connection.rs 
b/core/message_bus/src/cache/connection.rs
index d07ac66ec..633dc2280 100644
--- a/core/message_bus/src/cache/connection.rs
+++ b/core/message_bus/src/cache/connection.rs
@@ -34,6 +34,7 @@ pub trait ShardedState {
 }
 
 /// Least-loaded allocation strategy for connections
+#[derive(Debug)]
 pub struct LeastLoadedStrategy {
     total_shards: usize,
     connections_per_shard: RefCell<Vec<(u16, usize)>>,
@@ -193,6 +194,7 @@ impl AllocationStrategy<ConnectionCache> for 
LeastLoadedStrategy {
 }
 
 /// Coordinator that wraps a strategy for a specific sharded state type
+#[derive(Debug)]
 pub struct Coordinator<A, SS>
 where
     SS: ShardedState,
@@ -223,6 +225,7 @@ where
     }
 }
 
+#[derive(Debug)]
 pub struct ShardedConnections<A, SS>
 where
     SS: ShardedState,
@@ -259,7 +262,7 @@ where
 }
 
 /// Cache for connection state per shard
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct ConnectionCache {
     pub shard_id: u16,
     pub connections: HashMap<u8, Option<Rc<TcpSender>>>,
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index c578a1fb0..027fe9f56 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -27,8 +27,9 @@ pub trait MessageBus {
     type Client;
     type Replica;
     type Data;
+    type Sender;
 
-    fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool;
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool;
     fn remove_client(&mut self, client: Self::Client) -> bool;
 
     fn add_replica(&mut self, replica: Self::Replica) -> bool;
@@ -48,6 +49,7 @@ pub trait MessageBus {
 }
 
 // TODO: explore generics for Strategy
+#[derive(Debug)]
 pub struct IggyMessageBus {
     clients: HashMap<u128, SenderKind>,
     replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
@@ -83,8 +85,9 @@ impl MessageBus for IggyMessageBus {
     type Client = u128;
     type Replica = u8;
     type Data = Message<GenericHeader>;
+    type Sender = SenderKind;
 
-    fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool 
{
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool {
         if self.clients.contains_key(&client) {
             return false;
         }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index c8c7fdf90..5932c3e09 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -39,15 +39,16 @@ where
 }
 
 #[expect(unused)]
-struct IggyMetadata<C, J, S, M> {
+#[derive(Debug)]
+pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
-    consensus: Option<C>,
+    pub consensus: Option<C>,
     /// Some on shard0, None on other shards
-    journal: Option<J>,
+    pub journal: Option<J>,
     /// Some on shard0, None on other shards
-    snapshot: Option<S>,
+    pub snapshot: Option<S>,
     /// State machine - lives on all shards
-    mux_stm: M,
+    pub mux_stm: M,
 }
 
 impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
@@ -137,14 +138,13 @@ where
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
-        if let Some(previous) = journal.handle().previous_entry(header) {
+        if let Some(previous) = journal.handle().previous_header(header) {
             self.panic_if_hash_chain_would_break_in_same_view(&previous, 
header);
         }
 
         assert_eq!(header.op, current_op + 1);
 
         consensus.sequencer().set_sequence(header.op);
-        journal.handle().set_header_as_dirty(header);
 
         // Append to journal.
         journal.handle().append(message.clone()).await;
@@ -228,7 +228,8 @@ where
         };
 
         let header = prepare.header();
-        header.op <= consensus.commit() || 
journal.handle().entry(header).is_some()
+        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
+        header.op <= consensus.commit() || journal.handle().header(header.op 
as usize).is_some()
     }
 
     /// Replicate a prepare message to the next replica in the chain.
@@ -243,9 +244,11 @@ where
 
         let header = message.header();
 
+        // TODO: calculate the index;
+        let idx= header.op as usize;
         assert_eq!(header.command, Command2::Prepare);
         assert!(
-            journal.handle().entry(header).is_none(),
+            journal.handle().header(idx).is_none(),
             "replicate: must not already have prepare"
         );
         assert!(header.op > consensus.commit());
@@ -335,7 +338,7 @@ where
         }
 
         // Verify we have the prepare and it's persisted (not dirty).
-        if journal.handle().entry(header).is_none() {
+        if journal.handle().header(header.op as usize).is_none() {
             debug!(
                 replica = consensus.replica(),
                 op = header.op,
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 824cf614f..08c137fff 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -17,8 +17,14 @@
 
 //! Iggy metadata module
 
-mod impls;
+pub mod impls;
 pub mod permissioner;
 pub mod stm;
 
 mod stats;
+
+// Re-export IggyMetadata and Metadata trait for use in other modules
+pub use impls::metadata::{IggyMetadata, Metadata};
+
+// Re-export MuxStateMachine for use in other modules
+pub use stm::mux::MuxStateMachine;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 9dfb44852..e26651c05 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -31,6 +31,15 @@ where
     inner: UnsafeCell<WriteHandle<T, O>>,
 }
 
+impl<T, O> std::fmt::Debug for WriteCell<T, O>
+where
+    T: Absorb<O>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WriteCell").finish_non_exhaustive()
+    }
+}
+
 impl<T, O> WriteCell<T, O>
 where
     T: Absorb<O>,
@@ -65,6 +74,7 @@ pub trait Handler: Command {
     fn handle(&mut self, cmd: &Self::Cmd);
 }
 
+#[derive(Debug)]
 pub struct LeftRight<T, C>
 where
     T: Absorb<C>,
@@ -180,6 +190,7 @@ macro_rules! define_state {
                 )*
             }
 
+            #[derive(Debug)]
             pub struct $state {
                 inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
             }
@@ -207,6 +218,8 @@ macro_rules! define_state {
                 }
             }
 
+            // TODO: This can be monomorphized by const generics, instead of 
creating an runtime enum
+            // We can use const generics and specialize each of those methods.
             impl $crate::stm::Command for [<$state Inner>] {
                 type Cmd = [<$state Command>];
                 type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 979ff3ccf..9928169a6 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -20,6 +20,7 @@ use iggy_common::{header::PrepareHeader, message::Message};
 use crate::stm::{State, StateMachine};
 
 // MuxStateMachine that proxies to an tuple of variadic state machines
+#[derive(Debug)]
 pub struct MuxStateMachine<T>
 where
     T: StateMachine,
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
new file mode 100644
index 000000000..4e611090c
--- /dev/null
+++ b/core/simulator/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "simulator"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+bytes = { workspace = true }
+consensus = { path = "../consensus" }
+iggy_common = { path = "../common" }
+journal = { path = "../journal" }
+message_bus = { path = "../message_bus" }
+metadata = { path = "../metadata" }
\ No newline at end of file
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
new file mode 100644
index 000000000..6fefdc20a
--- /dev/null
+++ b/core/simulator/src/bus.rs
@@ -0,0 +1,120 @@
+use iggy_common::{header::GenericHeader, message::Message, IggyError};
+use message_bus::MessageBus;
+use std::cell::RefCell;
+use std::collections::{HashMap, VecDeque};
+
+/// Message envelope for tracking sender/recipient
+#[derive(Debug, Clone)]
+pub struct Envelope {
+    pub from_replica: Option<u8>,
+    pub to_replica: Option<u8>,
+    pub to_client: Option<u128>,
+    pub message: Message<GenericHeader>,
+}
+
+/// In-memory message bus implementing the MessageBus trait
+#[derive(Debug, Default)]
+pub struct MemBus {
+    clients: RefCell<HashMap<u128, ()>>,
+    replicas: RefCell<HashMap<u8, ()>>,
+    pending_messages: RefCell<VecDeque<Envelope>>,
+}
+
+impl MemBus {
+    pub fn new() -> Self {
+        Self {
+            clients: RefCell::new(HashMap::new()),
+            replicas: RefCell::new(HashMap::new()),
+            pending_messages: RefCell::new(VecDeque::new()),
+        }
+    }
+
+    /// Get the next pending message from the bus
+    pub fn receive(&self) -> Option<Envelope> {
+        self.pending_messages.borrow_mut().pop_front()
+    }
+
+    /// Get all pending messages from the bus
+    pub fn receive_all(&self) -> Vec<Envelope> {
+        self.pending_messages.borrow_mut().drain(..).collect()
+    }
+
+    /// Check if there are pending messages
+    pub fn has_pending(&self) -> bool {
+        !self.pending_messages.borrow().is_empty()
+    }
+
+    /// Get the count of pending messages
+    pub fn pending_count(&self) -> usize {
+        self.pending_messages.borrow().len()
+    }
+}
+
+impl MessageBus for MemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> 
bool {
+        if self.clients.borrow().contains_key(&client) {
+            return false;
+        }
+        self.clients.borrow_mut().insert(client, ());
+        true
+    }
+
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        self.clients.borrow_mut().remove(&client).is_some()
+    }
+
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        if self.replicas.borrow().contains_key(&replica) {
+            return false;
+        }
+        self.replicas.borrow_mut().insert(replica, ());
+        true
+    }
+
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        self.replicas.borrow_mut().remove(&replica).is_some()
+    }
+
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        if !self.clients.borrow().contains_key(&client_id) {
+            return Err(IggyError::ClientNotFound(client_id as u32));
+        }
+
+        self.pending_messages.borrow_mut().push_back(Envelope {
+            from_replica: None,
+            to_replica: None,
+            to_client: Some(client_id),
+            message,
+        });
+
+        Ok(())
+    }
+
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        if !self.replicas.borrow().contains_key(&replica) {
+            return Err(IggyError::ResourceNotFound(format!("Replica {}", 
replica)));
+        }
+
+        self.pending_messages.borrow_mut().push_back(Envelope {
+            from_replica: None,
+            to_replica: Some(replica),
+            to_client: None,
+            message,
+        });
+
+        Ok(())
+    }
+}
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
new file mode 100644
index 000000000..5f1790e75
--- /dev/null
+++ b/core/simulator/src/deps.rs
@@ -0,0 +1,138 @@
+use bytes::Bytes;
+use consensus::VsrConsensus;
+use iggy_common::header::PrepareHeader;
+use iggy_common::message::Message;
+use journal::{Journal, JournalHandle, Storage};
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::stream::Streams;
+use metadata::stm::user::Users;
+use metadata::{variadic, IggyMetadata, MuxStateMachine};
+use std::cell::{Cell, RefCell, UnsafeCell};
+use std::collections::HashMap;
+
+/// In-memory storage backend for testing/simulation
+#[derive(Debug, Default)]
+pub struct MemStorage {
+    data: RefCell<Vec<u8>>,
+    offset: Cell<u64>,
+}
+
+impl Storage for MemStorage {
+    type Buffer = Vec<u8>;
+
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let len = buf.len();
+        self.data.borrow_mut().extend_from_slice(&buf);
+        self.offset.set(self.offset.get() + len as u64);
+        len
+    }
+
+    async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> 
Self::Buffer {
+        let data = self.data.borrow();
+        let end = offset + buffer.len();
+        if offset < data.len() && end <= data.len() {
+            buffer[..].copy_from_slice(&data[offset..end]);
+        }
+        buffer
+    }
+}
+
+/// Generic in-memory journal implementation for testing/simulation
+/// Following TigerBeetle's approach: headers in memory, full messages in 
storage
+pub struct SimJournal<S: Storage> {
+    storage: S,
+    headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
+    offsets: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl<S: Storage + Default> Default for SimJournal<S> {
+    fn default() -> Self {
+        Self {
+            storage: S::default(),
+            headers: UnsafeCell::new(HashMap::new()),
+            offsets: UnsafeCell::new(HashMap::new()),
+        }
+    }
+}
+
+impl<S: Storage> std::fmt::Debug for SimJournal<S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SimJournal")
+            .field("storage", &"<Storage>")
+            .field("headers", &"<UnsafeCell>")
+            .field("offsets", &"<UnsafeCell>")
+            .finish()
+    }
+}
+
+impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
+    type Header = PrepareHeader;
+    type Entry = Message<PrepareHeader>;
+
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        let headers = unsafe { &*self.headers.get() };
+        let offsets = unsafe { &*self.offsets.get() };
+
+        let header = headers.get(&header.op)?;
+        let offset = *offsets.get(&header.op)?;
+
+        let mut buffer = Vec::with_capacity(header.size as usize);
+        unsafe {
+            buffer.set_len(header.size as usize);
+        }
+        let buffer = self.storage.read(offset, buffer).await;
+        let message =
+            Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes 
should be valid");
+        Some(message)
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header> {
+        if header.op == 0 {
+            return None;
+        }
+        unsafe { &*self.headers.get() }.get(&(header.op - 1))
+    }
+
+    async fn append(&self, entry: Self::Entry) {
+        let header = *entry.header();
+        let message_bytes = entry.as_bytes();
+
+        let bytes_written = self.storage.write(message_bytes.to_vec()).await;
+
+        let current_offset = unsafe { &mut *self.offsets.get() }
+            .values()
+            .last()
+            .cloned()
+            .unwrap_or_default();
+
+        unsafe { &mut *self.headers.get() }.insert(header.op, header);
+        unsafe { &mut *self.offsets.get() }.insert(header.op, current_offset + 
bytes_written);
+    }
+
+    fn header(&self, idx: usize) -> Option<&Self::Header> {
+        let headers = unsafe { &*self.headers.get() };
+        headers.get(&(idx as u64))
+    }
+}
+
+impl JournalHandle for SimJournal<MemStorage> {
+    type Storage = MemStorage;
+    type Target = SimJournal<MemStorage>;
+
+    fn handle(&self) -> &Self::Target {
+        self
+    }
+}
+
+/// Placeholder snapshot implementation
+#[derive(Debug, Default)]
+pub struct SimSnapshot {}
+
+/// Type aliases for simulator metadata
+pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
+pub type SimMetadata =
+    IggyMetadata<VsrConsensus, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
+
+/// Placeholder for replica partitions
+#[derive(Debug, Default)]
+pub struct ReplicaPartitions {}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
new file mode 100644
index 000000000..fdd296f94
--- /dev/null
+++ b/core/simulator/src/lib.rs
@@ -0,0 +1,584 @@
+pub mod bus;
+pub mod deps;
+pub mod replica;
+
+use bus::MemBus;
+use iggy_common::header::{PrepareHeader, PrepareOkHeader, RequestHeader};
+use iggy_common::message::{Message, MessageBag};
+use iggy_common::{
+    ClientInfo, ClientInfoDetails, ClusterMetadata, CompressionAlgorithm, 
Consumer,
+    ConsumerGroup, ConsumerGroupDetails, ConsumerOffsetInfo, Identifier, 
IdentityInfo,
+    IggyDuration, IggyError, IggyExpiry, IggyMessage, MaxTopicSize, 
Partitioning, Permissions,
+    PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, 
PollingStrategy,
+    RawPersonalAccessToken, Snapshot, SnapshotCompression, Stats, Stream, 
StreamDetails,
+    SystemSnapshotType, Topic, TopicDetails, UserInfo, UserInfoDetails, 
UserStatus,
+};
+use message_bus::MessageBus;
+use metadata::Metadata;
+use replica::Replica;
+use std::rc::Rc;
+
+/// The main simulator struct that manages all replicas and exposes the SDK API
+#[derive(Debug)]
+pub struct Simulator {
+    pub replicas: Vec<Replica>,
+    pub message_bus: Rc<MemBus>,
+}
+
+impl Simulator {
+    pub fn new(replica_count: usize) -> Self {
+        // Create message bus and preseed all replica connections
+        let mut message_bus = MemBus::new();
+
+        // Register all replicas with the message bus
+        for i in 0..replica_count as u8 {
+            message_bus.add_replica(i);
+        }
+
+        let message_bus = Rc::new(message_bus);
+        let replicas = (0..replica_count)
+            .map(|i| Replica::new(i as u8, format!("replica-{}", i), 
Rc::clone(&message_bus)))
+            .collect();
+
+        Self {
+            replicas,
+            message_bus,
+        }
+    }
+
+    pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> 
Self {
+        // Preseed replica connections
+        for i in 0..replica_count as u8 {
+            message_bus.add_replica(i);
+        }
+
+        let message_bus = Rc::new(message_bus);
+        let replicas = (0..replica_count)
+            .map(|i| Replica::new(i as u8, format!("replica-{}", i), 
Rc::clone(&message_bus)))
+            .collect();
+
+        Self {
+            replicas,
+            message_bus,
+        }
+    }
+}
+
+// 
=============================================================================
+// Internal message dispatch - Implementation details
+// 
=============================================================================
+
+impl Simulator {
+    /// Dispatch a message to the appropriate subsystem
+    /// Private - called by public client methods after building the message
+    ///
+    /// Routes based on message type and operation:
+    /// - Request messages → check operation type
+    ///   - Metadata operations (1-99) → metadata shard
+    ///   - Partition operations (100+) → partition shard
+    /// - Other message types → routed directly
+    fn dispatch(&self, message: MessageBag) {
+        match message {
+            MessageBag::Request(request) => {
+                // Route request based on operation type
+                let operation_value = request.header().operation as u32;
+
+                if operation_value < 100 {
+                    // Metadata operation
+                    self.dispatch_to_metadata(request);
+                } else {
+                    // Partition operation
+                    self.dispatch_to_partition(request);
+                }
+            }
+            MessageBag::Prepare(prepare) => {
+                // TODO: Handle prepare messages (chain replication)
+                todo!("handle prepare: op={}", prepare.header().op);
+            }
+            MessageBag::PrepareOk(prepare_ok) => {
+                // TODO: Handle acknowledgments
+                todo!("handle prepare_ok: op={}", prepare_ok.header().op);
+            }
+            MessageBag::Reply(_reply) => {
+                // TODO: Handle replies back to client
+                todo!("handle reply message");
+            }
+            MessageBag::Commit(_commit) => {
+                // TODO: Handle commit messages
+                todo!("handle commit message");
+            }
+            MessageBag::Generic(_generic) => {
+                // TODO: Handle generic messages
+                todo!("handle generic message");
+            }
+        }
+    }
+
+    /// Dispatch metadata operation to metadata shard
+    fn dispatch_to_metadata(&self, request: Message<RequestHeader>) {
+        // TODO: Determine which replica is primary (for now, use replica 0)
+        let primary_id = 0;
+        let primary = &self.replicas[primary_id];
+
+        // Route to metadata's on_request handler
+        primary.metadata.on_request(request);
+    }
+
+    /// Dispatch partition operation to partition shard
+    fn dispatch_to_partition(&self, request: Message<RequestHeader>) {
+        // TODO: Determine which partition/replica handles this request
+        // For now, just route to replica 0's partitions
+        let replica_id = 0;
+        let _replica = &self.replicas[replica_id];
+
+        // TODO: Route to replica.partitions.on_request(request)
+        todo!(
+            "dispatch partition operation to replica {}: operation={:?}",
+            replica_id,
+            request.header().operation
+        );
+    }
+
+}
+
+// 
=============================================================================
+// Client methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn connect(&self) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn disconnect(&self) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn shutdown(&self) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// System methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_stats(&self) -> Result<Stats, IggyError> {
+        todo!()
+    }
+
+    pub fn get_me(&self) -> Result<ClientInfoDetails, IggyError> {
+        todo!()
+    }
+
+    pub fn get_client(&self, _client_id: u32) -> 
Result<Option<ClientInfoDetails>, IggyError> {
+        todo!()
+    }
+
+    pub fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> {
+        todo!()
+    }
+
+    pub fn ping(&self) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn heartbeat_interval(&self) -> IggyDuration {
+        todo!()
+    }
+
+    pub fn snapshot(
+        &self,
+        _compression: SnapshotCompression,
+        _snapshot_types: Vec<SystemSnapshotType>,
+    ) -> Result<Snapshot, IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// User methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_user(&self, _user_id: &Identifier) -> 
Result<Option<UserInfoDetails>, IggyError> {
+        todo!()
+    }
+
+    pub fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> {
+        todo!()
+    }
+
+    pub fn create_user(
+        &self,
+        _username: &str,
+        _password: &str,
+        _status: UserStatus,
+        _permissions: Option<Permissions>,
+    ) -> Result<UserInfoDetails, IggyError> {
+        todo!()
+    }
+
+    pub fn delete_user(&self, _user_id: &Identifier) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn update_user(
+        &self,
+        _user_id: &Identifier,
+        _username: Option<&str>,
+        _status: Option<UserStatus>,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn update_permissions(
+        &self,
+        _user_id: &Identifier,
+        _permissions: Option<Permissions>,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn change_password(
+        &self,
+        _user_id: &Identifier,
+        _current_password: &str,
+        _new_password: &str,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn login_user(&self, _username: &str, _password: &str) -> 
Result<IdentityInfo, IggyError> {
+        todo!()
+    }
+
+    pub fn logout_user(&self) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Personal access token methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_personal_access_tokens(&self) -> 
Result<Vec<PersonalAccessTokenInfo>, IggyError> {
+        todo!()
+    }
+
+    pub fn create_personal_access_token(
+        &self,
+        _name: &str,
+        _expiry: PersonalAccessTokenExpiry,
+    ) -> Result<RawPersonalAccessToken, IggyError> {
+        todo!()
+    }
+
+    pub fn delete_personal_access_token(&self, _name: &str) -> Result<(), 
IggyError> {
+        todo!()
+    }
+
+    pub fn login_with_personal_access_token(
+        &self,
+        _token: &str,
+    ) -> Result<IdentityInfo, IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Stream methods - Public API
+// 
=============================================================================
+//
+// Pattern for all client methods:
+// 1. Build Message<RequestHeader> from parameters
+// 2. Convert to MessageBag::Request(message)
+// 3. Call self.dispatch(bag) - private method
+// 4. Wait for reply from primary
+// 5. Parse and return result
+
+impl Simulator {
+    pub fn get_stream(&self, _stream_id: &Identifier) -> 
Result<Option<StreamDetails>, IggyError> {
+        // TODO: Build message with Operation::GetStream (not yet in enum)
+        // self.dispatch_request(message);
+        // Wait for reply and parse
+        todo!()
+    }
+
+    pub fn get_streams(&self) -> Result<Vec<Stream>, IggyError> {
+        // TODO: Build message with Operation::ListStreams (not yet in enum)
+        // self.dispatch_request(message);
+        // Wait for reply and parse
+        todo!()
+    }
+
+    pub fn create_stream(&self, name: &str) -> Result<StreamDetails, 
IggyError> {
+        // TODO: Build Message<RequestHeader> with:
+        //    - header.operation = Operation::CreateStream
+        //    - body containing serialized stream name
+        // Convert to MessageBag (using From impl) and dispatch:
+        //   let message: Message<RequestHeader> = build_request(name);
+        //   self.dispatch(message.into());  // ← .into() converts to 
MessageBag
+        // Wait for reply and parse StreamDetails
+        todo!("build message for: {}", name)
+    }
+
+    pub fn update_stream(&self, _stream_id: &Identifier, _name: &str) -> 
Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn delete_stream(&self, _stream_id: &Identifier) -> Result<(), 
IggyError> {
+        todo!()
+    }
+
+    pub fn purge_stream(&self, _stream_id: &Identifier) -> Result<(), 
IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Topic methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_topic(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+    ) -> Result<Option<TopicDetails>, IggyError> {
+        todo!()
+    }
+
+    pub fn get_topics(&self, _stream_id: &Identifier) -> Result<Vec<Topic>, 
IggyError> {
+        todo!()
+    }
+
+    pub fn create_topic(
+        &self,
+        _stream_id: &Identifier,
+        _name: &str,
+        _partitions_count: u32,
+        _compression_algorithm: CompressionAlgorithm,
+        _replication_factor: Option<u8>,
+        _message_expiry: IggyExpiry,
+        _max_topic_size: MaxTopicSize,
+    ) -> Result<TopicDetails, IggyError> {
+        todo!()
+    }
+
+    pub fn update_topic(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _name: &str,
+        _compression_algorithm: CompressionAlgorithm,
+        _replication_factor: Option<u8>,
+        _message_expiry: IggyExpiry,
+        _max_topic_size: MaxTopicSize,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn delete_topic(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn purge_topic(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Partition methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn create_partitions(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partitions_count: u32,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn delete_partitions(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partitions_count: u32,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Segment methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn delete_segments(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: u32,
+        _segments_count: u32,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Message methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn poll_messages(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: Option<u32>,
+        _consumer: &Consumer,
+        _strategy: &PollingStrategy,
+        _count: u32,
+        _auto_commit: bool,
+    ) -> Result<PolledMessages, IggyError> {
+        todo!()
+    }
+
+    pub fn send_messages(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partitioning: &Partitioning,
+        _messages: &mut [IggyMessage],
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn flush_unsaved_buffer(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: u32,
+        _fsync: bool,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Consumer offset methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn store_consumer_offset(
+        &self,
+        _consumer: &Consumer,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: Option<u32>,
+        _offset: u64,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn get_consumer_offset(
+        &self,
+        _consumer: &Consumer,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: Option<u32>,
+    ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
+        todo!()
+    }
+
+    pub fn delete_consumer_offset(
+        &self,
+        _consumer: &Consumer,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _partition_id: Option<u32>,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Consumer group methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_consumer_group(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _group_id: &Identifier,
+    ) -> Result<Option<ConsumerGroupDetails>, IggyError> {
+        todo!()
+    }
+
+    pub fn get_consumer_groups(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+    ) -> Result<Vec<ConsumerGroup>, IggyError> {
+        todo!()
+    }
+
+    pub fn create_consumer_group(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _name: &str,
+    ) -> Result<ConsumerGroupDetails, IggyError> {
+        todo!()
+    }
+
+    pub fn delete_consumer_group(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _group_id: &Identifier,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn join_consumer_group(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _group_id: &Identifier,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+
+    pub fn leave_consumer_group(
+        &self,
+        _stream_id: &Identifier,
+        _topic_id: &Identifier,
+        _group_id: &Identifier,
+    ) -> Result<(), IggyError> {
+        todo!()
+    }
+}
+
+// 
=============================================================================
+// Cluster methods
+// 
=============================================================================
+
+impl Simulator {
+    pub fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> {
+        todo!()
+    }
+}
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
new file mode 100644
index 000000000..0790f62d5
--- /dev/null
+++ b/core/simulator/src/main.rs
@@ -0,0 +1,9 @@
+use simulator::Simulator;
+
+fn main() {
+    let sim = Simulator::new(3);
+    println!("Created simulator with {} replicas", sim.replicas.len());
+    for replica in &sim.replicas {
+        println!("  - {} (id: {})", replica.name, replica.id);
+    }
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
new file mode 100644
index 000000000..1e93b9640
--- /dev/null
+++ b/core/simulator/src/replica.rs
@@ -0,0 +1,41 @@
+use crate::bus::MemBus;
+use crate::deps::{
+    MemStorage, ReplicaPartitions, SimJournal, SimMetadata, 
SimMuxStateMachine, SimSnapshot,
+};
+use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
+use metadata::stm::stream::{Streams, StreamsInner};
+use metadata::stm::user::{Users, UsersInner};
+use metadata::{variadic, IggyMetadata};
+use std::rc::Rc;
+
+#[derive(Debug)]
+pub struct Replica {
+    pub id: u8,
+    pub name: String,
+    pub metadata: SimMetadata,
+    pub partitions: ReplicaPartitions,
+    pub bus: Rc<MemBus>,
+}
+
+impl Replica {
+    pub fn new(id: u8, name: String, bus: Rc<MemBus>) -> Self {
+        // Create the mux state machine with all state machines
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
+        let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
+
+        Self {
+            id,
+            name,
+            metadata: IggyMetadata {
+                consensus: None, // TODO: Init consensus
+                journal: Some(SimJournal::<MemStorage>::default()),
+                snapshot: Some(SimSnapshot::default()),
+                mux_stm: mux,
+            },
+            partitions: ReplicaPartitions::default(),
+            bus,
+        }
+    }
+}


Reply via email to