This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch simulator in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1b799401eb9fadb646e3782a1d5eed0cb4888d7a Author: numinex <[email protected]> AuthorDate: Thu Feb 5 18:17:25 2026 +0100 simulator v1 --- Cargo.lock | 3 + core/common/src/types/consensus/header.rs | 28 +- core/common/src/types/consensus/message.rs | 54 +-- core/consensus/src/impls.rs | 98 +++-- core/consensus/src/lib.rs | 18 +- core/metadata/Cargo.toml | 1 + core/metadata/src/impls/metadata.rs | 160 +++++--- core/metadata/src/stm/mod.rs | 25 +- core/metadata/src/stm/mux.rs | 19 +- core/metadata/src/stm/stream.rs | 1 + core/simulator/Cargo.toml | 2 + core/simulator/src/bus.rs | 119 ++++-- core/simulator/src/client.rs | 83 ++++ core/simulator/src/deps.rs | 32 +- core/simulator/src/lib.rs | 627 +++++------------------------ core/simulator/src/main.rs | 103 ++++- core/simulator/src/replica.rs | 40 +- 17 files changed, 670 insertions(+), 743 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index faad05d5b..76de899a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5666,6 +5666,7 @@ version = "0.1.0" dependencies = [ "ahash 0.8.12", "consensus", + "futures", "iggy_common", "journal", "left-right", @@ -8453,8 +8454,10 @@ dependencies = [ name = "simulator" version = "0.1.0" dependencies = [ + "bytemuck", "bytes", "consensus", + "futures", "iggy_common", "journal", "message_bus", diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs index 376b6be05..8f9538557 100644 --- a/core/common/src/types/consensus/header.rs +++ b/core/common/src/types/consensus/header.rs @@ -114,6 +114,8 @@ pub enum Operation { UpdatePermissions = 145, CreatePersonalAccessToken = 146, DeletePersonalAccessToken = 147, + + Reserved = 200, } #[repr(C)] @@ -172,6 +174,30 @@ pub struct RequestHeader { pub reserved: [u8; 95], } +impl Default for RequestHeader { + fn default() -> Self { + Self { + reserved: [0; 95], + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + epoch: 0, + view: 0, + release: 0, + protocol: 0, + command: Default::default(), + 
replica: 0, + reserved_frame: [0; 12], + client: 0, + request_checksum: 0, + timestamp: 0, + request: 0, + operation: Default::default(), + } + } +} + unsafe impl Pod for RequestHeader {} unsafe impl Zeroable for RequestHeader {} @@ -348,7 +374,7 @@ impl ConsensusHeader for CommitHeader { } #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Default, Debug, Clone, Copy)] pub struct ReplyHeader { pub checksum: u128, pub checksum_body: u128, diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index a9fca6524..dfb9e7090 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{header::RequestHeader, types::consensus::header::{ - self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader, -}}; +use crate::{ + header::RequestHeader, + types::consensus::header::{self, ConsensusHeader, PrepareHeader, PrepareOkHeader}, +}; use bytes::Bytes; use std::marker::PhantomData; @@ -279,11 +280,8 @@ where #[allow(unused)] pub enum MessageBag { Request(Message<RequestHeader>), - Generic(Message<GenericHeader>), Prepare(Message<PrepareHeader>), PrepareOk(Message<PrepareOkHeader>), - Commit(Message<CommitHeader>), - Reply(Message<ReplyHeader>), } impl MessageBag { @@ -291,11 +289,8 @@ impl MessageBag { pub fn command(&self) -> header::Command2 { match self { MessageBag::Request(message) => message.header().command, - MessageBag::Generic(message) => message.header().command, MessageBag::Prepare(message) => message.header().command, MessageBag::PrepareOk(message) => message.header().command, - MessageBag::Commit(message) => message.header().command, - MessageBag::Reply(message) => message.header().command, } } @@ -303,11 +298,17 @@ impl MessageBag { pub fn size(&self) -> u32 { match self { MessageBag::Request(message) => message.header().size(), 
- MessageBag::Generic(message) => message.header().size(), MessageBag::Prepare(message) => message.header().size(), MessageBag::PrepareOk(message) => message.header().size(), - MessageBag::Commit(message) => message.header().size(), - MessageBag::Reply(message) => message.header().size(), + } + } + + #[allow(unused)] + pub fn operation(&self) -> header::Operation { + match self { + MessageBag::Request(message) => message.header().operation, + MessageBag::Prepare(message) => message.header().operation, + MessageBag::PrepareOk(message) => message.header().operation, } } } @@ -329,21 +330,17 @@ where unsafe { Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) }; MessageBag::Prepare(msg) } - header::Command2::Commit => { - let msg = unsafe { Message::<header::CommitHeader>::from_buffer_unchecked(buffer) }; - MessageBag::Commit(msg) - } - header::Command2::Reply => { - let msg = unsafe { Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) }; - MessageBag::Reply(msg) - }, header::Command2::Request => { - let msg = unsafe { Message::<header::RequestHeader>::from_buffer_unchecked(buffer) }; + let msg = + unsafe { Message::<header::RequestHeader>::from_buffer_unchecked(buffer) }; MessageBag::Request(msg) } - _ => unreachable!( - "For now we only support Prepare, Commit, and Reply. In the future we will support more commands. 
Command2: {command:?}" - ), + header::Command2::PrepareOk => { + let msg = + unsafe { Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) }; + MessageBag::PrepareOk(msg) + } + _ => unreachable!(), } } } @@ -507,9 +504,6 @@ mod tests { assert_eq!(bag.command(), header::Command2::Prepare); assert!(matches!(bag, MessageBag::Prepare(_))); - assert!(!matches!(bag, MessageBag::Commit(_))); - assert!(!matches!(bag, MessageBag::Reply(_))); - assert!(!matches!(bag, MessageBag::Generic(_))); } #[test] @@ -519,9 +513,6 @@ mod tests { assert_eq!(bag.command(), header::Command2::Commit); assert!(!matches!(bag, MessageBag::Prepare(_))); - assert!(matches!(bag, MessageBag::Commit(_))); - assert!(!matches!(bag, MessageBag::Reply(_))); - assert!(!matches!(bag, MessageBag::Generic(_))); } #[test] @@ -531,9 +522,6 @@ mod tests { assert_eq!(bag.command(), header::Command2::Reply); assert!(!matches!(bag, MessageBag::Prepare(_))); - assert!(!matches!(bag, MessageBag::Commit(_))); - assert!(matches!(bag, MessageBag::Reply(_))); - assert!(!matches!(bag, MessageBag::Generic(_))); } } diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 34b08e654..36b595163 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -27,6 +27,7 @@ use iggy_common::header::{ }; use iggy_common::message::Message; use message_bus::IggyMessageBus; +use message_bus::MessageBus; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; @@ -221,12 +222,8 @@ impl LocalPipeline { self.prepare_queue.back() } - /// Find a message by op number and checksum. - pub fn message_by_op_and_checksum( - &mut self, - op: u64, - checksum: u128, - ) -> Option<&mut PipelineEntry> { + /// Find a message by op number and checksum (immutable). 
+ pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> { let head_op = self.prepare_queue.front()?.message.header().op; let tail_op = self.prepare_queue.back()?.message.header().op; @@ -242,7 +239,7 @@ impl LocalPipeline { } let index = (op - head_op) as usize; - let entry = self.prepare_queue.get_mut(index)?; + let entry = self.prepare_queue.get(index)?; debug_assert_eq!(entry.message.header().op, op); @@ -323,6 +320,20 @@ impl LocalPipeline { pub fn clear(&mut self) { self.prepare_queue.clear(); } + + /// Extract and remove a message by op number. + /// Returns None if op is not in the pipeline. + pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> { + let head_op = self.prepare_queue.front()?.message.header().op; + if op < head_op { + return None; + } + let index = (op - head_op) as usize; + if index >= self.prepare_queue.len() { + return None; + } + self.prepare_queue.remove(index) + } } impl Pipeline for LocalPipeline { @@ -337,15 +348,23 @@ impl Pipeline for LocalPipeline { LocalPipeline::pop_message(self) } + fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> { + LocalPipeline::extract_by_op(self, op) + } + fn clear(&mut self) { LocalPipeline::clear(self) } + fn message_by_op(&self, op: u64) -> Option<&Self::Entry> { + LocalPipeline::message_by_op(self, op) + } + fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> { LocalPipeline::message_by_op_mut(self, op) } - fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> Option<&mut Self::Entry> { + fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry> { LocalPipeline::message_by_op_and_checksum(self, op, checksum) } @@ -390,7 +409,10 @@ pub enum VsrAction { #[allow(unused)] #[derive(Debug)] -pub struct VsrConsensus { +pub struct VsrConsensus<B = IggyMessageBus> +where + B: MessageBus, +{ cluster: u128, replica: u8, replica_count: u8, @@ -416,7 +438,7 @@ pub struct VsrConsensus { pipeline: 
RefCell<LocalPipeline>, - message_bus: IggyMessageBus, + message_bus: B, // TODO: Add loopback_queue for messages to self /// Tracks start view change messages received from all replicas (including self) start_view_change_from_all_replicas: RefCell<BitSet<u32>>, @@ -434,8 +456,8 @@ pub struct VsrConsensus { timeouts: RefCell<TimeoutManager>, } -impl VsrConsensus { - pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self { +impl<B: MessageBus> VsrConsensus<B> { + pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B) -> Self { assert!( replica < replica_count, "replica index must be < replica_count" @@ -453,7 +475,7 @@ impl VsrConsensus { last_timestamp: Cell::new(0), last_prepare_checksum: Cell::new(0), pipeline: RefCell::new(LocalPipeline::new()), - message_bus: IggyMessageBus::new(replica_count as usize, replica as u16, 0), + message_bus, start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)), do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()), do_view_change_quorum: Cell::new(false), @@ -463,6 +485,11 @@ impl VsrConsensus { } } + // TODO: More init logic. + pub fn init(&self) { + self.status.set(Status::Normal); + } + pub fn primary_index(&self, view: u32) -> u8 { view as u8 % self.replica_count } @@ -999,9 +1026,9 @@ impl VsrConsensus { /// Called on the primary when a follower acknowledges a prepare. /// /// Returns true if quorum was just reached for this op. - pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool { - let header = message.header(); - + /// Handle a PrepareOk message. Returns true if quorum was reached. + /// Note: Caller (on_ack) should validate is_primary and status before calling. 
+ pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool { assert_eq!(header.command, Command2::PrepareOk); assert!( header.replica < self.replica_count, @@ -1009,26 +1036,16 @@ impl VsrConsensus { header.replica ); - // Ignore if not in normal status - if self.status() != Status::Normal { - return false; - } - // Ignore if from older view if header.view < self.view() { return false; } - // Ignore if from newer view. This shouldn't happen if we're primary + // Ignore if from newer view if header.view > self.view() { return false; } - // We must be primary to process prepare_ok - if !self.is_primary() { - return false; - } - // Ignore if syncing if self.is_syncing() { return false; @@ -1047,9 +1064,6 @@ impl VsrConsensus { return false; } - // Verify the prepare is for a valid op range - let _commit = self.commit(); - // Check for duplicate ack if entry.has_ack(header.replica) { return false; @@ -1062,22 +1076,19 @@ impl VsrConsensus { // Check if we've reached quorum if ack_count >= quorum && !entry.ok_quorum_received { entry.ok_quorum_received = true; - return true; } false } - pub fn message_bus(&self) -> &IggyMessageBus { + pub fn message_bus(&self) -> &B { &self.message_bus } } -impl Project<Message<PrepareHeader>> for Message<RequestHeader> { - type Consensus = VsrConsensus; - - fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> { +impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message<RequestHeader> { + fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> { let op = consensus.sequencer.current_sequence() + 1; self.transmute_header(|old, new| { @@ -1089,6 +1100,7 @@ impl Project<Message<PrepareHeader>> for Message<RequestHeader> { release: old.release, command: Command2::Prepare, replica: consensus.replica, + client: old.client, parent: 0, // TODO: Get parent checksum from the previous entry in the journal (figure out how to pass that ctx here) request_checksum: old.request_checksum, 
request: old.request, @@ -1102,10 +1114,8 @@ impl Project<Message<PrepareHeader>> for Message<RequestHeader> { } } -impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> { - type Consensus = VsrConsensus; - - fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> { +impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Message<PrepareHeader> { + fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> { self.transmute_header(|old, new| { *new = PrepareOkHeader { command: Command2::PrepareOk, @@ -1128,8 +1138,8 @@ impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> { } } -impl Consensus for VsrConsensus { - type MessageBus = IggyMessageBus; +impl<B: MessageBus> Consensus for VsrConsensus<B> { + type MessageBus = B; type RequestMessage = Message<RequestHeader>; type ReplicateMessage = Message<PrepareHeader>; @@ -1166,9 +1176,9 @@ impl Consensus for VsrConsensus { // verify op is sequential assert_eq!( header.op, - self.sequencer.current_sequence() + 1, + self.sequencer.current_sequence(), "op must be sequential: expected {}, got {}", - self.sequencer.current_sequence() + 1, + self.sequencer.current_sequence(), header.op ); diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 1cf323d94..5c3e152e6 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -17,9 +17,8 @@ use message_bus::MessageBus; -pub trait Project<T> { - type Consensus: Consensus; - fn project(self, consensus: &Self::Consensus) -> T; +pub trait Project<T, C: Consensus> { + fn project(self, consensus: &C) -> T; } pub trait Pipeline { @@ -30,11 +29,16 @@ pub trait Pipeline { fn pop_message(&mut self) -> Option<Self::Entry>; + /// Extract and remove a message by op number. 
+ fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>; + fn clear(&mut self); + fn message_by_op(&self, op: u64) -> Option<&Self::Entry>; + fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry>; - fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) -> Option<&mut Self::Entry>; + fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry>; fn is_full(&self) -> bool; @@ -43,11 +47,11 @@ pub trait Pipeline { fn verify(&self); } -pub trait Consensus { +pub trait Consensus: Sized { type MessageBus: MessageBus; // I am wondering, whether we should create a dedicated trait for cloning, so it's explicit that we do ref counting. - type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> + Clone; - type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone; + type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone; + type ReplicateMessage: Project<Self::AckMessage, Self> + Clone; type AckMessage; type Sequencer: Sequencer; type Pipeline: Pipeline<Message = Self::ReplicateMessage>; diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index 193bf5788..05dc872a1 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -30,6 +30,7 @@ readme = "../../../README.md" [dependencies] ahash = { workspace = true } consensus = { workspace = true } +futures = { workspace = true } iggy_common = { workspace = true } journal = { workspace = true } left-right = { workspace = true } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 5932c3e09..7dbf9fe71 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -14,31 +14,30 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
+use crate::stm::StateMachine; use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus}; use iggy_common::{ - header::{Command2, PrepareHeader, PrepareOkHeader}, + header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader}, message::Message, }; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use tracing::{debug, warn}; -#[expect(unused)] pub trait Metadata<C> where C: Consensus, { /// Handle a request message. - fn on_request(&self, message: C::RequestMessage); + fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; /// Handle a replicate message (Prepare in VSR). fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; /// Handle an ack message (PrepareOk in VSR). - fn on_ack(&self, message: C::AckMessage); + fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; } -#[expect(unused)] #[derive(Debug)] pub struct IggyMetadata<C, J, S, M> { /// Some on shard0, None on other shards @@ -51,25 +50,27 @@ pub struct IggyMetadata<C, J, S, M> { pub mux_stm: M, } -impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M> +impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J, S, M> where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, J: JournalHandle, J::Target: Journal< J::Storage, - Entry = <VsrConsensus as Consensus>::ReplicateMessage, + Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage, Header = PrepareHeader, >, + M: StateMachine<Input = Message<PrepareHeader>>, { - fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) { + async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) { let consensus = self.consensus.as_ref().unwrap(); // TODO: Bunch of asserts. 
debug!("handling metadata request"); let prepare = message.project(consensus); - self.pipeline_prepare(prepare); + self.pipeline_prepare(prepare).await; } - async fn on_replicate(&self, message: <VsrConsensus as Consensus>::ReplicateMessage) { + async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) { let consensus = self.consensus.as_ref().unwrap(); let journal = self.journal.as_ref().unwrap(); @@ -94,20 +95,6 @@ where let current_op = consensus.sequencer().current_sequence(); - // Old message (handle as repair). Not replicating. - if header.view < consensus.view() - || (consensus.status() == Status::Normal - && header.view == consensus.view() - && header.op <= current_op) - { - debug!( - replica = consensus.replica(), - "on_replicate: ignoring (repair)" - ); - self.on_repair(message); - return; - } - // If status is not normal, ignore the replicate. if consensus.status() != Status::Normal { warn!( @@ -158,7 +145,7 @@ where } } - fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) { + async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -172,60 +159,112 @@ where return; } - // Find the prepare in pipeline - let mut pipeline = consensus.pipeline().borrow_mut(); - let Some(entry) = pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum) - else { - debug!("on_ack: prepare not in pipeline op={}", header.op); - return; - }; - - // Verify checksum matches - if entry.message.header().checksum != header.prepare_checksum { - warn!("on_ack: checksum mismatch"); - return; + // Verify checksum by checking pipeline entry exists + { + let pipeline = consensus.pipeline().borrow(); + let Some(entry) = + pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum) + else { + debug!("on_ack: prepare not in pipeline op={}", header.op); + return; + }; + + if entry.message.header().checksum != 
header.prepare_checksum { + warn!("on_ack: checksum mismatch"); + return; + } } - // Record ack - let count = entry.add_ack(header.replica); - - // Check quorum - if count >= consensus.quorum() && !entry.ok_quorum_received { - entry.ok_quorum_received = true; + // Let consensus handle the ack increment and quorum check + if consensus.handle_prepare_ok(header) { debug!("on_ack: quorum received for op={}", header.op); - - // Advance commit number and trigger commit journal consensus.advance_commit_number(header.op); - self.commit_journal(); + + // Extract the prepare message from the pipeline by op + let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op); + let Some(entry) = entry else { + warn!("on_ack: prepare not found in pipeline for op={}", header.op); + return; + }; + + let prepare = entry.message; + let prepare_header = *prepare.header(); + + // Apply the state (consumes prepare) + // TODO: Handle appending result to response + let _result = self.mux_stm.update(prepare); + debug!("on_ack: state applied for op={}", prepare_header.op); + + // TODO: Figure out better infra for this, its messy. 
+ let reply = Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()) + .transmute_header(|_, new| { + *new = ReplyHeader { + checksum: 0, + checksum_body: 0, + cluster: consensus.cluster(), + size: std::mem::size_of::<ReplyHeader>() as u32, + epoch: prepare_header.epoch, + view: consensus.view(), + release: 0, + protocol: 0, + command: Command2::Reply, + replica: consensus.replica(), + reserved_frame: [0; 12], + request_checksum: prepare_header.request_checksum, + request_checksum_padding: 0, + context: 0, + context_padding: 0, + op: prepare_header.op, + commit: consensus.commit(), + timestamp: prepare_header.timestamp, + request: prepare_header.request, + operation: prepare_header.operation, + ..Default::default() + }; + }); + + // Send reply to client + let generic_reply = reply.into_generic(); + debug!( + "on_ack: sending reply to client={} for op={}", + prepare_header.client, prepare_header.op + ); + + // TODO: Error handling + consensus + .message_bus() + .send_to_client(prepare_header.client, generic_reply) + .await + .unwrap() } } } -impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M> +impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M> where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, J: JournalHandle, J::Target: Journal< J::Storage, - Entry = <VsrConsensus as Consensus>::ReplicateMessage, + Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage, Header = PrepareHeader, >, + M: StateMachine<Input = Message<PrepareHeader>>, { - #[expect(unused)] - fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { + async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { let consensus = self.consensus.as_ref().unwrap(); debug!("inserting prepare into metadata pipeline"); consensus.verify_pipeline(); consensus.pipeline_message(prepare.clone()); - self.on_replicate(prepare.clone()); + self.on_replicate(prepare.clone()).await; consensus.post_replicate_verify(&prepare); } fn fence_old_prepare(&self, prepare: 
&Message<PrepareHeader>) -> bool { - let (Some(consensus), Some(journal)) = (&self.consensus, &self.journal) else { - todo!("dispatch fence_old_prepare to shard0"); - }; + let consensus = self.consensus.as_ref().unwrap(); + let journal = self.journal.as_ref().unwrap(); let header = prepare.header(); // TODO: Handle idx calculation, for now using header.op, but since the journal may get compacted, this may not be correct. @@ -245,7 +284,7 @@ where let header = message.header(); // TODO: calculate the index; - let idx= header.op as usize; + let idx = header.op as usize; assert_eq!(header.command, Command2::Prepare); assert!( journal.handle().header(idx).is_none(), @@ -282,10 +321,6 @@ where .unwrap(); } - fn on_repair(&self, _message: Message<PrepareHeader>) { - todo!() - } - /// Verify hash chain would not break if we add this header. fn panic_if_hash_chain_would_break_in_same_view( &self, @@ -309,7 +344,6 @@ where // TODO: Implement commit logic // Walk through journal from last committed to current commit number // Apply each entry to the state machine - todo!() } /// Send a prepare_ok message to the primary. @@ -401,6 +435,12 @@ where "send_prepare_ok: loopback to self" ); // TODO: Queue for self-processing or call handle_prepare_ok directly + // TODO: This is temporal, to test simulator, but we should send message to ourselves properly. + consensus + .message_bus() + .send_to_replica(primary, generic_message) + .await + .unwrap(); } else { debug!( replica = consensus.replica(), diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index e26651c05..c92b0ca10 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -62,11 +62,12 @@ where } /// Parses type-erased input into a command. Macro-generated. +/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back. 
pub trait Command { type Cmd; type Input; - fn parse(input: &Self::Input) -> Option<Self::Cmd>; + fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; } /// Handles commands. User-implemented business logic. @@ -110,17 +111,18 @@ where } /// Public interface for state machines. +/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back. pub trait State { type Output; type Input; - fn apply(&self, input: &Self::Input) -> Option<Self::Output>; + fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>; } pub trait StateMachine { type Input; type Output; - fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); + fn update(&self, input: Self::Input) -> Self::Output; } /// Generates a state machine with convention-based storage. @@ -212,32 +214,31 @@ macro_rules! define_state { type Input = <[<$state Inner>] as $crate::stm::Command>::Input; type Output = (); - fn apply(&self, input: &Self::Input) -> Option<Self::Output> { - <[<$state Inner>] as $crate::stm::Command>::parse(input) - .map(|cmd| self.inner.do_apply(cmd)) + fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { + let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; + self.inner.do_apply(cmd); + Ok(()) } } - // TODO: This can be monomorphized by const generics, instead of creating an runtime enum - // We can use const generics and specialize each of those methods. 
impl $crate::stm::Command for [<$state Inner>] { type Cmd = [<$state Command>]; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; - fn parse(input: &Self::Input) -> Option<Self::Cmd> { + fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> { use ::iggy_common::BytesSerializable; use ::iggy_common::header::Operation; - let body = input.body_bytes(); match input.header().operation { $( Operation::$operation => { - Some([<$state Command>]::$operation( + let body = input.body_bytes(); + Ok([<$state Command>]::$operation( $operation::from_bytes(body).unwrap() )) }, )* - _ => None, + _ => Err(input), } } } diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 9928169a6..f68d99bf7 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -44,8 +44,8 @@ where type Input = T::Input; type Output = T::Output; - fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) { - self.inner.update(input, output); + fn update(&self, input: Self::Input) -> Self::Output { + self.inner.update(input) } } @@ -67,12 +67,14 @@ macro_rules! variadic { impl StateMachine for () { type Input = Message<PrepareHeader>; // TODO: Make sure that the `Output` matches to the output type of the rest of list. + // TODO: Add a trait bound to the output that will allow us to get the response in bytes. 
type Output = (); - fn update(&self, _input: &Self::Input, _output: &mut Vec<Self::Output>) {} + fn update(&self, _input: Self::Input) -> Self::Output {} } // Recursive case: process head and recurse on tail +// No Clone bound needed - ownership passes through via Result impl<O, S, Rest> StateMachine for variadic!(S, ...Rest) where S: State<Output = O>, @@ -81,11 +83,11 @@ where type Input = Rest::Input; type Output = O; - fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) { - if let Some(result) = self.0.apply(input) { - output.push(result); + fn update(&self, input: Self::Input) -> Self::Output { + match self.0.apply(input) { + Ok(result) => result, + Err(input) => self.1.update(input), } - self.1.update(input, output) } } @@ -105,8 +107,7 @@ mod tests { let mux = MuxStateMachine::new(variadic!(users, streams)); let input = Message::new(std::mem::size_of::<PrepareHeader>()); - let mut output = Vec::new(); - mux.update(&input, &mut output); + mux.update(input); } } diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 5106d6122..d1a3da212 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -182,6 +182,7 @@ define_state! 
{ DeletePartitions ] } + impl_absorb!(StreamsInner, StreamsCommand); impl StreamsInner { diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml index 4e611090c..ad11311b0 100644 --- a/core/simulator/Cargo.toml +++ b/core/simulator/Cargo.toml @@ -5,7 +5,9 @@ edition = "2024" [dependencies] bytes = { workspace = true } +bytemuck = { workspace = true } consensus = { path = "../consensus" } +futures = { workspace = true } iggy_common = { path = "../common" } journal = { path = "../journal" } message_bus = { path = "../message_bus" } diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs index 6fefdc20a..0df3b19fa 100644 --- a/core/simulator/src/bus.rs +++ b/core/simulator/src/bus.rs @@ -1,7 +1,8 @@ -use iggy_common::{header::GenericHeader, message::Message, IggyError}; +use iggy_common::{IggyError, header::GenericHeader, message::Message}; use message_bus::MessageBus; -use std::cell::RefCell; use std::collections::{HashMap, VecDeque}; +use std::ops::Deref; +use std::sync::{Arc, Mutex}; /// Message envelope for tracking sender/recipient #[derive(Debug, Clone)] @@ -12,41 +13,29 @@ pub struct Envelope { pub message: Message<GenericHeader>, } -/// In-memory message bus implementing the MessageBus trait +// TODO: Proper bus with a `Network` component which would simulate sending packets. +// TigerBeetle handles this by having a list of "buses", and calling callbacks for clients when a response is sent. +// This requires self-referential structs (as message_bus has to store a collection of other buses), which is overcomplicated. +// I think the way we could handle that is by having a dedicated collection for client responses (clients_table). 
#[derive(Debug, Default)] pub struct MemBus { - clients: RefCell<HashMap<u128, ()>>, - replicas: RefCell<HashMap<u8, ()>>, - pending_messages: RefCell<VecDeque<Envelope>>, + clients: Mutex<HashMap<u128, ()>>, + replicas: Mutex<HashMap<u8, ()>>, + pending_messages: Mutex<VecDeque<Envelope>>, } impl MemBus { pub fn new() -> Self { Self { - clients: RefCell::new(HashMap::new()), - replicas: RefCell::new(HashMap::new()), - pending_messages: RefCell::new(VecDeque::new()), + clients: Mutex::new(HashMap::new()), + replicas: Mutex::new(HashMap::new()), + pending_messages: Mutex::new(VecDeque::new()), } } /// Get the next pending message from the bus pub fn receive(&self) -> Option<Envelope> { - self.pending_messages.borrow_mut().pop_front() - } - - /// Get all pending messages from the bus - pub fn receive_all(&self) -> Vec<Envelope> { - self.pending_messages.borrow_mut().drain(..).collect() - } - - /// Check if there are pending messages - pub fn has_pending(&self) -> bool { - !self.pending_messages.borrow().is_empty() - } - - /// Get the count of pending messages - pub fn pending_count(&self) -> usize { - self.pending_messages.borrow().len() + self.pending_messages.lock().unwrap().pop_front() } } @@ -57,27 +46,27 @@ impl MessageBus for MemBus { type Sender = (); fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> bool { - if self.clients.borrow().contains_key(&client) { + if self.clients.lock().unwrap().contains_key(&client) { return false; } - self.clients.borrow_mut().insert(client, ()); + self.clients.lock().unwrap().insert(client, ()); true } fn remove_client(&mut self, client: Self::Client) -> bool { - self.clients.borrow_mut().remove(&client).is_some() + self.clients.lock().unwrap().remove(&client).is_some() } fn add_replica(&mut self, replica: Self::Replica) -> bool { - if self.replicas.borrow().contains_key(&replica) { + if self.replicas.lock().unwrap().contains_key(&replica) { return false; } - self.replicas.borrow_mut().insert(replica, ()); 
+ self.replicas.lock().unwrap().insert(replica, ()); true } fn remove_replica(&mut self, replica: Self::Replica) -> bool { - self.replicas.borrow_mut().remove(&replica).is_some() + self.replicas.lock().unwrap().remove(&replica).is_some() } async fn send_to_client( @@ -85,11 +74,11 @@ impl MessageBus for MemBus { client_id: Self::Client, message: Self::Data, ) -> Result<(), IggyError> { - if !self.clients.borrow().contains_key(&client_id) { + if !self.clients.lock().unwrap().contains_key(&client_id) { return Err(IggyError::ClientNotFound(client_id as u32)); } - self.pending_messages.borrow_mut().push_back(Envelope { + self.pending_messages.lock().unwrap().push_back(Envelope { from_replica: None, to_replica: None, to_client: Some(client_id), @@ -104,11 +93,11 @@ impl MessageBus for MemBus { replica: Self::Replica, message: Self::Data, ) -> Result<(), IggyError> { - if !self.replicas.borrow().contains_key(&replica) { + if !self.replicas.lock().unwrap().contains_key(&replica) { return Err(IggyError::ResourceNotFound(format!("Replica {}", replica))); } - self.pending_messages.borrow_mut().push_back(Envelope { + self.pending_messages.lock().unwrap().push_back(Envelope { from_replica: None, to_replica: Some(replica), to_client: None, @@ -118,3 +107,63 @@ impl MessageBus for MemBus { Ok(()) } } + +/// Newtype wrapper for shared MemBus that implements MessageBus +#[derive(Debug, Clone)] +pub struct SharedMemBus(pub Arc<MemBus>); + +impl Deref for SharedMemBus { + type Target = MemBus; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl MessageBus for SharedMemBus { + type Client = u128; + type Replica = u8; + type Data = Message<GenericHeader>; + type Sender = (); + + fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> bool { + self.0 + .clients + .lock() + .unwrap() + .insert(client, sender) + .is_none() + } + + fn remove_client(&mut self, client: Self::Client) -> bool { + self.0.clients.lock().unwrap().remove(&client).is_some() + } + + fn 
add_replica(&mut self, replica: Self::Replica) -> bool { + self.0 + .replicas + .lock() + .unwrap() + .insert(replica, ()) + .is_none() + } + + fn remove_replica(&mut self, replica: Self::Replica) -> bool { + self.0.replicas.lock().unwrap().remove(&replica).is_some() + } + + async fn send_to_client( + &self, + client_id: Self::Client, + message: Self::Data, + ) -> Result<(), IggyError> { + self.0.send_to_client(client_id, message).await + } + + async fn send_to_replica( + &self, + replica: Self::Replica, + message: Self::Data, + ) -> Result<(), IggyError> { + self.0.send_to_replica(replica, message).await + } +} diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs new file mode 100644 index 000000000..d3fbfe42b --- /dev/null +++ b/core/simulator/src/client.rs @@ -0,0 +1,83 @@ +use iggy_common::{ + BytesSerializable, Identifier, + create_stream::CreateStream, + delete_stream::DeleteStream, + header::{Operation, RequestHeader}, + message::Message, +}; +use std::cell::Cell; + +// TODO: Proper client which implements the full client SDK API +pub struct SimClient { + client_id: u128, + request_counter: Cell<u64>, +} + +impl SimClient { + pub fn new(client_id: u128) -> Self { + Self { + client_id, + request_counter: Cell::new(0), + } + } + + fn next_request_number(&self) -> u64 { + let current = self.request_counter.get(); + self.request_counter.set(current + 1); + current + } + + pub fn create_stream(&self, name: &str) -> Message<RequestHeader> { + let create_stream = CreateStream { + name: name.to_string(), + }; + let payload = bytes::Bytes::from(create_stream.to_bytes()); + + self.build_request(Operation::CreateStream, payload) + } + + pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> { + let delete_stream = DeleteStream { + stream_id: Identifier::named(name).unwrap(), + }; + let payload = bytes::Bytes::from(delete_stream.to_bytes()); + + self.build_request(Operation::DeleteStream, payload) + } + + fn build_request(&self, operation: 
Operation, payload: bytes::Bytes) -> Message<RequestHeader> { + use bytes::Bytes; + + let header_size = std::mem::size_of::<RequestHeader>(); + let total_size = header_size + payload.len(); + + let header = RequestHeader { + command: iggy_common::header::Command2::Request, + operation, + size: total_size as u32, + cluster: 0, // TODO: Get from config + checksum: 0, + checksum_body: 0, + epoch: 0, + view: 0, + release: 0, + protocol: 0, + replica: 0, + reserved_frame: [0; 12], + client: self.client_id, + request_checksum: 0, + timestamp: 0, // TODO: Use actual timestamp + request: self.next_request_number(), + ..Default::default() + }; + + let header_bytes = bytemuck::bytes_of(&header); + let mut buffer = Vec::with_capacity(total_size); + buffer.extend_from_slice(header_bytes); + buffer.extend_from_slice(&payload); + + let message = Message::<RequestHeader>::from_bytes(Bytes::from(buffer)) + .expect("failed to build request message"); + message + } +} diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 5f1790e75..0900456a3 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -1,3 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::bus::SharedMemBus; use bytes::Bytes; use consensus::VsrConsensus; use iggy_common::header::PrepareHeader; @@ -6,7 +24,7 @@ use journal::{Journal, JournalHandle, Storage}; use metadata::stm::consumer_group::ConsumerGroups; use metadata::stm::stream::Streams; use metadata::stm::user::Users; -use metadata::{variadic, IggyMetadata, MuxStateMachine}; +use metadata::{IggyMetadata, MuxStateMachine, variadic}; use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashMap; @@ -37,8 +55,8 @@ impl Storage for MemStorage { } } +// TODO: Replace with actual Journal, the only thing that we will need to change is the `Storage` impl for an in-memory one. /// Generic in-memory journal implementation for testing/simulation -/// Following TigerBeetle's approach: headers in memory, full messages in storage pub struct SimJournal<S: Storage> { storage: S, headers: UnsafeCell<HashMap<u64, PrepareHeader>>, @@ -124,15 +142,17 @@ impl JournalHandle for SimJournal<MemStorage> { } } -/// Placeholder snapshot implementation #[derive(Debug, Default)] pub struct SimSnapshot {} /// Type aliases for simulator metadata pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; -pub type SimMetadata = - IggyMetadata<VsrConsensus, SimJournal<MemStorage>, SimSnapshot, SimMuxStateMachine>; +pub type SimMetadata = IggyMetadata< + VsrConsensus<SharedMemBus>, + SimJournal<MemStorage>, + SimSnapshot, + SimMuxStateMachine, +>; -/// Placeholder for replica partitions #[derive(Debug, Default)] pub struct ReplicaPartitions {} diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index fdd296f94..3bc17882a 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -1,43 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + pub mod bus; +pub mod client; pub mod deps; pub mod replica; use bus::MemBus; -use iggy_common::header::{PrepareHeader, PrepareOkHeader, RequestHeader}; +use iggy_common::header::{GenericHeader, ReplyHeader}; use iggy_common::message::{Message, MessageBag}; -use iggy_common::{ - ClientInfo, ClientInfoDetails, ClusterMetadata, CompressionAlgorithm, Consumer, - ConsumerGroup, ConsumerGroupDetails, ConsumerOffsetInfo, Identifier, IdentityInfo, - IggyDuration, IggyError, IggyExpiry, IggyMessage, MaxTopicSize, Partitioning, Permissions, - PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, PollingStrategy, - RawPersonalAccessToken, Snapshot, SnapshotCompression, Stats, Stream, StreamDetails, - SystemSnapshotType, Topic, TopicDetails, UserInfo, UserInfoDetails, UserStatus, -}; use message_bus::MessageBus; use metadata::Metadata; use replica::Replica; -use std::rc::Rc; +use std::sync::Arc; -/// The main simulator struct that manages all replicas and exposes the SDK API #[derive(Debug)] pub struct Simulator { pub replicas: Vec<Replica>, - pub message_bus: Rc<MemBus>, + pub message_bus: Arc<MemBus>, } impl Simulator { - pub fn new(replica_count: usize) -> Self { - // Create message bus and preseed all replica connections + pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> Self { let mut message_bus = MemBus::new(); + for client in clients { + 
message_bus.add_client(client, ()); + } - // Register all replicas with the message bus for i in 0..replica_count as u8 { message_bus.add_replica(i); } - let message_bus = Rc::new(message_bus); + let message_bus = Arc::new(message_bus); let replicas = (0..replica_count) - .map(|i| Replica::new(i as u8, format!("replica-{}", i), Rc::clone(&message_bus))) + .map(|i| { + Replica::new( + i as u8, + format!("replica-{}", i), + Arc::clone(&message_bus), + replica_count as u8, + ) + }) .collect(); Self { @@ -47,14 +64,20 @@ impl Simulator { } pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> Self { - // Preseed replica connections for i in 0..replica_count as u8 { message_bus.add_replica(i); } - let message_bus = Rc::new(message_bus); + let message_bus = Arc::new(message_bus); let replicas = (0..replica_count) - .map(|i| Replica::new(i as u8, format!("replica-{}", i), Rc::clone(&message_bus))) + .map(|i| { + Replica::new( + i as u8, + format!("replica-{}", i), + Arc::clone(&message_bus), + replica_count as u8, + ) + }) .collect(); Self { @@ -64,521 +87,79 @@ impl Simulator { } } -// ============================================================================= -// Internal message dispatch - Implementation details -// ============================================================================= - impl Simulator { - /// Dispatch a message to the appropriate subsystem - /// Private - called by public client methods after building the message - /// - /// Routes based on message type and operation: - /// - Request messages → check operation type - /// - Metadata operations (1-99) → metadata shard - /// - Partition operations (100+) → partition shard - /// - Other message types → routed directly - fn dispatch(&self, message: MessageBag) { - match message { - MessageBag::Request(request) => { - // Route request based on operation type - let operation_value = request.header().operation as u32; + pub async fn step(&self) -> Option<Message<ReplyHeader>> { + 
if let Some(envelope) = self.message_bus.receive() { + if let Some(_client_id) = envelope.to_client { + let reply: Message<ReplyHeader> = envelope + .message + .try_into_typed() + .expect("invalid message, wrong command type for an client response"); + return Some(reply); + } - if operation_value < 100 { - // Metadata operation - self.dispatch_to_metadata(request); - } else { - // Partition operation - self.dispatch_to_partition(request); + if let Some(replica_id) = envelope.to_replica { + if let Some(replica) = self.replicas.get(replica_id as usize) { + self.dispatch_to_replica(replica, envelope.message).await; } } - MessageBag::Prepare(prepare) => { - // TODO: Handle prepare messages (chain replication) - todo!("handle prepare: op={}", prepare.header().op); - } - MessageBag::PrepareOk(prepare_ok) => { - // TODO: Handle acknowledgments - todo!("handle prepare_ok: op={}", prepare_ok.header().op); - } - MessageBag::Reply(_reply) => { - // TODO: Handle replies back to client - todo!("handle reply message"); - } - MessageBag::Commit(_commit) => { - // TODO: Handle commit messages - todo!("handle commit message"); - } - MessageBag::Generic(_generic) => { - // TODO: Handle generic messages - todo!("handle generic message"); - } } - } - - /// Dispatch metadata operation to metadata shard - fn dispatch_to_metadata(&self, request: Message<RequestHeader>) { - // TODO: Determine which replica is primary (for now, use replica 0) - let primary_id = 0; - let primary = &self.replicas[primary_id]; - - // Route to metadata's on_request handler - primary.metadata.on_request(request); - } - - /// Dispatch partition operation to partition shard - fn dispatch_to_partition(&self, request: Message<RequestHeader>) { - // TODO: Determine which partition/replica handles this request - // For now, just route to replica 0's partitions - let replica_id = 0; - let _replica = &self.replicas[replica_id]; - - // TODO: Route to replica.partitions.on_request(request) - todo!( - "dispatch partition 
operation to replica {}: operation={:?}", - replica_id, - request.header().operation - ); - } - -} - -// ============================================================================= -// Client methods -// ============================================================================= - -impl Simulator { - pub fn connect(&self) -> Result<(), IggyError> { - todo!() - } - - pub fn disconnect(&self) -> Result<(), IggyError> { - todo!() - } - - pub fn shutdown(&self) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// System methods -// ============================================================================= - -impl Simulator { - pub fn get_stats(&self) -> Result<Stats, IggyError> { - todo!() - } - - pub fn get_me(&self) -> Result<ClientInfoDetails, IggyError> { - todo!() - } - - pub fn get_client(&self, _client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError> { - todo!() - } - - pub fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> { - todo!() - } - - pub fn ping(&self) -> Result<(), IggyError> { - todo!() - } - - pub fn heartbeat_interval(&self) -> IggyDuration { - todo!() - } - - pub fn snapshot( - &self, - _compression: SnapshotCompression, - _snapshot_types: Vec<SystemSnapshotType>, - ) -> Result<Snapshot, IggyError> { - todo!() - } -} - -// ============================================================================= -// User methods -// ============================================================================= - -impl Simulator { - pub fn get_user(&self, _user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { - todo!() - } - - pub fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> { - todo!() - } - - pub fn create_user( - &self, - _username: &str, - _password: &str, - _status: UserStatus, - _permissions: Option<Permissions>, - ) -> Result<UserInfoDetails, IggyError> { - todo!() - } - - pub fn delete_user(&self, _user_id: &Identifier) -> 
Result<(), IggyError> { - todo!() - } - - pub fn update_user( - &self, - _user_id: &Identifier, - _username: Option<&str>, - _status: Option<UserStatus>, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn update_permissions( - &self, - _user_id: &Identifier, - _permissions: Option<Permissions>, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn change_password( - &self, - _user_id: &Identifier, - _current_password: &str, - _new_password: &str, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn login_user(&self, _username: &str, _password: &str) -> Result<IdentityInfo, IggyError> { - todo!() - } - - pub fn logout_user(&self) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Personal access token methods -// ============================================================================= - -impl Simulator { - pub fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { - todo!() - } - - pub fn create_personal_access_token( - &self, - _name: &str, - _expiry: PersonalAccessTokenExpiry, - ) -> Result<RawPersonalAccessToken, IggyError> { - todo!() - } - - pub fn delete_personal_access_token(&self, _name: &str) -> Result<(), IggyError> { - todo!() - } - - pub fn login_with_personal_access_token( - &self, - _token: &str, - ) -> Result<IdentityInfo, IggyError> { - todo!() - } -} - -// ============================================================================= -// Stream methods - Public API -// ============================================================================= -// -// Pattern for all client methods: -// 1. Build Message<RequestHeader> from parameters -// 2. Convert to MessageBag::Request(message) -// 3. Call self.dispatch(bag) - private method -// 4. Wait for reply from primary -// 5. 
Parse and return result - -impl Simulator { - pub fn get_stream(&self, _stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { - // TODO: Build message with Operation::GetStream (not yet in enum) - // self.dispatch_request(message); - // Wait for reply and parse - todo!() - } - - pub fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { - // TODO: Build message with Operation::ListStreams (not yet in enum) - // self.dispatch_request(message); - // Wait for reply and parse - todo!() - } - - pub fn create_stream(&self, name: &str) -> Result<StreamDetails, IggyError> { - // TODO: Build Message<RequestHeader> with: - // - header.operation = Operation::CreateStream - // - body containing serialized stream name - // Convert to MessageBag (using From impl) and dispatch: - // let message: Message<RequestHeader> = build_request(name); - // self.dispatch(message.into()); // ← .into() converts to MessageBag - // Wait for reply and parse StreamDetails - todo!("build message for: {}", name) - } - - pub fn update_stream(&self, _stream_id: &Identifier, _name: &str) -> Result<(), IggyError> { - todo!() - } - - pub fn delete_stream(&self, _stream_id: &Identifier) -> Result<(), IggyError> { - todo!() - } - - pub fn purge_stream(&self, _stream_id: &Identifier) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Topic methods -// ============================================================================= - -impl Simulator { - pub fn get_topic( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - ) -> Result<Option<TopicDetails>, IggyError> { - todo!() - } - pub fn get_topics(&self, _stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { - todo!() + None } - pub fn create_topic( - &self, - _stream_id: &Identifier, - _name: &str, - _partitions_count: u32, - _compression_algorithm: CompressionAlgorithm, - _replication_factor: Option<u8>, - _message_expiry: IggyExpiry, - 
_max_topic_size: MaxTopicSize, - ) -> Result<TopicDetails, IggyError> { - todo!() - } - - pub fn update_topic( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _name: &str, - _compression_algorithm: CompressionAlgorithm, - _replication_factor: Option<u8>, - _message_expiry: IggyExpiry, - _max_topic_size: MaxTopicSize, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn delete_topic( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn purge_topic( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - ) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Partition methods -// ============================================================================= - -impl Simulator { - pub fn create_partitions( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partitions_count: u32, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn delete_partitions( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partitions_count: u32, - ) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Segment methods -// ============================================================================= - -impl Simulator { - pub fn delete_segments( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: u32, - _segments_count: u32, - ) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Message methods -// ============================================================================= - -impl Simulator { - pub fn poll_messages( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: Option<u32>, - _consumer: &Consumer, - _strategy: &PollingStrategy, - _count: u32, - _auto_commit: bool, - ) -> 
Result<PolledMessages, IggyError> { - todo!() - } - - pub fn send_messages( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partitioning: &Partitioning, - _messages: &mut [IggyMessage], - ) -> Result<(), IggyError> { - todo!() - } + async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) { + let message: MessageBag = message.into(); + let operation = match &message { + MessageBag::Request(message) => message.header().operation, + MessageBag::Prepare(message) => message.header().operation, + MessageBag::PrepareOk(message) => message.header().operation, + } as u8; - pub fn flush_unsaved_buffer( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: u32, - _fsync: bool, - ) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Consumer offset methods -// ============================================================================= - -impl Simulator { - pub fn store_consumer_offset( - &self, - _consumer: &Consumer, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: Option<u32>, - _offset: u64, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn get_consumer_offset( - &self, - _consumer: &Consumer, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: Option<u32>, - ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { - todo!() - } - - pub fn delete_consumer_offset( - &self, - _consumer: &Consumer, - _stream_id: &Identifier, - _topic_id: &Identifier, - _partition_id: Option<u32>, - ) -> Result<(), IggyError> { - todo!() - } -} - -// ============================================================================= -// Consumer group methods -// ============================================================================= - -impl Simulator { - pub fn get_consumer_group( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _group_id: &Identifier, - ) -> 
Result<Option<ConsumerGroupDetails>, IggyError> { - todo!() - } - - pub fn get_consumer_groups( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - ) -> Result<Vec<ConsumerGroup>, IggyError> { - todo!() - } - - pub fn create_consumer_group( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _name: &str, - ) -> Result<ConsumerGroupDetails, IggyError> { - todo!() - } - - pub fn delete_consumer_group( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _group_id: &Identifier, - ) -> Result<(), IggyError> { - todo!() - } - - pub fn join_consumer_group( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _group_id: &Identifier, - ) -> Result<(), IggyError> { - todo!() + if operation < 200 { + self.dispatch_to_metadata_on_replica(replica, message).await; + } else { + self.dispatch_to_partition_on_replica(replica, message); + } } - pub fn leave_consumer_group( - &self, - _stream_id: &Identifier, - _topic_id: &Identifier, - _group_id: &Identifier, - ) -> Result<(), IggyError> { - todo!() + async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, message: MessageBag) { + match message { + MessageBag::Request(request) => { + replica.metadata.on_request(request).await; + } + MessageBag::Prepare(prepare) => { + replica.metadata.on_replicate(prepare).await; + } + MessageBag::PrepareOk(prepare_ok) => { + replica.metadata.on_ack(prepare_ok).await; + } + } } -} -// ============================================================================= -// Cluster methods -// ============================================================================= - -impl Simulator { - pub fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { - todo!() + fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) { + match message { + MessageBag::Request(request) => { + todo!( + "dispatch request to partition replica {}: operation={:?}", + replica.id, + request.header().operation + ); + } + 
MessageBag::Prepare(prepare) => { + todo!( + "dispatch prepare to partition replica {}: operation={:?}", + replica.id, + prepare.header().operation + ); + } + MessageBag::PrepareOk(prepare_ok) => { + todo!( + "dispatch prepare_ok to partition replica {}: op={}", + replica.id, + prepare_ok.header().op + ); + } + } } } diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs index 0790f62d5..995c717a3 100644 --- a/core/simulator/src/main.rs +++ b/core/simulator/src/main.rs @@ -1,9 +1,100 @@ -use simulator::Simulator; +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
-fn main() { - let sim = Simulator::new(3); - println!("Created simulator with {} replicas", sim.replicas.len()); - for replica in &sim.replicas { - println!(" - {} (id: {})", replica.name, replica.id); +use iggy_common::header::ReplyHeader; +use iggy_common::message::Message; +use message_bus::MessageBus; +use simulator::{Simulator, client::SimClient}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +/// Shared response queue for client replies +#[derive(Default)] +pub struct Responses { + queue: VecDeque<Message<ReplyHeader>>, +} + +impl Responses { + pub fn push(&mut self, msg: Message<ReplyHeader>) { + self.queue.push_back(msg); } + + pub fn pop(&mut self) -> Option<Message<ReplyHeader>> { + self.queue.pop_front() + } +} + +fn main() { + let client_id: u128 = 1; + let leader: u8 = 0; + let sim = Simulator::new(3, std::iter::once(client_id)); + let bus = sim.message_bus.clone(); + + // Responses queue + let responses = Arc::new(Mutex::new(Responses::default())); + let responses_clone = responses.clone(); + + // TODO: Scuffed client/simulator setup. 
+ // We need a better interface on simulator + let client_handle = std::thread::spawn(move || { + futures::executor::block_on(async { + let client = SimClient::new(client_id); + + let create_msg = client.create_stream("test-stream"); + bus.send_to_replica(leader, create_msg.into_generic()) + .await + .expect("failed to send create_stream"); + + loop { + if let Some(reply) = responses_clone.lock().unwrap().pop() { + println!("[client] Got create_stream reply: {:?}", reply.header()); + break; + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + + let delete_msg = client.delete_stream("test-stream"); + bus.send_to_replica(leader, delete_msg.into_generic()) + .await + .expect("failed to send delete_stream"); + + loop { + if let Some(reply) = responses_clone.lock().unwrap().pop() { + println!("[client] Got delete_stream reply: {:?}", reply.header()); + break; + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + }); + }); + + println!("[sim] Starting simulator loop"); + futures::executor::block_on(async { + loop { + if let Some(reply) = sim.step().await { + responses.lock().unwrap().push(reply); + } + + if client_handle.is_finished() { + break; + } + } + }); + + client_handle.join().expect("client thread panicked"); + println!("[sim] Simulator loop ended"); } diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs index 1e93b9640..74c26b72f 100644 --- a/core/simulator/src/replica.rs +++ b/core/simulator/src/replica.rs @@ -1,12 +1,30 @@ -use crate::bus::MemBus; +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::bus::{MemBus, SharedMemBus}; use crate::deps::{ MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot, }; +use consensus::VsrConsensus; use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner}; use metadata::stm::stream::{Streams, StreamsInner}; use metadata::stm::user::{Users, UsersInner}; -use metadata::{variadic, IggyMetadata}; -use std::rc::Rc; +use metadata::{IggyMetadata, variadic}; +use std::sync::Arc; #[derive(Debug)] pub struct Replica { @@ -14,22 +32,30 @@ pub struct Replica { pub name: String, pub metadata: SimMetadata, pub partitions: ReplicaPartitions, - pub bus: Rc<MemBus>, + pub bus: Arc<MemBus>, } impl Replica { - pub fn new(id: u8, name: String, bus: Rc<MemBus>) -> Self { - // Create the mux state machine with all state machines + pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) -> Self { let users: Users = UsersInner::new().into(); let streams: Streams = StreamsInner::new().into(); let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into(); let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups)); + let cluster_id: u128 = 1; // TODO: Make configurable + let consensus = VsrConsensus::new( + cluster_id, + id, + replica_count, + SharedMemBus(Arc::clone(&bus)), + ); + consensus.init(); + Self { id, name, metadata: IggyMetadata { - consensus: None, // TODO: Init consensus + consensus: Some(consensus), journal: Some(SimJournal::<MemStorage>::default()), snapshot: Some(SimSnapshot::default()), mux_stm: mux,
