This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch consensus_trait_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d32c815529b5d3e47d6c4a4559e5ac5d5edaa707 Author: numinex <[email protected]> AuthorDate: Tue Feb 17 22:08:05 2026 +0100 refactor(consensus): refactor assoc types for messages --- core/consensus/src/impls.rs | 27 +++++++++++++++++--------- core/consensus/src/lib.rs | 35 +++++++++++++++++++++++----------- core/consensus/src/plane_helpers.rs | 6 +++--- core/metadata/src/impls/metadata.rs | 23 +++++++++------------- core/partitions/src/iggy_partitions.rs | 8 ++++---- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 4b1e620c8..5a9abed7c 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -17,12 +17,13 @@ use crate::vsr_timeout::{TimeoutKind, TimeoutManager}; use crate::{ - Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count, dvc_max_commit, + Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count, + dvc_max_commit, dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner, }; use bit_set::BitSet; use iggy_common::header::{ - Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader, + Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader, }; use iggy_common::message::Message; @@ -1093,7 +1094,9 @@ where B: MessageBus, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, { - fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> { + type Consensus = VsrConsensus<B, P>; + + fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> { let op = consensus.sequencer.current_sequence() + 1; self.transmute_header(|old, new| { @@ -1124,7 +1127,9 @@ where B: MessageBus, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, { - fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareOkHeader> { + type Consensus = VsrConsensus<B, P>; + + fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> { self.transmute_header(|old, new| { *new = PrepareOkHeader { command: Command2::PrepareOk, @@ -1154,9 +1159,13 @@ where { type MessageBus = B; - type RequestMessage = Message<RequestHeader>; - type ReplicateMessage = Message<PrepareHeader>; - type AckMessage = Message<PrepareOkHeader>; + type Message<H> + = iggy_common::message::Message<H> + where + H: ConsensusHeader; + type RequestHeader = RequestHeader; + type ReplicateHeader = PrepareHeader; + type AckHeader = PrepareOkHeader; type Sequencer = LocalSequencer; type Pipeline = P; @@ -1166,7 +1175,7 @@ where // This avoids serialization/queuing overhead and would also allow // reordering to WAL-first (on_replicate before pipeline_message) // without risking lost self-acks from dispatch timing. - fn pipeline_message(&self, message: Self::ReplicateMessage) { + fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) { assert!(self.is_primary(), "only primary can pipeline messages"); let mut pipeline = self.pipeline.borrow_mut(); @@ -1178,7 +1187,7 @@ where pipeline.verify(); } - fn post_replicate_verify(&self, message: &Self::ReplicateMessage) { + fn post_replicate_verify(&self, message: &Self::Message<Self::ReplicateHeader>) { let header = message.header(); // verify the message belongs to our cluster diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 17f2fb9ea..f908df007 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use iggy_common::header::ConsensusHeader; use message_bus::MessageBus; pub trait Project<T, C: Consensus> { - fn project(self, consensus: &C) -> T; + type Consensus: Consensus; + fn project(self, consensus: &Self::Consensus) -> T; } pub trait Pipeline { @@ -49,18 +51,21 @@ pub trait Pipeline { pub trait Consensus: Sized { type MessageBus: MessageBus; - // I am wondering, whether we should create a dedicated trait for cloning, so it's explicit that we do ref counting. - type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone; - type ReplicateMessage: Project<Self::AckMessage, Self> + Clone; - type AckMessage; + #[rustfmt::skip] // Scuffed formatter. + type Message<H> where H: ConsensusHeader; + + type RequestHeader: ConsensusHeader; + type ReplicateHeader: ConsensusHeader; + type AckHeader: ConsensusHeader; + type Sequencer: Sequencer; - type Pipeline: Pipeline<Message = Self::ReplicateMessage>; + type Pipeline: Pipeline<Message = Self::Message<Self::ReplicateHeader>>; - fn pipeline_message(&self, message: Self::ReplicateMessage); + fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>); fn verify_pipeline(&self); // TODO: Figure out how we can achieve that without exposing such methods in the Consensus trait. - fn post_replicate_verify(&self, message: &Self::ReplicateMessage); + fn post_replicate_verify(&self, message: &Self::Message<Self::ReplicateHeader>); fn is_follower(&self) -> bool; fn is_normal(&self) -> bool; @@ -77,9 +82,17 @@ pub trait Plane<C> where C: Consensus, { - fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; - fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; - fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; + fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl Future<Output = ()> + where + C::Message<C::RequestHeader>: + Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone; + + fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl Future<Output = ()> + where + C::Message<C::ReplicateHeader>: + Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone; + + fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>; } mod impls; diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 60ea1dedf..4b3986067 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -26,12 +26,12 @@ use std::ops::AsyncFnOnce; /// Shared pipeline-first request flow used by metadata and partitions. pub async fn pipeline_prepare_common<C, F>( consensus: &C, - prepare: C::ReplicateMessage, + prepare: C::Message<C::ReplicateHeader>, on_replicate: F, ) where C: Consensus, - C::ReplicateMessage: Clone, - F: AsyncFnOnce(C::ReplicateMessage) -> (), + C::Message<C::ReplicateHeader>: Clone, + F: AsyncFnOnce(C::Message<C::ReplicateHeader>) -> (), { assert!(!consensus.is_follower(), "on_request: primary only"); assert!(consensus.is_normal(), "on_request: status must be normal"); diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 9a81461af..4ff74f4e2 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -23,7 +23,7 @@ use consensus::{ replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ - header::{Command2, GenericHeader, PrepareHeader}, + header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader}, message::Message, }; use journal::{Journal, JournalHandle}; @@ -101,14 +101,10 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, J: JournalHandle, - J::Target: Journal< - J::Storage, - Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage, - Header = PrepareHeader, - >, + J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, M: StateMachine<Input = Message<PrepareHeader>>, { - async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::RequestMessage) { + async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); // TODO: Bunch of asserts. @@ -117,7 +113,10 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate(&self, message: <VsrConsensus<B, P> as Consensus>::ReplicateMessage) { + async fn on_replicate( + &self, + message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>, + ) { let consensus = self.consensus.as_ref().unwrap(); let journal = self.journal.as_ref().unwrap(); @@ -170,7 +169,7 @@ where } } - async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::AckMessage) { + async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -245,11 +244,7 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, J: JournalHandle, - J::Target: Journal< - J::Storage, - Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage, - Header = PrepareHeader, - >, + J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, M: StateMachine<Input = Message<PrepareHeader>>, { /// Replicate a prepare message to the next replica in the chain. diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 54891dfdd..16ed124ab 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -28,7 +28,7 @@ use consensus::{ use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, - header::{GenericHeader, Operation, PrepareHeader}, + header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader}, message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; @@ -333,7 +333,7 @@ impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, { - async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) { + async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); debug!("handling partition request"); @@ -341,7 +341,7 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) { + async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -383,7 +383,7 @@ where } } - async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) { + async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header();
