This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new aff29a544 feat(cluster): create a unified abstraction for subsystems
(#2780)
aff29a544 is described below
commit aff29a544372fe5487598339830dafecb39aa14e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Feb 20 17:53:25 2026 +0100
feat(cluster): create a unified abstraction for subsystems (#2780)
Adds an `Plane` trait that is used to dispatch requests to different
subsystems based on the plane demux logic (similarly to how
`StateMachine` works).
---
core/common/src/lib.rs | 1 +
core/common/src/macros.rs | 27 ++++
core/common/src/types/consensus/header.rs | 199 ++++++++++++++++++++++++++++-
core/common/src/types/consensus/message.rs | 37 ++++--
core/consensus/src/lib.rs | 30 +++--
core/consensus/src/plane_mux.rs | 120 +++++++++++++++++
core/metadata/src/impls/metadata.rs | 57 ++++++++-
core/metadata/src/stm/mux.rs | 13 +-
core/partitions/src/iggy_partitions.rs | 27 +++-
core/simulator/src/deps.rs | 8 +-
core/simulator/src/lib.rs | 58 +++------
core/simulator/src/replica.rs | 29 ++---
12 files changed, 508 insertions(+), 98 deletions(-)
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index a87c07b5c..6e8b4dcf7 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -22,6 +22,7 @@ mod certificates;
mod commands;
mod deduplication;
mod error;
+mod macros;
mod sender;
pub mod sharding;
mod traits;
diff --git a/core/common/src/macros.rs b/core/common/src/macros.rs
new file mode 100644
index 000000000..c7c1ce3f6
--- /dev/null
+++ b/core/common/src/macros.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! variadic {
+ () => (());
+ (...$a:ident $(,)?) => ($a);
+ (...$a:expr $(,)?) => ($a);
+ ($a:ident $(,)?) => (($a, ()));
+ ($a:expr $(,)?) => (($a, ()));
+ ($a:ident, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+ ($a:expr, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+}
diff --git a/core/common/src/types/consensus/header.rs
b/core/common/src/types/consensus/header.rs
index 2991f89c3..2f1a83d62 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -21,11 +21,10 @@ use thiserror::Error;
const HEADER_SIZE: usize = 256;
pub trait ConsensusHeader: Sized + Pod + Zeroable {
const COMMAND: Command2;
- // TODO: Trait consts are never evaluated unless explicitly accessed (e.g.
`<T as ConsensusHeader>::_SIZE_CHECK`).
- // The size invariant is enforced by repr(C) layout + bytemuck Pod derive;
consider adding a static_assert in each impl.
- const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() ==
HEADER_SIZE);
fn validate(&self) -> Result<(), ConsensusError>;
+ fn operation(&self) -> Operation;
+ fn command(&self) -> Command2;
fn size(&self) -> u32;
}
@@ -139,6 +138,20 @@ pub struct GenericHeader {
pub reserved_command: [u8; 128],
}
+const _: () = {
+ assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(GenericHeader, reserved_command)
+ == core::mem::offset_of!(GenericHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(GenericHeader, reserved_command) +
core::mem::size_of::<[u8; 128]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for GenericHeader {}
unsafe impl Zeroable for GenericHeader {}
@@ -146,6 +159,14 @@ unsafe impl Zeroable for GenericHeader {}
impl ConsensusHeader for GenericHeader {
const COMMAND: Command2 = Command2::Reserved;
+ fn operation(&self) -> Operation {
+ Operation::Default
+ }
+
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
Ok(())
}
@@ -177,6 +198,20 @@ pub struct RequestHeader {
pub namespace: u64,
pub reserved: [u8; 64],
}
+const _: () = {
+ assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(RequestHeader, client)
+ == core::mem::offset_of!(RequestHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(RequestHeader, reserved) +
core::mem::size_of::<[u8; 64]>()
+ == HEADER_SIZE
+ );
+};
impl Default for RequestHeader {
fn default() -> Self {
@@ -208,6 +243,10 @@ unsafe impl Zeroable for RequestHeader {}
impl ConsensusHeader for RequestHeader {
const COMMAND: Command2 = Command2::Request;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::Request {
return Err(ConsensusError::InvalidCommand {
@@ -217,6 +256,9 @@ impl ConsensusHeader for RequestHeader {
}
Ok(())
}
+ fn command(&self) -> Command2 {
+ self.command
+ }
fn size(&self) -> u32 {
self.size
@@ -249,6 +291,20 @@ pub struct PrepareHeader {
pub namespace: u64,
pub reserved: [u8; 32],
}
+const _: () = {
+ assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(PrepareHeader, client)
+ == core::mem::offset_of!(PrepareHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(PrepareHeader, reserved) +
core::mem::size_of::<[u8; 32]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for PrepareHeader {}
unsafe impl Zeroable for PrepareHeader {}
@@ -256,6 +312,10 @@ unsafe impl Zeroable for PrepareHeader {}
impl ConsensusHeader for PrepareHeader {
const COMMAND: Command2 = Command2::Prepare;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::Prepare {
return Err(ConsensusError::InvalidCommand {
@@ -265,6 +325,9 @@ impl ConsensusHeader for PrepareHeader {
}
Ok(())
}
+ fn command(&self) -> Command2 {
+ self.command
+ }
fn size(&self) -> u32 {
self.size
@@ -323,6 +386,20 @@ pub struct PrepareOkHeader {
pub namespace: u64,
pub reserved: [u8; 48],
}
+const _: () = {
+ assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(PrepareOkHeader, parent)
+ == core::mem::offset_of!(PrepareOkHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(PrepareOkHeader, reserved) +
core::mem::size_of::<[u8; 48]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for PrepareOkHeader {}
unsafe impl Zeroable for PrepareOkHeader {}
@@ -330,6 +407,13 @@ unsafe impl Zeroable for PrepareOkHeader {}
impl ConsensusHeader for PrepareOkHeader {
const COMMAND: Command2 = Command2::PrepareOk;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::PrepareOk {
return Err(ConsensusError::InvalidCommand {
@@ -391,6 +475,20 @@ pub struct CommitHeader {
pub namespace: u64,
pub reserved: [u8; 80],
}
+const _: () = {
+ assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(CommitHeader, commit_checksum)
+ == core::mem::offset_of!(CommitHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(CommitHeader, reserved) +
core::mem::size_of::<[u8; 80]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for CommitHeader {}
unsafe impl Zeroable for CommitHeader {}
@@ -398,6 +496,13 @@ unsafe impl Zeroable for CommitHeader {}
impl ConsensusHeader for CommitHeader {
const COMMAND: Command2 = Command2::Commit;
+ fn operation(&self) -> Operation {
+ Operation::Default
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::Commit {
return Err(ConsensusError::CommitInvalidCommand2);
@@ -435,8 +540,22 @@ pub struct ReplyHeader {
pub operation: Operation,
pub operation_padding: [u8; 7],
pub namespace: u64,
- pub reserved: [u8; 41],
+ pub reserved: [u8; 48],
}
+const _: () = {
+ assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(ReplyHeader, request_checksum)
+ == core::mem::offset_of!(ReplyHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(ReplyHeader, reserved) +
core::mem::size_of::<[u8; 48]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for ReplyHeader {}
unsafe impl Zeroable for ReplyHeader {}
@@ -444,6 +563,13 @@ unsafe impl Zeroable for ReplyHeader {}
impl ConsensusHeader for ReplyHeader {
const COMMAND: Command2 = Command2::Reply;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::Reply {
return Err(ConsensusError::ReplyInvalidCommand2);
@@ -477,7 +603,7 @@ impl Default for ReplyHeader {
operation: Default::default(),
operation_padding: [0; 7],
namespace: 0,
- reserved: [0; 41],
+ reserved: [0; 48],
}
}
}
@@ -502,6 +628,20 @@ pub struct StartViewChangeHeader {
pub namespace: u64,
pub reserved: [u8; 120],
}
+const _: () = {
+ assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(StartViewChangeHeader, namespace)
+ == core::mem::offset_of!(StartViewChangeHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(StartViewChangeHeader, reserved) +
core::mem::size_of::<[u8; 120]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for StartViewChangeHeader {}
unsafe impl Zeroable for StartViewChangeHeader {}
@@ -509,6 +649,13 @@ unsafe impl Zeroable for StartViewChangeHeader {}
impl ConsensusHeader for StartViewChangeHeader {
const COMMAND: Command2 = Command2::StartViewChange;
+ fn operation(&self) -> Operation {
+ Operation::Default
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::StartViewChange {
return Err(ConsensusError::InvalidCommand {
@@ -558,6 +705,20 @@ pub struct DoViewChangeHeader {
pub log_view: u32,
pub reserved: [u8; 100],
}
+const _: () = {
+ assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(DoViewChangeHeader, op)
+ == core::mem::offset_of!(DoViewChangeHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(DoViewChangeHeader, reserved) +
core::mem::size_of::<[u8; 100]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for DoViewChangeHeader {}
unsafe impl Zeroable for DoViewChangeHeader {}
@@ -565,6 +726,13 @@ unsafe impl Zeroable for DoViewChangeHeader {}
impl ConsensusHeader for DoViewChangeHeader {
const COMMAND: Command2 = Command2::DoViewChange;
+ fn operation(&self) -> Operation {
+ Operation::Default
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::DoViewChange {
return Err(ConsensusError::InvalidCommand {
@@ -627,6 +795,20 @@ pub struct StartViewHeader {
pub namespace: u64,
pub reserved: [u8; 104],
}
+const _: () = {
+ assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE);
+ // Ensure no implicit padding is inserted between reserved_frame and the
body fields.
+ assert!(
+ core::mem::offset_of!(StartViewHeader, op)
+ == core::mem::offset_of!(StartViewHeader, reserved_frame)
+ + core::mem::size_of::<[u8; 66]>()
+ );
+ // Ensure no implicit tail padding is inserted after the explicit trailing
bytes.
+ assert!(
+ core::mem::offset_of!(StartViewHeader, reserved) +
core::mem::size_of::<[u8; 104]>()
+ == HEADER_SIZE
+ );
+};
unsafe impl Pod for StartViewHeader {}
unsafe impl Zeroable for StartViewHeader {}
@@ -634,6 +816,13 @@ unsafe impl Zeroable for StartViewHeader {}
impl ConsensusHeader for StartViewHeader {
const COMMAND: Command2 = Command2::StartView;
+ fn operation(&self) -> Operation {
+ Operation::Default
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+
fn validate(&self) -> Result<(), ConsensusError> {
if self.command != Command2::StartView {
return Err(ConsensusError::InvalidCommand {
diff --git a/core/common/src/types/consensus/message.rs
b/core/common/src/types/consensus/message.rs
index 5fe7e0843..7ef39cdca 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -22,6 +22,25 @@ use crate::{
use bytes::Bytes;
use std::marker::PhantomData;
+// TODO: Rename this to Message and ConsensusHeader to Header.
+pub trait ConsensusMessage<H>
+where
+ H: ConsensusHeader,
+{
+ // TODO: fn body(&self) -> Something;
+ fn header(&self) -> &H;
+}
+
+impl<H> ConsensusMessage<H> for Message<H>
+where
+ H: ConsensusHeader,
+{
+ fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::from_bytes(header_bytes)
+ }
+}
+
#[derive(Debug, Clone)]
pub struct Message<H: ConsensusHeader> {
buffer: Bytes,
@@ -32,6 +51,13 @@ impl<H> Message<H>
where
H: ConsensusHeader,
{
+ #[inline]
+ #[allow(unused)]
+ pub fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::from_bytes(header_bytes)
+ }
+
/// Create a new message from a buffer.
///
/// # Safety
@@ -115,17 +141,6 @@ where
}
}
- /// Get a reference to the header using zero-copy access.
- ///
- /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
- /// without any copying or allocation.
- #[inline]
- #[allow(unused)]
- pub fn header(&self) -> &H {
- let header_bytes = &self.buffer[..size_of::<H>()];
- bytemuck::from_bytes(header_bytes)
- }
-
/// Get a reference to the message body (everything after the header).
///
/// Returns an empty slice if there is no body.
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 0ee87eab6..b1f7460e2 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -16,6 +16,7 @@
// under the License.
use iggy_common::header::ConsensusHeader;
+use iggy_common::message::ConsensusMessage;
use message_bus::MessageBus;
pub trait Project<T, C: Consensus> {
@@ -48,11 +49,14 @@ pub trait Pipeline {
fn verify(&self);
}
-// TODO: Create type aliases for the Message types, both here and on the
`Plane` trait.
+pub type RequestMessage<C> = <C as Consensus>::Message<<C as
Consensus>::RequestHeader>;
+pub type ReplicateMessage<C> = <C as Consensus>::Message<<C as
Consensus>::ReplicateHeader>;
+pub type AckMessage<C> = <C as Consensus>::Message<<C as
Consensus>::AckHeader>;
+
pub trait Consensus: Sized {
type MessageBus: MessageBus;
#[rustfmt::skip] // Scuffed formatter.
- type Message<H> where H: ConsensusHeader;
+ type Message<H>: ConsensusMessage<H> where H: ConsensusHeader;
type RequestHeader: ConsensusHeader;
type ReplicateHeader: ConsensusHeader;
@@ -79,20 +83,30 @@ pub trait Plane<C>
where
C: Consensus,
{
- fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl
Future<Output = ()>
+ fn on_request(&self, message: RequestMessage<C>) -> impl Future<Output =
()>
where
- C::Message<C::RequestHeader>:
- Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone;
+ RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> +
Clone;
- fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl
Future<Output = ()>
+ fn on_replicate(&self, message: ReplicateMessage<C>) -> impl Future<Output
= ()>
where
- C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C,
Consensus = C> + Clone;
+ ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone;
+
+ fn on_ack(&self, message: AckMessage<C>) -> impl Future<Output = ()>;
+}
- fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output
= ()>;
+pub trait PlaneIdentity<C>
+where
+ C: Consensus,
+{
+ fn is_applicable<H>(&self, message: &C::Message<H>) -> bool
+ where
+ H: ConsensusHeader;
}
mod impls;
pub use impls::*;
+mod plane_mux;
+pub use plane_mux::*;
mod namespaced_pipeline;
pub use namespaced_pipeline::*;
mod plane_helpers;
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
new file mode 100644
index 000000000..1f32cb810
--- /dev/null
+++ b/core/consensus/src/plane_mux.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+ AckMessage, Consensus, Plane, PlaneIdentity, Project, ReplicateMessage,
RequestMessage,
+};
+use iggy_common::variadic;
+
+#[derive(Debug)]
+pub struct MuxPlane<T> {
+ inner: T,
+}
+
+impl<T> MuxPlane<T> {
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+
+ pub fn inner(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn inner_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<C, T> Plane<C> for MuxPlane<T>
+where
+ C: Consensus,
+ T: Plane<C>,
+{
+ async fn on_request(&self, message: RequestMessage<C>)
+ where
+ RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> +
Clone,
+ {
+ self.inner.on_request(message).await;
+ }
+
+ async fn on_replicate(&self, message: ReplicateMessage<C>)
+ where
+ ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+ {
+ self.inner.on_replicate(message).await;
+ }
+
+ async fn on_ack(&self, message: AckMessage<C>) {
+ self.inner.on_ack(message).await;
+ }
+}
+
+impl<C> Plane<C> for ()
+where
+ C: Consensus,
+{
+ async fn on_request(&self, _message: RequestMessage<C>)
+ where
+ RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> +
Clone,
+ {
+ }
+
+ async fn on_replicate(&self, _message: ReplicateMessage<C>)
+ where
+ ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+ {
+ }
+
+ async fn on_ack(&self, _message: AckMessage<C>) {}
+}
+
+impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
+where
+ C: Consensus,
+ Head: Plane<C> + PlaneIdentity<C>,
+ Tail: Plane<C>,
+{
+ async fn on_request(&self, message: RequestMessage<C>)
+ where
+ RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> +
Clone,
+ {
+ if self.0.is_applicable(&message) {
+ self.0.on_request(message).await;
+ } else {
+ self.1.on_request(message).await;
+ }
+ }
+
+ async fn on_replicate(&self, message: ReplicateMessage<C>)
+ where
+ ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+ {
+ if self.0.is_applicable(&message) {
+ self.0.on_replicate(message).await;
+ } else {
+ self.1.on_replicate(message).await;
+ }
+ }
+
+ async fn on_ack(&self, message: AckMessage<C>) {
+ if self.0.is_applicable(&message) {
+ self.0.on_ack(message).await;
+ } else {
+ self.1.on_ack(message).await;
+ }
+ }
+}
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 35386b182..64f9fe959 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -17,13 +17,17 @@
use crate::stm::StateMachine;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
use consensus::{
- Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer,
VsrConsensus, ack_preflight,
- ack_quorum_reached, build_reply_message, drain_committable_prefix,
fence_old_prepare_by_commit,
- panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common,
replicate_preflight,
- replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+ Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project,
Sequencer, VsrConsensus,
+ ack_preflight, ack_quorum_reached, build_reply_message,
drain_committable_prefix,
+ fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view,
+ pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+ send_prepare_ok as send_prepare_ok_common,
};
use iggy_common::{
- header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader},
+ header::{
+ Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader,
+ RequestHeader,
+ },
message::Message,
};
use journal::{Journal, JournalHandle};
@@ -244,6 +248,49 @@ where
}
}
+impl<B, P, J, S, M> PlaneIdentity<VsrConsensus<B, P>> for
IggyMetadata<VsrConsensus<B, P>, J, S, M>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+ J: JournalHandle,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
+ M: StateMachine<Input = Message<PrepareHeader>>,
+{
+ fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as
Consensus>::Message<H>) -> bool
+ where
+ H: ConsensusHeader,
+ {
+ assert!(matches!(
+ message.header().command(),
+ Command2::Request | Command2::Prepare | Command2::PrepareOk
+ ));
+ let operation = message.header().operation();
+ // TODO: Use better selection, smth like greater or equal based on op
number.
+ matches!(
+ operation,
+ Operation::CreateStream
+ | Operation::UpdateStream
+ | Operation::DeleteStream
+ | Operation::PurgeStream
+ | Operation::CreateTopic
+ | Operation::UpdateTopic
+ | Operation::DeleteTopic
+ | Operation::PurgeTopic
+ | Operation::CreatePartitions
+ | Operation::DeletePartitions
+ | Operation::CreateConsumerGroup
+ | Operation::DeleteConsumerGroup
+ | Operation::CreateUser
+ | Operation::UpdateUser
+ | Operation::DeleteUser
+ | Operation::ChangePassword
+ | Operation::UpdatePermissions
+ | Operation::CreatePersonalAccessToken
+ | Operation::DeletePersonalAccessToken
+ )
+ }
+}
+
impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 70830355c..648d8893e 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -17,6 +17,7 @@
use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
use iggy_common::Either;
+use iggy_common::variadic;
use iggy_common::{header::PrepareHeader, message::Message};
use crate::stm::{State, StateMachine};
@@ -52,18 +53,6 @@ where
}
}
-//TODO: Move to common
-#[macro_export]
-macro_rules! variadic {
- () => ( () );
- (...$a:ident $(,)? ) => ( $a );
- (...$a:expr $(,)? ) => ( $a );
- ($a:ident $(,)? ) => ( ($a, ()) );
- ($a:expr $(,)? ) => ( ($a, ()) );
- ($a:ident, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
- ($a:expr, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
-}
-
// TODO: Figure out how to get around the fact that we need to hardcode the
Input/Output type for base case.
// TODO: I think we could move the base case to the impl site of `State`, so
this way we know the `Input` and `Output` types.
// Base case of the recursive resolution.
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 2c646072d..f05b294db 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,16 +20,20 @@
use crate::IggyPartition;
use crate::Partition;
use crate::types::PartitionsConfig;
+use consensus::PlaneIdentity;
use consensus::{
Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project,
Sequencer,
VsrConsensus, ack_preflight, build_reply_message,
fence_old_prepare_by_commit,
pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
send_prepare_ok as send_prepare_ok_common,
};
+use iggy_common::header::Command2;
use iggy_common::{
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut,
PartitionStats, PooledBuffer,
Segment, SegmentStorage,
- header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader,
RequestHeader},
+ header::{
+ ConsensusHeader, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader, RequestHeader,
+ },
message::Message,
sharding::{IggyNamespace, LocalIdx, ShardId},
};
@@ -508,6 +512,27 @@ where
}
}
+impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B,
NamespacedPipeline>>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+ fn is_applicable<H>(&self, message: &<VsrConsensus<B> as
Consensus>::Message<H>) -> bool
+ where
+ H: ConsensusHeader,
+ {
+ assert!(matches!(
+ message.header().command(),
+ Command2::Request | Command2::Prepare | Command2::PrepareOk
+ ));
+ let operation = message.header().operation();
+ // TODO: Use better selection, smth like greater or equal based on op
number.
+ matches!(
+ operation,
+ Operation::DeleteSegments | Operation::SendMessages |
Operation::StoreConsumerOffset
+ )
+ }
+}
+
impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index c4afc1bcf..9630c3aa3 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -17,14 +17,17 @@
use crate::bus::SharedMemBus;
use bytes::Bytes;
-use consensus::{NamespacedPipeline, VsrConsensus};
+use consensus::{
+ MuxPlane, {NamespacedPipeline, VsrConsensus},
+};
use iggy_common::header::PrepareHeader;
use iggy_common::message::Message;
+use iggy_common::variadic;
use journal::{Journal, JournalHandle, Storage};
use metadata::stm::consumer_group::ConsumerGroups;
use metadata::stm::stream::Streams;
use metadata::stm::user::Users;
-use metadata::{IggyMetadata, MuxStateMachine, variadic};
+use metadata::{IggyMetadata, MuxStateMachine};
use std::cell::{Cell, RefCell, UnsafeCell};
use std::collections::HashMap;
@@ -160,3 +163,4 @@ pub type SimMetadata = IggyMetadata<
/// Type alias for simulator partitions
pub type ReplicaPartitions =
partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
+pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 332b7fc31..13858b62c 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,8 +21,8 @@ pub mod deps;
pub mod replica;
use bus::MemBus;
-use consensus::Plane;
-use iggy_common::header::{GenericHeader, Operation, ReplyHeader};
+use consensus::{Plane, PlaneIdentity};
+use iggy_common::header::{GenericHeader, ReplyHeader};
use iggy_common::message::{Message, MessageBag};
use message_bus::MessageBus;
use replica::Replica;
@@ -115,48 +115,28 @@ impl Simulator {
}
async fn dispatch_to_replica(&self, replica: &Replica, message:
Message<GenericHeader>) {
- let message: MessageBag = message.into();
- let operation = match &message {
- MessageBag::Request(message) => message.header().operation,
- MessageBag::Prepare(message) => message.header().operation,
- MessageBag::PrepareOk(message) => message.header().operation,
- };
-
- match operation {
- Operation::SendMessages | Operation::StoreConsumerOffset => {
- self.dispatch_to_partition_on_replica(replica, message)
- .await;
- }
- _ => {
- self.dispatch_to_metadata_on_replica(replica, message).await;
- }
- }
- }
-
- async fn dispatch_to_metadata_on_replica(&self, replica: &Replica,
message: MessageBag) {
- match message {
- MessageBag::Request(request) => {
- replica.metadata.on_request(request).await;
- }
- MessageBag::Prepare(prepare) => {
- replica.metadata.on_replicate(prepare).await;
- }
- MessageBag::PrepareOk(prepare_ok) => {
- replica.metadata.on_ack(prepare_ok).await;
- }
- }
- }
-
- async fn dispatch_to_partition_on_replica(&self, replica: &Replica,
message: MessageBag) {
- match message {
+ let planes = replica.plane.inner();
+ match MessageBag::from(message) {
MessageBag::Request(request) => {
- replica.partitions.on_request(request).await;
+ if planes.0.is_applicable(&request) {
+ planes.0.on_request(request).await;
+ } else {
+ planes.1.0.on_request(request).await;
+ }
}
MessageBag::Prepare(prepare) => {
- replica.partitions.on_replicate(prepare).await;
+ if planes.0.is_applicable(&prepare) {
+ planes.0.on_replicate(prepare).await;
+ } else {
+ planes.1.0.on_replicate(prepare).await;
+ }
}
MessageBag::PrepareOk(prepare_ok) => {
- replica.partitions.on_ack(prepare_ok).await;
+ if planes.0.is_applicable(&prepare_ok) {
+ planes.0.on_ack(prepare_ok).await;
+ } else {
+ planes.1.0.on_ack(prepare_ok).await;
+ }
}
}
}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 9f8081656..b7728f622 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -17,15 +17,14 @@
use crate::bus::{MemBus, SharedMemBus};
use crate::deps::{
- MemStorage, ReplicaPartitions, SimJournal, SimMetadata,
SimMuxStateMachine, SimSnapshot,
+ ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimPlane,
SimSnapshot,
};
use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
-use iggy_common::IggyByteSize;
use iggy_common::sharding::{IggyNamespace, ShardId};
+use iggy_common::{IggyByteSize, variadic};
use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
use metadata::stm::stream::{Streams, StreamsInner};
use metadata::stm::user::{Users, UsersInner};
-use metadata::{IggyMetadata, variadic};
use partitions::PartitionsConfig;
use std::sync::Arc;
@@ -36,8 +35,7 @@ pub struct Replica {
pub id: u8,
pub name: String,
pub replica_count: u8,
- pub metadata: SimMetadata,
- pub partitions: ReplicaPartitions,
+ pub plane: SimPlane,
pub bus: Arc<MemBus>,
}
@@ -58,6 +56,12 @@ impl Replica {
LocalPipeline::new(),
);
metadata_consensus.init();
+ let metadata = SimMetadata {
+ consensus: Some(metadata_consensus),
+ journal: Some(SimJournal::default()),
+ snapshot: Some(SimSnapshot::default()),
+ mux_stm: mux,
+ };
let partitions_config = PartitionsConfig {
messages_required_to_save: 1000,
@@ -80,25 +84,20 @@ impl Replica {
);
partition_consensus.init();
partitions.set_consensus(partition_consensus);
+ let plane = SimPlane::new(variadic!(metadata, partitions));
Self {
id,
name,
+ plane,
replica_count,
- metadata: IggyMetadata {
- consensus: Some(metadata_consensus),
- journal: Some(SimJournal::<MemStorage>::default()),
- snapshot: Some(SimSnapshot::default()),
- mux_stm: mux,
- },
- partitions,
bus,
}
}
pub fn init_partition(&mut self, namespace: IggyNamespace) {
- self.partitions.init_partition_in_memory(namespace);
- self.partitions
- .register_namespace_in_pipeline(namespace.inner());
+ let partitions = &mut self.plane.inner_mut().1.0;
+ partitions.init_partition_in_memory(namespace);
+ partitions.register_namespace_in_pipeline(namespace.inner());
}
}