This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch simulator in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ea951fbdaf0cb501f49ec4911a9a091050d61b0a Author: numinex <[email protected]> AuthorDate: Tue Feb 3 19:32:33 2026 +0100 top --- Cargo.lock | 12 + Cargo.toml | 1 + core/common/src/sender/mod.rs | 1 + core/common/src/types/consensus/message.rs | 11 +- core/consensus/src/impls.rs | 4 + core/consensus/src/vsr_timeout.rs | 1 + core/journal/src/lib.rs | 12 +- core/message_bus/src/cache/connection.rs | 5 +- core/message_bus/src/lib.rs | 7 +- core/metadata/src/impls/metadata.rs | 23 +- core/metadata/src/lib.rs | 8 +- core/metadata/src/stm/mod.rs | 13 + core/metadata/src/stm/mux.rs | 1 + core/simulator/Cargo.toml | 12 + core/simulator/src/bus.rs | 120 ++++++ core/simulator/src/deps.rs | 138 +++++++ core/simulator/src/lib.rs | 584 +++++++++++++++++++++++++++++ core/simulator/src/main.rs | 9 + core/simulator/src/replica.rs | 41 ++ 19 files changed, 980 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2806dd02..faad05d5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8449,6 +8449,18 @@ dependencies = [ "time", ] +[[package]] +name = "simulator" +version = "0.1.0" +dependencies = [ + "bytes", + "consensus", + "iggy_common", + "journal", + "message_bus", + "metadata", +] + [[package]] name = "siphasher" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 0be7d8a9c..082b02b0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ members = [ "core/partitions", "core/sdk", "core/server", + "core/simulator", "core/tools", "examples/rust", ] diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs index 27c98d890..07ca24c40 100644 --- a/core/common/src/sender/mod.rs +++ b/core/common/src/sender/mod.rs @@ -83,6 +83,7 @@ pub trait Sender { } #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum SenderKind { Tcp(TcpSender), TcpTls(TcpTlsSender), diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index ac32cc3c8..a9fca6524 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::types::consensus::header::{ +use crate::{header::RequestHeader, types::consensus::header::{ self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader, -}; +}}; use bytes::Bytes; use std::marker::PhantomData; @@ -278,6 +278,7 @@ where #[derive(Debug)] #[allow(unused)] pub enum MessageBag { + Request(Message<RequestHeader>), Generic(Message<GenericHeader>), Prepare(Message<PrepareHeader>), PrepareOk(Message<PrepareOkHeader>), @@ -289,6 +290,7 @@ impl MessageBag { #[allow(unused)] pub fn command(&self) -> header::Command2 { match self { + MessageBag::Request(message) => message.header().command, MessageBag::Generic(message) => message.header().command, MessageBag::Prepare(message) => message.header().command, MessageBag::PrepareOk(message) => message.header().command, @@ -300,6 +302,7 @@ impl MessageBag { #[allow(unused)] pub fn size(&self) -> u32 { match self { + MessageBag::Request(message) => message.header().size(), MessageBag::Generic(message) => message.header().size(), MessageBag::Prepare(message) => message.header().size(), MessageBag::PrepareOk(message) => message.header().size(), @@ -333,6 +336,10 @@ where header::Command2::Reply => { let msg = unsafe { Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) }; MessageBag::Reply(msg) + }, + header::Command2::Request => { + let msg = unsafe { Message::<header::RequestHeader>::from_buffer_unchecked(buffer) }; + MessageBag::Request(msg) } _ => unreachable!( "For now we only support Prepare, Commit, and Reply. In the future we will support more commands. Command2: {command:?}" diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 154a6c3a4..34b08e654 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -43,6 +43,7 @@ pub trait Sequencer { fn set_sequence(&self, sequence: Self::Sequence); } +#[derive(Debug)] pub struct LocalSequencer { op: Cell<u64>, } @@ -82,6 +83,7 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8; /// Maximum number of replicas in a cluster. pub const REPLICAS_MAX: usize = 32; +#[derive(Debug)] pub struct PipelineEntry { pub message: Message<PrepareHeader>, /// Bitmap of replicas that have acknowledged this prepare. @@ -133,6 +135,7 @@ impl RequestEntry { } } +#[derive(Debug)] pub struct LocalPipeline { /// Messages being prepared (uncommitted and being replicated). prepare_queue: VecDeque<PipelineEntry>, @@ -386,6 +389,7 @@ pub enum VsrAction { } #[allow(unused)] +#[derive(Debug)] pub struct VsrConsensus { cluster: u128, replica: u8, diff --git a/core/consensus/src/vsr_timeout.rs b/core/consensus/src/vsr_timeout.rs index 140d40f02..d5cda482e 100644 --- a/core/consensus/src/vsr_timeout.rs +++ b/core/consensus/src/vsr_timeout.rs @@ -109,6 +109,7 @@ pub enum TimeoutKind { /// Manager for all VSR timeouts of a replica. #[allow(unused)] +#[derive(Debug)] pub struct TimeoutManager { ping: Timeout, prepare: Timeout, diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs index b091bcd3d..07056416f 100644 --- a/core/journal/src/lib.rs +++ b/core/journal/src/lib.rs @@ -24,21 +24,19 @@ where type Header; type Entry; - fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>; - - fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>; - - fn set_header_as_dirty(&self, header: &Self::Header); + fn header(&self, idx: usize) -> Option<&Self::Header>; + fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header>; fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>; + fn entry(&self, header: &Self::Header) -> impl Future<Output = Option<Self::Entry>>; } // TODO: Move to other crate. pub trait Storage { type Buffer; - fn write(&self, buf: Self::Buffer) -> usize; - fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer; + fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>; + fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>; } pub trait JournalHandle { diff --git a/core/message_bus/src/cache/connection.rs b/core/message_bus/src/cache/connection.rs index d07ac66ec..633dc2280 100644 --- a/core/message_bus/src/cache/connection.rs +++ b/core/message_bus/src/cache/connection.rs @@ -34,6 +34,7 @@ pub trait ShardedState { } /// Least-loaded allocation strategy for connections +#[derive(Debug)] pub struct LeastLoadedStrategy { total_shards: usize, connections_per_shard: RefCell<Vec<(u16, usize)>>, @@ -193,6 +194,7 @@ impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { } /// Coordinator that wraps a strategy for a specific sharded state type +#[derive(Debug)] pub struct Coordinator<A, SS> where SS: ShardedState, @@ -223,6 +225,7 @@ where } } +#[derive(Debug)] pub struct ShardedConnections<A, SS> where SS: ShardedState, @@ -259,7 +262,7 @@ where } /// Cache for connection state per shard -#[derive(Default)] +#[derive(Debug, Default)] pub struct ConnectionCache { pub shard_id: u16, pub connections: HashMap<u8, Option<Rc<TcpSender>>>, diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index c578a1fb0..027fe9f56 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -27,8 +27,9 @@ pub trait MessageBus { type Client; type Replica; type Data; + type Sender; - fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool; + fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> bool; fn remove_client(&mut self, client: Self::Client) -> bool; fn add_replica(&mut self, replica: Self::Replica) -> bool; @@ -48,6 +49,7 @@ pub trait MessageBus { } // TODO: explore generics for Strategy +#[derive(Debug)] pub struct IggyMessageBus { clients: HashMap<u128, SenderKind>, replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>, @@ -83,8 +85,9 @@ impl MessageBus for IggyMessageBus { type Client = u128; type Replica = u8; type Data = Message<GenericHeader>; + type Sender = SenderKind; - fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool { + fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> bool { if self.clients.contains_key(&client) { return false; } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index c8c7fdf90..5932c3e09 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -39,15 +39,16 @@ where } #[expect(unused)] -struct IggyMetadata<C, J, S, M> { +#[derive(Debug)] +pub struct IggyMetadata<C, J, S, M> { /// Some on shard0, None on other shards - consensus: Option<C>, + pub consensus: Option<C>, /// Some on shard0, None on other shards - journal: Option<J>, + pub journal: Option<J>, /// Some on shard0, None on other shards - snapshot: Option<S>, + pub snapshot: Option<S>, /// State machine - lives on all shards - mux_stm: M, + pub mux_stm: M, } impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M> @@ -137,14 +138,13 @@ where // TODO handle gap in ops. // Verify hash chain integrity. - if let Some(previous) = journal.handle().previous_entry(header) { + if let Some(previous) = journal.handle().previous_header(header) { self.panic_if_hash_chain_would_break_in_same_view(&previous, header); } assert_eq!(header.op, current_op + 1); consensus.sequencer().set_sequence(header.op); - journal.handle().set_header_as_dirty(header); // Append to journal. journal.handle().append(message.clone()).await; @@ -228,7 +228,8 @@ where }; let header = prepare.header(); - header.op <= consensus.commit() || journal.handle().entry(header).is_some() + // TODO: Handle idx calculation, for now using header.op, but since the journal may get compacted, this may not be correct. + header.op <= consensus.commit() || journal.handle().header(header.op as usize).is_some() } /// Replicate a prepare message to the next replica in the chain. @@ -243,9 +244,11 @@ where let header = message.header(); + // TODO: calculate the index; + let idx= header.op as usize; assert_eq!(header.command, Command2::Prepare); assert!( - journal.handle().entry(header).is_none(), + journal.handle().header(idx).is_none(), "replicate: must not already have prepare" ); assert!(header.op > consensus.commit()); @@ -335,7 +338,7 @@ where } // Verify we have the prepare and it's persisted (not dirty). - if journal.handle().entry(header).is_none() { + if journal.handle().header(header.op as usize).is_none() { debug!( replica = consensus.replica(), op = header.op, diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs index 824cf614f..08c137fff 100644 --- a/core/metadata/src/lib.rs +++ b/core/metadata/src/lib.rs @@ -17,8 +17,14 @@ //! Iggy metadata module -mod impls; +pub mod impls; pub mod permissioner; pub mod stm; mod stats; + +// Re-export IggyMetadata and Metadata trait for use in other modules +pub use impls::metadata::{IggyMetadata, Metadata}; + +// Re-export MuxStateMachine for use in other modules +pub use stm::mux::MuxStateMachine; diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 9dfb44852..e26651c05 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -31,6 +31,15 @@ where inner: UnsafeCell<WriteHandle<T, O>>, } +impl<T, O> std::fmt::Debug for WriteCell<T, O> +where + T: Absorb<O>, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WriteCell").finish_non_exhaustive() + } +} + impl<T, O> WriteCell<T, O> where T: Absorb<O>, @@ -65,6 +74,7 @@ pub trait Handler: Command { fn handle(&mut self, cmd: &Self::Cmd); } +#[derive(Debug)] pub struct LeftRight<T, C> where T: Absorb<C>, @@ -180,6 +190,7 @@ macro_rules! define_state { )* } + #[derive(Debug)] pub struct $state { inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>, } @@ -207,6 +218,8 @@ macro_rules! define_state { } } + // TODO: This can be monomorphized by const generics, instead of creating an runtime enum + // We can use const generics and specialize each of those methods. impl $crate::stm::Command for [<$state Inner>] { type Cmd = [<$state Command>]; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 979ff3ccf..9928169a6 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -20,6 +20,7 @@ use iggy_common::{header::PrepareHeader, message::Message}; use crate::stm::{State, StateMachine}; // MuxStateMachine that proxies to an tuple of variadic state machines +#[derive(Debug)] pub struct MuxStateMachine<T> where T: StateMachine, diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml new file mode 100644 index 000000000..4e611090c --- /dev/null +++ b/core/simulator/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "simulator" +version = "0.1.0" +edition = "2024" + +[dependencies] +bytes = { workspace = true } +consensus = { path = "../consensus" } +iggy_common = { path = "../common" } +journal = { path = "../journal" } +message_bus = { path = "../message_bus" } +metadata = { path = "../metadata" } \ No newline at end of file diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs new file mode 100644 index 000000000..6fefdc20a --- /dev/null +++ b/core/simulator/src/bus.rs @@ -0,0 +1,120 @@ +use iggy_common::{header::GenericHeader, message::Message, IggyError}; +use message_bus::MessageBus; +use std::cell::RefCell; +use std::collections::{HashMap, VecDeque}; + +/// Message envelope for tracking sender/recipient +#[derive(Debug, Clone)] +pub struct Envelope { + pub from_replica: Option<u8>, + pub to_replica: Option<u8>, + pub to_client: Option<u128>, + pub message: Message<GenericHeader>, +} + +/// In-memory message bus implementing the MessageBus trait +#[derive(Debug, Default)] +pub struct MemBus { + clients: RefCell<HashMap<u128, ()>>, + replicas: RefCell<HashMap<u8, ()>>, + pending_messages: RefCell<VecDeque<Envelope>>, +} + +impl MemBus { + pub fn new() -> Self { + Self { + clients: RefCell::new(HashMap::new()), + replicas: RefCell::new(HashMap::new()), + pending_messages: RefCell::new(VecDeque::new()), + } + } + + /// Get the next pending message from the bus + pub fn receive(&self) -> Option<Envelope> { + self.pending_messages.borrow_mut().pop_front() + } + + /// Get all pending messages from the bus + pub fn receive_all(&self) -> Vec<Envelope> { + self.pending_messages.borrow_mut().drain(..).collect() + } + + /// Check if there are pending messages + pub fn has_pending(&self) -> bool { + !self.pending_messages.borrow().is_empty() + } + + /// Get the count of pending messages + pub fn pending_count(&self) -> usize { + self.pending_messages.borrow().len() + } +} + +impl MessageBus for MemBus { + type Client = u128; + type Replica = u8; + type Data = Message<GenericHeader>; + type Sender = (); + + fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> bool { + if self.clients.borrow().contains_key(&client) { + return false; + } + self.clients.borrow_mut().insert(client, ()); + true + } + + fn remove_client(&mut self, client: Self::Client) -> bool { + self.clients.borrow_mut().remove(&client).is_some() + } + + fn add_replica(&mut self, replica: Self::Replica) -> bool { + if self.replicas.borrow().contains_key(&replica) { + return false; + } + self.replicas.borrow_mut().insert(replica, ()); + true + } + + fn remove_replica(&mut self, replica: Self::Replica) -> bool { + self.replicas.borrow_mut().remove(&replica).is_some() + } + + async fn send_to_client( + &self, + client_id: Self::Client, + message: Self::Data, + ) -> Result<(), IggyError> { + if !self.clients.borrow().contains_key(&client_id) { + return Err(IggyError::ClientNotFound(client_id as u32)); + } + + self.pending_messages.borrow_mut().push_back(Envelope { + from_replica: None, + to_replica: None, + to_client: Some(client_id), + message, + }); + + Ok(()) + } + + async fn send_to_replica( + &self, + replica: Self::Replica, + message: Self::Data, + ) -> Result<(), IggyError> { + if !self.replicas.borrow().contains_key(&replica) { + return Err(IggyError::ResourceNotFound(format!("Replica {}", replica))); + } + + self.pending_messages.borrow_mut().push_back(Envelope { + from_replica: None, + to_replica: Some(replica), + to_client: None, + message, + }); + + Ok(()) + } +} diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs new file mode 100644 index 000000000..5f1790e75 --- /dev/null +++ b/core/simulator/src/deps.rs @@ -0,0 +1,138 @@ +use bytes::Bytes; +use consensus::VsrConsensus; +use iggy_common::header::PrepareHeader; +use iggy_common::message::Message; +use journal::{Journal, JournalHandle, Storage}; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::stream::Streams; +use metadata::stm::user::Users; +use metadata::{variadic, IggyMetadata, MuxStateMachine}; +use std::cell::{Cell, RefCell, UnsafeCell}; +use std::collections::HashMap; + +/// In-memory storage backend for testing/simulation +#[derive(Debug, Default)] +pub struct MemStorage { + data: RefCell<Vec<u8>>, + offset: Cell<u64>, +} + +impl Storage for MemStorage { + type Buffer = Vec<u8>; + + async fn write(&self, buf: Self::Buffer) -> usize { + let len = buf.len(); + self.data.borrow_mut().extend_from_slice(&buf); + self.offset.set(self.offset.get() + len as u64); + len + } + + async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> Self::Buffer { + let data = self.data.borrow(); + let end = offset + buffer.len(); + if offset < data.len() && end <= data.len() { + buffer[..].copy_from_slice(&data[offset..end]); + } + buffer + } +} + +/// Generic in-memory journal implementation for testing/simulation +/// Following TigerBeetle's approach: headers in memory, full messages in storage +pub struct SimJournal<S: Storage> { + storage: S, + headers: UnsafeCell<HashMap<u64, PrepareHeader>>, + offsets: UnsafeCell<HashMap<u64, usize>>, +} + +impl<S: Storage + Default> Default for SimJournal<S> { + fn default() -> Self { + Self { + storage: S::default(), + headers: UnsafeCell::new(HashMap::new()), + offsets: UnsafeCell::new(HashMap::new()), + } + } +} + +impl<S: Storage> std::fmt::Debug for SimJournal<S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimJournal") + .field("storage", &"<Storage>") + .field("headers", &"<UnsafeCell>") + .field("offsets", &"<UnsafeCell>") + .finish() + } +} + +impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> { + type Header = PrepareHeader; + type Entry = Message<PrepareHeader>; + + async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> { + let headers = unsafe { &*self.headers.get() }; + let offsets = unsafe { &*self.offsets.get() }; + + let header = headers.get(&header.op)?; + let offset = *offsets.get(&header.op)?; + + let mut buffer = Vec::with_capacity(header.size as usize); + unsafe { + buffer.set_len(header.size as usize); + } + let buffer = self.storage.read(offset, buffer).await; + let message = + Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid"); + Some(message) + } + + fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header> { + if header.op == 0 { + return None; + } + unsafe { &*self.headers.get() }.get(&(header.op - 1)) + } + + async fn append(&self, entry: Self::Entry) { + let header = *entry.header(); + let message_bytes = entry.as_bytes(); + + let bytes_written = self.storage.write(message_bytes.to_vec()).await; + + let current_offset = unsafe { &mut *self.offsets.get() } + .values() + .last() + .cloned() + .unwrap_or_default(); + + unsafe { &mut *self.headers.get() }.insert(header.op, header); + unsafe { &mut *self.offsets.get() }.insert(header.op, current_offset + bytes_written); + } + + fn header(&self, idx: usize) -> Option<&Self::Header> { + let headers = unsafe { &*self.headers.get() }; + headers.get(&(idx as u64)) + } +} + +impl JournalHandle for SimJournal<MemStorage> { + type Storage = MemStorage; + type Target = SimJournal<MemStorage>; + + fn handle(&self) -> &Self::Target { + self + } +} + +/// Placeholder snapshot implementation +#[derive(Debug, Default)] +pub struct SimSnapshot {} + +/// Type aliases for simulator metadata +pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +pub type SimMetadata = + IggyMetadata<VsrConsensus, SimJournal<MemStorage>, SimSnapshot, SimMuxStateMachine>; + +/// Placeholder for replica partitions +#[derive(Debug, Default)] +pub struct ReplicaPartitions {} diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs new file mode 100644 index 000000000..fdd296f94 --- /dev/null +++ b/core/simulator/src/lib.rs @@ -0,0 +1,584 @@ +pub mod bus; +pub mod deps; +pub mod replica; + +use bus::MemBus; +use iggy_common::header::{PrepareHeader, PrepareOkHeader, RequestHeader}; +use iggy_common::message::{Message, MessageBag}; +use iggy_common::{ + ClientInfo, ClientInfoDetails, ClusterMetadata, CompressionAlgorithm, Consumer, + ConsumerGroup, ConsumerGroupDetails, ConsumerOffsetInfo, Identifier, IdentityInfo, + IggyDuration, IggyError, IggyExpiry, IggyMessage, MaxTopicSize, Partitioning, Permissions, + PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, PollingStrategy, + RawPersonalAccessToken, Snapshot, SnapshotCompression, Stats, Stream, StreamDetails, + SystemSnapshotType, Topic, TopicDetails, UserInfo, UserInfoDetails, UserStatus, +}; +use message_bus::MessageBus; +use metadata::Metadata; +use replica::Replica; +use std::rc::Rc; + +/// The main simulator struct that manages all replicas and exposes the SDK API +#[derive(Debug)] +pub struct Simulator { + pub replicas: Vec<Replica>, + pub message_bus: Rc<MemBus>, +} + +impl Simulator { + pub fn new(replica_count: usize) -> Self { + // Create message bus and preseed all replica connections + let mut message_bus = MemBus::new(); + + // Register all replicas with the message bus + for i in 0..replica_count as u8 { + message_bus.add_replica(i); + } + + let message_bus = Rc::new(message_bus); + let replicas = (0..replica_count) + .map(|i| Replica::new(i as u8, format!("replica-{}", i), Rc::clone(&message_bus))) + .collect(); + + Self { + replicas, + message_bus, + } + } + + pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> Self { + // Preseed replica connections + for i in 0..replica_count as u8 { + message_bus.add_replica(i); + } + + let message_bus = Rc::new(message_bus); + let replicas = (0..replica_count) + .map(|i| Replica::new(i as u8, format!("replica-{}", i), Rc::clone(&message_bus))) + .collect(); + + Self { + replicas, + message_bus, + } + } +} + +// ============================================================================= +// Internal message dispatch - Implementation details +// ============================================================================= + +impl Simulator { + /// Dispatch a message to the appropriate subsystem + /// Private - called by public client methods after building the message + /// + /// Routes based on message type and operation: + /// - Request messages → check operation type + /// - Metadata operations (1-99) → metadata shard + /// - Partition operations (100+) → partition shard + /// - Other message types → routed directly + fn dispatch(&self, message: MessageBag) { + match message { + MessageBag::Request(request) => { + // Route request based on operation type + let operation_value = request.header().operation as u32; + + if operation_value < 100 { + // Metadata operation + self.dispatch_to_metadata(request); + } else { + // Partition operation + self.dispatch_to_partition(request); + } + } + MessageBag::Prepare(prepare) => { + // TODO: Handle prepare messages (chain replication) + todo!("handle prepare: op={}", prepare.header().op); + } + MessageBag::PrepareOk(prepare_ok) => { + // TODO: Handle acknowledgments + todo!("handle prepare_ok: op={}", prepare_ok.header().op); + } + MessageBag::Reply(_reply) => { + // TODO: Handle replies back to client + todo!("handle reply message"); + } + MessageBag::Commit(_commit) => { + // TODO: Handle commit messages + todo!("handle commit message"); + } + MessageBag::Generic(_generic) => { + // TODO: Handle generic messages + todo!("handle generic message"); + } + } + } + + /// Dispatch metadata operation to metadata shard + fn dispatch_to_metadata(&self, request: Message<RequestHeader>) { + // TODO: Determine which replica is primary (for now, use replica 0) + let primary_id = 0; + let primary = &self.replicas[primary_id]; + + // Route to metadata's on_request handler + primary.metadata.on_request(request); + } + + /// Dispatch partition operation to partition shard + fn dispatch_to_partition(&self, request: Message<RequestHeader>) { + // TODO: Determine which partition/replica handles this request + // For now, just route to replica 0's partitions + let replica_id = 0; + let _replica = &self.replicas[replica_id]; + + // TODO: Route to replica.partitions.on_request(request) + todo!( + "dispatch partition operation to replica {}: operation={:?}", + replica_id, + request.header().operation + ); + } + +} + +// ============================================================================= +// Client methods +// ============================================================================= + +impl Simulator { + pub fn connect(&self) -> Result<(), IggyError> { + todo!() + } + + pub fn disconnect(&self) -> Result<(), IggyError> { + todo!() + } + + pub fn shutdown(&self) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// System methods +// ============================================================================= + +impl Simulator { + pub fn get_stats(&self) -> Result<Stats, IggyError> { + todo!() + } + + pub fn get_me(&self) -> Result<ClientInfoDetails, IggyError> { + todo!() + } + + pub fn get_client(&self, _client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError> { + todo!() + } + + pub fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> { + todo!() + } + + pub fn ping(&self) -> Result<(), IggyError> { + todo!() + } + + pub fn heartbeat_interval(&self) -> IggyDuration { + todo!() + } + + pub fn snapshot( + &self, + _compression: SnapshotCompression, + _snapshot_types: Vec<SystemSnapshotType>, + ) -> Result<Snapshot, IggyError> { + todo!() + } +} + +// ============================================================================= +// User methods +// ============================================================================= + +impl Simulator { + pub fn get_user(&self, _user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { + todo!() + } + + pub fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> { + todo!() + } + + pub fn create_user( + &self, + _username: &str, + _password: &str, + _status: UserStatus, + _permissions: Option<Permissions>, + ) -> Result<UserInfoDetails, IggyError> { + todo!() + } + + pub fn delete_user(&self, _user_id: &Identifier) -> Result<(), IggyError> { + todo!() + } + + pub fn update_user( + &self, + _user_id: &Identifier, + _username: Option<&str>, + _status: Option<UserStatus>, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn update_permissions( + &self, + _user_id: &Identifier, + _permissions: Option<Permissions>, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn change_password( + &self, + _user_id: &Identifier, + _current_password: &str, + _new_password: &str, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn login_user(&self, _username: &str, _password: &str) -> Result<IdentityInfo, IggyError> { + todo!() + } + + pub fn logout_user(&self) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Personal access token methods +// ============================================================================= + +impl Simulator { + pub fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { + todo!() + } + + pub fn create_personal_access_token( + &self, + _name: &str, + _expiry: PersonalAccessTokenExpiry, + ) -> Result<RawPersonalAccessToken, IggyError> { + todo!() + } + + pub fn delete_personal_access_token(&self, _name: &str) -> Result<(), IggyError> { + todo!() + } + + pub fn login_with_personal_access_token( + &self, + _token: &str, + ) -> Result<IdentityInfo, IggyError> { + todo!() + } +} + +// ============================================================================= +// Stream methods - Public API +// ============================================================================= +// +// Pattern for all client methods: +// 1. Build Message<RequestHeader> from parameters +// 2. Convert to MessageBag::Request(message) +// 3. Call self.dispatch(bag) - private method +// 4. Wait for reply from primary +// 5. Parse and return result + +impl Simulator { + pub fn get_stream(&self, _stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { + // TODO: Build message with Operation::GetStream (not yet in enum) + // self.dispatch_request(message); + // Wait for reply and parse + todo!() + } + + pub fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { + // TODO: Build message with Operation::ListStreams (not yet in enum) + // self.dispatch_request(message); + // Wait for reply and parse + todo!() + } + + pub fn create_stream(&self, name: &str) -> Result<StreamDetails, IggyError> { + // TODO: Build Message<RequestHeader> with: + // - header.operation = Operation::CreateStream + // - body containing serialized stream name + // Convert to MessageBag (using From impl) and dispatch: + // let message: Message<RequestHeader> = build_request(name); + // self.dispatch(message.into()); // ← .into() converts to MessageBag + // Wait for reply and parse StreamDetails + todo!("build message for: {}", name) + } + + pub fn update_stream(&self, _stream_id: &Identifier, _name: &str) -> Result<(), IggyError> { + todo!() + } + + pub fn delete_stream(&self, _stream_id: &Identifier) -> Result<(), IggyError> { + todo!() + } + + pub fn purge_stream(&self, _stream_id: &Identifier) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Topic methods +// ============================================================================= + +impl Simulator { + pub fn get_topic( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + ) -> Result<Option<TopicDetails>, IggyError> { + todo!() + } + + pub fn get_topics(&self, _stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { + todo!() + } + + pub fn create_topic( + &self, + _stream_id: &Identifier, + _name: &str, + _partitions_count: u32, + _compression_algorithm: CompressionAlgorithm, + _replication_factor: Option<u8>, + _message_expiry: IggyExpiry, + _max_topic_size: MaxTopicSize, + ) -> Result<TopicDetails, IggyError> { + todo!() + } + + pub fn update_topic( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _name: &str, + _compression_algorithm: CompressionAlgorithm, + _replication_factor: Option<u8>, + _message_expiry: IggyExpiry, + _max_topic_size: MaxTopicSize, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn delete_topic( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn purge_topic( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Partition methods +// ============================================================================= + +impl Simulator { + pub fn create_partitions( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partitions_count: u32, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn delete_partitions( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partitions_count: u32, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Segment methods +// ============================================================================= + +impl Simulator { + pub fn delete_segments( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: u32, + _segments_count: u32, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Message methods +// ============================================================================= + +impl Simulator { + pub fn poll_messages( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: Option<u32>, + _consumer: &Consumer, + _strategy: &PollingStrategy, + _count: u32, + _auto_commit: bool, + ) -> Result<PolledMessages, IggyError> { + todo!() + } + + pub fn send_messages( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partitioning: &Partitioning, + _messages: &mut [IggyMessage], + ) -> Result<(), IggyError> { + todo!() + } + + pub fn flush_unsaved_buffer( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: u32, + _fsync: bool, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Consumer offset methods +// ============================================================================= + +impl Simulator { + pub fn store_consumer_offset( + &self, + _consumer: &Consumer, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: Option<u32>, + _offset: u64, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn get_consumer_offset( + &self, + _consumer: &Consumer, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: Option<u32>, + ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { + todo!() + } + + pub fn delete_consumer_offset( + &self, + _consumer: &Consumer, + _stream_id: &Identifier, + _topic_id: &Identifier, + _partition_id: Option<u32>, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Consumer group methods +// ============================================================================= + +impl Simulator { + pub fn get_consumer_group( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _group_id: &Identifier, + ) -> Result<Option<ConsumerGroupDetails>, IggyError> { + todo!() + } + + pub fn get_consumer_groups( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + ) -> Result<Vec<ConsumerGroup>, IggyError> { + todo!() + } + + pub fn create_consumer_group( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _name: &str, + ) -> Result<ConsumerGroupDetails, IggyError> { + todo!() + } + + pub fn delete_consumer_group( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _group_id: &Identifier, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn join_consumer_group( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _group_id: &Identifier, + ) -> Result<(), IggyError> { + todo!() + } + + pub fn leave_consumer_group( + &self, + _stream_id: &Identifier, + _topic_id: &Identifier, + _group_id: &Identifier, + ) -> Result<(), IggyError> { + todo!() + } +} + +// ============================================================================= +// Cluster methods +// ============================================================================= + +impl Simulator { + pub fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { + todo!() + } +} diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs new file mode 100644 index 000000000..0790f62d5 --- /dev/null +++ b/core/simulator/src/main.rs @@ -0,0 +1,9 @@ +use simulator::Simulator; + +fn main() { + let sim = Simulator::new(3); + println!("Created simulator with {} replicas", sim.replicas.len()); + for replica in &sim.replicas { + println!(" - {} (id: {})", replica.name, replica.id); + } +} diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs new file mode 100644 index 000000000..1e93b9640 --- /dev/null +++ b/core/simulator/src/replica.rs @@ -0,0 +1,41 @@ +use crate::bus::MemBus; +use crate::deps::{ + MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot, +}; +use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner}; +use metadata::stm::stream::{Streams, StreamsInner}; +use metadata::stm::user::{Users, UsersInner}; +use metadata::{variadic, IggyMetadata}; +use std::rc::Rc; + +#[derive(Debug)] +pub struct Replica { + pub id: u8, + pub name: String, + pub metadata: SimMetadata, + pub partitions: ReplicaPartitions, + pub bus: Rc<MemBus>, +} + +impl Replica { + pub fn new(id: u8, name: String, bus: Rc<MemBus>) -> Self { + // Create the mux state machine with all state machines + let users: Users = UsersInner::new().into(); + let streams: Streams = StreamsInner::new().into(); + let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into(); + let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups)); + + Self { + id, + name, + metadata: IggyMetadata { + consensus: None, // TODO: Init consensus + journal: Some(SimJournal::<MemStorage>::default()), + snapshot: Some(SimSnapshot::default()), + mux_stm: mux, + }, + partitions: ReplicaPartitions::default(), + bus, + } + } +}
