This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch plane_demuxer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 035fe3eaf38c4cce1320c5097fe606b2ee18ccbe Author: numinex <[email protected]> AuthorDate: Thu Feb 19 11:19:17 2026 +0100 good --- core/common/src/lib.rs | 1 + core/common/src/macros.rs | 27 +++++++ core/common/src/types/consensus/header.rs | 88 ++++++++++++++++++--- core/common/src/types/consensus/message.rs | 37 ++++++--- core/consensus/src/lib.rs | 15 +++- core/consensus/src/plane_mux.rs | 121 +++++++++++++++++++++++++++++ core/metadata/src/impls/metadata.rs | 55 ++++++++++++- core/metadata/src/stm/mux.rs | 13 +--- core/partitions/src/iggy_partitions.rs | 36 +++++++-- core/simulator/src/deps.rs | 6 +- core/simulator/src/lib.rs | 46 ++--------- core/simulator/src/replica.rs | 31 +++++--- 12 files changed, 378 insertions(+), 98 deletions(-) diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index a87c07b5c..6e8b4dcf7 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -22,6 +22,7 @@ mod certificates; mod commands; mod deduplication; mod error; +mod macros; mod sender; pub mod sharding; mod traits; diff --git a/core/common/src/macros.rs b/core/common/src/macros.rs new file mode 100644 index 000000000..c7c1ce3f6 --- /dev/null +++ b/core/common/src/macros.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_export] +macro_rules! variadic { + () => (()); + (...$a:ident $(,)?) => ($a); + (...$a:expr $(,)?) => ($a); + ($a:ident $(,)?) => (($a, ())); + ($a:expr $(,)?) => (($a, ())); + ($a:ident, $($b:tt)+) => (($a, $crate::variadic!($($b)+))); + ($a:expr, $($b:tt)+) => (($a, $crate::variadic!($($b)+))); +} diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs index c8d311fa7..bda75e93b 100644 --- a/core/common/src/types/consensus/header.rs +++ b/core/common/src/types/consensus/header.rs @@ -21,9 +21,10 @@ use thiserror::Error; const HEADER_SIZE: usize = 256; pub trait ConsensusHeader: Sized + Pod + Zeroable { const COMMAND: Command2; - const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() == HEADER_SIZE); fn validate(&self) -> Result<(), ConsensusError>; + fn operation(&self) -> Operation; + fn command(&self) -> Command2; fn size(&self) -> u32; } @@ -138,6 +139,7 @@ pub struct GenericHeader { pub namespace: u64, pub reserved_command: [u8; 120], } +const _: () = assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE); unsafe impl Pod for GenericHeader {} unsafe impl Zeroable for GenericHeader {} @@ -145,6 +147,14 @@ unsafe impl Zeroable for GenericHeader {} impl ConsensusHeader for GenericHeader { const COMMAND: Command2 = Command2::Reserved; + fn operation(&self) -> Operation { + Operation::Default + } + + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { Ok(()) } @@ -176,6 +186,7 @@ pub struct RequestHeader { pub namespace: u64, pub reserved: [u8; 64], } +const _: () = assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE); impl Default for RequestHeader { fn default() -> Self { @@ -207,6 +218,10 @@ unsafe impl Zeroable for RequestHeader {} impl ConsensusHeader for RequestHeader { const COMMAND: Command2 = Command2::Request; + fn operation(&self) -> Operation { + self.operation + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::Request { return Err(ConsensusError::InvalidCommand { @@ -216,6 +231,9 @@ impl ConsensusHeader for RequestHeader { } Ok(()) } + fn command(&self) -> Command2 { + self.command + } fn size(&self) -> u32 { self.size @@ -248,6 +266,7 @@ pub struct PrepareHeader { pub namespace: u64, pub reserved: [u8; 32], } +const _: () = assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE); unsafe impl Pod for PrepareHeader {} unsafe impl Zeroable for PrepareHeader {} @@ -255,6 +274,10 @@ unsafe impl Zeroable for PrepareHeader {} impl ConsensusHeader for PrepareHeader { const COMMAND: Command2 = Command2::Prepare; + fn operation(&self) -> Operation { + self.operation + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::Prepare { return Err(ConsensusError::InvalidCommand { @@ -264,6 +287,9 @@ impl ConsensusHeader for PrepareHeader { } Ok(()) } + fn command(&self) -> Command2 { + self.command + } fn size(&self) -> u32 { self.size @@ -322,6 +348,7 @@ pub struct PrepareOkHeader { pub namespace: u64, pub reserved: [u8; 48], } +const _: () = assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE); unsafe impl Pod for PrepareOkHeader {} unsafe impl Zeroable for PrepareOkHeader {} @@ -329,6 +356,13 @@ unsafe impl Zeroable for PrepareOkHeader {} impl ConsensusHeader for PrepareOkHeader { const COMMAND: Command2 = Command2::PrepareOk; + fn operation(&self) -> Operation { + self.operation + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::PrepareOk { return Err(ConsensusError::InvalidCommand { @@ -390,6 +424,7 @@ pub struct CommitHeader { pub namespace: u64, pub reserved: [u8; 80], } +const _: () = assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE); unsafe impl Pod for CommitHeader {} unsafe impl Zeroable for CommitHeader {} @@ -397,6 +432,13 @@ unsafe impl Zeroable for CommitHeader {} impl ConsensusHeader for CommitHeader { const COMMAND: Command2 = Command2::Commit; + fn operation(&self) -> Operation { + Operation::Default + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::Commit { return Err(ConsensusError::CommitInvalidCommand2); @@ -436,6 +478,7 @@ pub struct ReplyHeader { pub namespace: u64, pub reserved: [u8; 41], } +const _: () = assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE); unsafe impl Pod for ReplyHeader {} unsafe impl Zeroable for ReplyHeader {} @@ -443,6 +486,13 @@ unsafe impl Zeroable for ReplyHeader {} impl ConsensusHeader for ReplyHeader { const COMMAND: Command2 = Command2::Reply; + fn operation(&self) -> Operation { + self.operation + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::Reply { return Err(ConsensusError::ReplyInvalidCommand2); @@ -489,19 +539,18 @@ impl Default for ReplyHeader { #[repr(C)] pub struct StartViewChangeHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 58], pub reserved: [u8; 128], } +const _: () = assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE); unsafe impl Pod for StartViewChangeHeader {} unsafe impl Zeroable for StartViewChangeHeader {} @@ -509,6 +558,13 @@ unsafe impl Zeroable for StartViewChangeHeader {} impl ConsensusHeader for StartViewChangeHeader { const COMMAND: Command2 = Command2::StartViewChange; + fn operation(&self) -> Operation { + Operation::Default + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::StartViewChange { return Err(ConsensusError::InvalidCommand { @@ -536,16 +592,14 @@ impl ConsensusHeader for StartViewChangeHeader { #[repr(C)] pub struct DoViewChangeHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 58], /// The highest op-number in this replica's log. /// Used to select the most complete log when log_view values are equal. @@ -559,6 +613,7 @@ pub struct DoViewChangeHeader { pub log_view: u32, pub reserved: [u8; 108], } +const _: () = assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE); unsafe impl Pod for DoViewChangeHeader {} unsafe impl Zeroable for DoViewChangeHeader {} @@ -566,6 +621,13 @@ unsafe impl Zeroable for DoViewChangeHeader {} impl ConsensusHeader for DoViewChangeHeader { const COMMAND: Command2 = Command2::DoViewChange; + fn operation(&self) -> Operation { + Operation::Default + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::DoViewChange { return Err(ConsensusError::InvalidCommand { @@ -609,16 +671,14 @@ impl ConsensusHeader for DoViewChangeHeader { #[repr(C)] pub struct StartViewHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 58], /// The op-number of the highest entry in the new primary's log. /// Backups set their op to this value. @@ -629,6 +689,7 @@ pub struct StartViewHeader { pub commit: u64, pub reserved: [u8; 112], } +const _: () = assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE); unsafe impl Pod for StartViewHeader {} unsafe impl Zeroable for StartViewHeader {} @@ -636,6 +697,13 @@ unsafe impl Zeroable for StartViewHeader {} impl ConsensusHeader for StartViewHeader { const COMMAND: Command2 = Command2::StartView; + fn operation(&self) -> Operation { + Operation::Default + } + fn command(&self) -> Command2 { + self.command + } + fn validate(&self) -> Result<(), ConsensusError> { if self.command != Command2::StartView { return Err(ConsensusError::InvalidCommand { diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index 5fe7e0843..7ef39cdca 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -22,6 +22,25 @@ use crate::{ use bytes::Bytes; use std::marker::PhantomData; +// TODO: Rename this to Message and ConsensusHeader to Header. +pub trait ConsensusMessage<H> +where + H: ConsensusHeader, +{ + // TODO: fn body(&self) -> Something; + fn header(&self) -> &H; +} + +impl<H> ConsensusMessage<H> for Message<H> +where + H: ConsensusHeader, +{ + fn header(&self) -> &H { + let header_bytes = &self.buffer[..size_of::<H>()]; + bytemuck::from_bytes(header_bytes) + } +} + #[derive(Debug, Clone)] pub struct Message<H: ConsensusHeader> { buffer: Bytes, @@ -32,6 +51,13 @@ impl<H> Message<H> where H: ConsensusHeader, { + #[inline] + #[allow(unused)] + pub fn header(&self) -> &H { + let header_bytes = &self.buffer[..size_of::<H>()]; + bytemuck::from_bytes(header_bytes) + } + /// Create a new message from a buffer. /// /// # Safety @@ -115,17 +141,6 @@ where } } - /// Get a reference to the header using zero-copy access. - /// - /// This uses `bytemuck::from_bytes` to cast the buffer to the header type - /// without any copying or allocation. - #[inline] - #[allow(unused)] - pub fn header(&self) -> &H { - let header_bytes = &self.buffer[..size_of::<H>()]; - bytemuck::from_bytes(header_bytes) - } - /// Get a reference to the message body (everything after the header). /// /// Returns an empty slice if there is no body. diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 767ee9bf8..f021f3d11 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -16,6 +16,7 @@ // under the License. use iggy_common::header::ConsensusHeader; +use iggy_common::message::ConsensusMessage; use message_bus::MessageBus; pub trait Project<T, C: Consensus> { @@ -53,7 +54,7 @@ pub trait Pipeline { pub trait Consensus: Sized { type MessageBus: MessageBus; #[rustfmt::skip] // Scuffed formatter. - type Message<H> where H: ConsensusHeader; + type Message<H>: ConsensusMessage<H> where H: ConsensusHeader; type RequestHeader: ConsensusHeader; type ReplicateHeader: ConsensusHeader; @@ -95,8 +96,20 @@ where fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>; } +pub trait PlaneIdentity<C> +where + C: Consensus, +{ + fn is_applicable<H>(&self, message: &C::Message<H>) -> bool + where + H: ConsensusHeader, + C::Message<H>: ConsensusMessage<H>; +} + mod impls; pub use impls::*; +mod plane_mux; +pub use plane_mux::*; mod plane_helpers; pub use plane_helpers::*; diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs new file mode 100644 index 000000000..6e849be5b --- /dev/null +++ b/core/consensus/src/plane_mux.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{Consensus, Plane, PlaneIdentity, Project}; +use iggy_common::variadic; + +#[derive(Debug)] +pub struct MuxPlane<T> { + inner: T, +} + +impl<T> MuxPlane<T> { + pub fn new(inner: T) -> Self { + Self { inner } + } + + pub fn inner(&self) -> &T { + &self.inner + } + + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<C, T> Plane<C> for MuxPlane<T> +where + C: Consensus, + T: Plane<C>, +{ + async fn on_request(&self, message: C::Message<C::RequestHeader>) + where + C::Message<C::RequestHeader>: + Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + { + self.inner.on_request(message).await; + } + + async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) + where + C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + { + self.inner.on_replicate(message).await; + } + + async fn on_ack(&self, message: C::Message<C::AckHeader>) { + self.inner.on_ack(message).await; + } +} + +impl<C> Plane<C> for () +where + C: Consensus, +{ + async fn on_request(&self, _message: C::Message<C::RequestHeader>) + where + C::Message<C::RequestHeader>: + Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + { + } + + async fn on_replicate(&self, _message: C::Message<C::ReplicateHeader>) + where + C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + { + } + + async fn on_ack(&self, _message: C::Message<C::AckHeader>) {} +} + +impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail) +where + C: Consensus, + Head: Plane<C> + PlaneIdentity<C>, + Tail: Plane<C>, +{ + async fn on_request(&self, message: C::Message<C::RequestHeader>) + where + C::Message<C::RequestHeader>: + Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + { + if self.0.is_applicable(&message) { + self.0.on_request(message).await; + } else { + self.1.on_request(message).await; + } + } + + async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) + where + C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + { + if self.0.is_applicable(&message) { + self.0.on_replicate(message).await; + } else { + self.1.on_replicate(message).await; + } + } + + async fn on_ack(&self, message: C::Message<C::AckHeader>) { + if self.0.is_applicable(&message) { + self.0.on_ack(message).await; + } else { + self.1.on_ack(message).await; + } + } +} diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 4ff74f4e2..ce4e324c6 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -17,14 +17,17 @@ use crate::stm::StateMachine; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; use consensus::{ - Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, + Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus, + ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ - header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader}, - message::Message, + header::{ + Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, + RequestHeader, + }, + message::{ConsensusMessage, Message}, }; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; @@ -239,6 +242,50 @@ where } } +impl<B, P, J, S, M> PlaneIdentity<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, + J: JournalHandle, + J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, + M: StateMachine<Input = Message<PrepareHeader>>, +{ + fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as Consensus>::Message<H>) -> bool + where + H: ConsensusHeader, + <VsrConsensus<B, P> as Consensus>::Message<H>: ConsensusMessage<H>, + { + assert!(matches!( + message.header().command(), + Command2::Request | Command2::Prepare | Command2::PrepareOk + )); + let operation = message.header().operation(); + // TODO: Use better heuristic, smth like greater or equal based on op number. + matches!( + operation, + Operation::CreateStream + | Operation::UpdateStream + | Operation::DeleteStream + | Operation::PurgeStream + | Operation::CreateTopic + | Operation::UpdateTopic + | Operation::DeleteTopic + | Operation::PurgeTopic + | Operation::CreatePartitions + | Operation::DeletePartitions + | Operation::CreateConsumerGroup + | Operation::DeleteConsumerGroup + | Operation::CreateUser + | Operation::UpdateUser + | Operation::DeleteUser + | Operation::ChangePassword + | Operation::UpdatePermissions + | Operation::CreatePersonalAccessToken + | Operation::DeletePersonalAccessToken + ) + } +} + impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 70830355c..52a3c8bdc 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -18,6 +18,7 @@ use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError}; use iggy_common::Either; use iggy_common::{header::PrepareHeader, message::Message}; +use iggy_common::variadic; use crate::stm::{State, StateMachine}; @@ -52,18 +53,6 @@ where } } -//TODO: Move to common -#[macro_export] -macro_rules! variadic { - () => ( () ); - (...$a:ident $(,)? ) => ( $a ); - (...$a:expr $(,)? ) => ( $a ); - ($a:ident $(,)? ) => ( ($a, ()) ); - ($a:expr $(,)? ) => ( ($a, ()) ); - ($a:ident, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) ); - ($a:expr, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) ); -} - // TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case. // TODO: I think we could move the base case to the impl site of `State`, so this way we know the `Input` and `Output` types. // Base case of the recursive resolution. diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 16ed124ab..255a31fff 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -21,15 +21,19 @@ use crate::IggyPartition; use crate::Partition; use crate::types::PartitionsConfig; use consensus::{ - Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common, - replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, + Consensus, PipelineEntry, Plane, PlaneIdentity, Project, Sequencer, VsrConsensus, + ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, + pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, + send_prepare_ok as send_prepare_ok_common, }; +use iggy_common::header::Command2; use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, - header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader}, - message::Message, + header::{ + ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader, + }, + message::{ConsensusMessage, Message}, sharding::{IggyNamespace, LocalIdx, ShardId}, }; use message_bus::MessageBus; @@ -464,6 +468,28 @@ where } } +impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, +{ + fn is_applicable<H>(&self, message: &<VsrConsensus<B> as Consensus>::Message<H>) -> bool + where + H: ConsensusHeader, + <VsrConsensus<B> as Consensus>::Message<H>: ConsensusMessage<H>, + { + assert!(matches!( + message.header().command(), + Command2::Request | Command2::Prepare | Command2::PrepareOk + )); + let operation = message.header().operation(); + // TODO: Use better selection, smth like greater or equal based on op number. + matches!( + operation, + Operation::DeleteSegments | Operation::SendMessages | Operation::StoreConsumerOffset + ) + } +} + impl<B> IggyPartitions<VsrConsensus<B>> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 25175ea8e..d6fab8623 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -17,14 +17,15 @@ use crate::bus::SharedMemBus; use bytes::Bytes; -use consensus::VsrConsensus; +use consensus::{MuxPlane, VsrConsensus}; +use iggy_common::variadic; use iggy_common::header::PrepareHeader; use iggy_common::message::Message; use journal::{Journal, JournalHandle, Storage}; use metadata::stm::consumer_group::ConsumerGroups; use metadata::stm::stream::Streams; use metadata::stm::user::Users; -use metadata::{IggyMetadata, MuxStateMachine, variadic}; +use metadata::{IggyMetadata, MuxStateMachine}; use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashMap; @@ -160,3 +161,4 @@ pub type SimMetadata = IggyMetadata< /// Type alias for simulator partitions pub type ReplicaPartitions = partitions::IggyPartitions<VsrConsensus<SharedMemBus>>; +pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>; diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index 44e063f78..cc848e0ed 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -37,7 +37,7 @@ impl Simulator { /// Initialize a partition on all replicas (in-memory for simulation) pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) { for replica in &mut self.replicas { - replica.partitions.init_partition_in_memory(namespace); + replica.init_partition_in_memory(namespace); } } @@ -115,46 +115,10 @@ impl Simulator { } async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) { - let message: MessageBag = message.into(); - let operation = match &message { - MessageBag::Request(message) => message.header().operation, - MessageBag::Prepare(message) => message.header().operation, - MessageBag::PrepareOk(message) => message.header().operation, - } as u8; - - if operation < 200 { - self.dispatch_to_metadata_on_replica(replica, message).await; - } else { - self.dispatch_to_partition_on_replica(replica, message) - .await; - } - } - - async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, message: MessageBag) { - match message { - MessageBag::Request(request) => { - replica.metadata.on_request(request).await; - } - MessageBag::Prepare(prepare) => { - replica.metadata.on_replicate(prepare).await; - } - MessageBag::PrepareOk(prepare_ok) => { - replica.metadata.on_ack(prepare_ok).await; - } - } - } - - async fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) { - match message { - MessageBag::Request(request) => { - replica.partitions.on_request(request).await; - } - MessageBag::Prepare(prepare) => { - replica.partitions.on_replicate(prepare).await; - } - MessageBag::PrepareOk(prepare_ok) => { - replica.partitions.on_ack(prepare_ok).await; - } + match MessageBag::from(message) { + MessageBag::Request(request) => replica.plane.on_request(request).await, + MessageBag::Prepare(prepare) => replica.plane.on_replicate(prepare).await, + MessageBag::PrepareOk(prepare_ok) => replica.plane.on_ack(prepare_ok).await, } } } diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs index ae4631e95..c975ce8d6 100644 --- a/core/simulator/src/replica.rs +++ b/core/simulator/src/replica.rs @@ -17,23 +17,23 @@ use crate::bus::{MemBus, SharedMemBus}; use crate::deps::{ - MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot, + MemStorage, ReplicaPartitions, SimJournal, SimMuxStateMachine, SimPlane, SimSnapshot, }; -use consensus::{LocalPipeline, VsrConsensus}; +use consensus::{LocalPipeline, MuxPlane, VsrConsensus}; use iggy_common::IggyByteSize; use iggy_common::sharding::ShardId; +use iggy_common::variadic; use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner}; use metadata::stm::stream::{Streams, StreamsInner}; use metadata::stm::user::{Users, UsersInner}; -use metadata::{IggyMetadata, variadic}; +use metadata::IggyMetadata; use partitions::PartitionsConfig; use std::sync::Arc; pub struct Replica { pub id: u8, pub name: String, - pub metadata: SimMetadata, - pub partitions: ReplicaPartitions, + pub plane: SimPlane, pub bus: Arc<MemBus>, } @@ -83,17 +83,24 @@ impl Replica { ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None) }; + let metadata = IggyMetadata { + consensus: Some(metadata_consensus), + journal: Some(SimJournal::<MemStorage>::default()), + snapshot: Some(SimSnapshot::default()), + mux_stm: mux, + }; + let plane = MuxPlane::new(variadic!(metadata, partitions)); + Self { id, name, - metadata: IggyMetadata { - consensus: Some(metadata_consensus), - journal: Some(SimJournal::<MemStorage>::default()), - snapshot: Some(SimSnapshot::default()), - mux_stm: mux, - }, - partitions, + plane, bus, } } + + pub fn init_partition_in_memory(&mut self, namespace: iggy_common::sharding::IggyNamespace) { + // TODO: create an accessor for the partitions within mux plane, same for metadata. + self.plane.inner_mut().1.0.init_partition_in_memory(namespace); + } }
