This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch consensus_trait_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit dfd5abffa43e162a1d9b418d4aee7f41fa4c3c77 Author: numinex <[email protected]> AuthorDate: Tue Feb 17 22:08:05 2026 +0100 refactor(consensus): refactor assoc types for messages --- core/consensus/src/impls.rs | 80 ++++++++++++++++++++++------------ core/consensus/src/lib.rs | 43 +++++++++++++----- core/consensus/src/plane_helpers.rs | 6 +-- core/metadata/src/impls/metadata.rs | 23 ++++------ core/partitions/src/iggy_partitions.rs | 8 ++-- 5 files changed, 100 insertions(+), 60 deletions(-) diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 4b1e620c8..7c480d99c 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -22,7 +22,7 @@ use crate::{ }; use bit_set::BitSet; use iggy_common::header::{ - Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader, + Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader, StartViewChangeHeader, StartViewHeader, }; use iggy_common::message::Message; @@ -1088,27 +1088,26 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } } -impl<B, P> Project<Message<PrepareHeader>, VsrConsensus<B, P>> for Message<RequestHeader> -where - B: MessageBus, - P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, -{ - fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> { - let op = consensus.sequencer.current_sequence() + 1; +impl Project<Message<PrepareHeader>> for Message<RequestHeader> { + fn project<C>(self, consensus: &C) -> Message<PrepareHeader> + where + C: Consensus, + { + let op = consensus.current_sequence() + 1; self.transmute_header(|old, new| { *new = PrepareHeader { - cluster: consensus.cluster, + cluster: consensus.cluster(), size: old.size, - view: consensus.view.get(), + view: consensus.view(), release: old.release, command: Command2::Prepare, - replica: consensus.replica, + replica: consensus.replica(), client: old.client, parent: 0, // TODO: Get parent checksum from the previous entry in the journal (figure out how to pass that ctx here) request_checksum: old.request_checksum, request: old.request, - commit: consensus.commit.get(), + commit: consensus.commit(), op, timestamp: 0, // 0 for now. Implement correct way to get timestamp later operation: old.operation, @@ -1119,24 +1118,23 @@ where } } -impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for Message<PrepareHeader> -where - B: MessageBus, - P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, -{ - fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareOkHeader> { +impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> { + fn project<C>(self, consensus: &C) -> Message<PrepareOkHeader> + where + C: Consensus, + { self.transmute_header(|old, new| { *new = PrepareOkHeader { command: Command2::PrepareOk, parent: old.parent, prepare_checksum: old.checksum, request: old.request, - cluster: consensus.cluster, - replica: consensus.replica, + cluster: consensus.cluster(), + replica: consensus.replica(), // It's important to use the view of the replica, not the received prepare! - view: consensus.view.get(), + view: consensus.view(), op: old.op, - commit: consensus.commit.get(), + commit: consensus.commit(), timestamp: old.timestamp, operation: old.operation, namespace: old.namespace, @@ -1154,9 +1152,13 @@ where { type MessageBus = B; - type RequestMessage = Message<RequestHeader>; - type ReplicateMessage = Message<PrepareHeader>; - type AckMessage = Message<PrepareOkHeader>; + type Message<H> + = iggy_common::message::Message<H> + where + H: ConsensusHeader; + type RequestHeader = RequestHeader; + type ReplicateHeader = PrepareHeader; + type AckHeader = PrepareOkHeader; type Sequencer = LocalSequencer; type Pipeline = P; @@ -1166,7 +1168,7 @@ where // This avoids serialization/queuing overhead and would also allow // reordering to WAL-first (on_replicate before pipeline_message) // without risking lost self-acks from dispatch timing. - fn pipeline_message(&self, message: Self::ReplicateMessage) { + fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) { assert!(self.is_primary(), "only primary can pipeline messages"); let mut pipeline = self.pipeline.borrow_mut(); @@ -1178,7 +1180,7 @@ where pipeline.verify(); } - fn post_replicate_verify(&self, message: &Self::ReplicateMessage) { + fn post_replicate_verify(&self, message: &Self::Message<Self::ReplicateHeader>) { let header = message.header(); // verify the message belongs to our cluster @@ -1220,4 +1222,28 @@ where fn is_syncing(&self) -> bool { self.is_syncing() } + + fn sequencer(&self) -> &Self::Sequencer { + &self.sequencer + } + + fn current_sequence(&self) -> u64 { + self.sequencer.current_sequence() + } + + fn cluster(&self) -> u128 { + self.cluster + } + + fn view(&self) -> u32 { + self.view.get() + } + + fn replica(&self) -> u8 { + self.replica + } + + fn commit(&self) -> u64 { + self.commit.get() + } } diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 17f2fb9ea..3c80865f9 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. +use iggy_common::header::ConsensusHeader; use message_bus::MessageBus; -pub trait Project<T, C: Consensus> { - fn project(self, consensus: &C) -> T; +pub trait Project<T> { + fn project<C>(self, consensus: &C) -> T + where + C: Consensus; } pub trait Pipeline { @@ -49,18 +52,28 @@ pub trait Pipeline { pub trait Consensus: Sized { type MessageBus: MessageBus; - // I am wondering, whether we should create a dedicated trait for cloning, so it's explicit that we do ref counting. - type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone; - type ReplicateMessage: Project<Self::AckMessage, Self> + Clone; - type AckMessage; + #[rustfmt::skip] // Scuffed formatter. + type Message<H> where H: ConsensusHeader; + + type RequestHeader: ConsensusHeader; + type ReplicateHeader: ConsensusHeader; + type AckHeader: ConsensusHeader; + type Sequencer: Sequencer; - type Pipeline: Pipeline<Message = Self::ReplicateMessage>; + type Pipeline: Pipeline<Message = Self::Message<Self::ReplicateHeader>>; - fn pipeline_message(&self, message: Self::ReplicateMessage); + fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>); fn verify_pipeline(&self); // TODO: Figure out how we can achieve that without exposing such methods in the Consensus trait. - fn post_replicate_verify(&self, message: &Self::ReplicateMessage); + fn post_replicate_verify(&self, message: &Self::Message<Self::ReplicateHeader>); + + fn sequencer(&self) -> &Self::Sequencer; + fn current_sequence(&self) -> u64; + fn cluster(&self) -> u128; + fn view(&self) -> u32; + fn replica(&self) -> u8; + fn commit(&self) -> u64; fn is_follower(&self) -> bool; fn is_normal(&self) -> bool; @@ -77,9 +90,15 @@ pub trait Plane<C> where C: Consensus, { - fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; - fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; - fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; + fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl Future<Output = ()> + where + C::Message<C::RequestHeader>: Project<C::Message<C::ReplicateHeader>> + Clone; + + fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl Future<Output = ()> + where + C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>> + Clone; + + fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>; } mod impls; diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 60ea1dedf..4b3986067 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -26,12 +26,12 @@ use std::ops::AsyncFnOnce; /// Shared pipeline-first request flow used by metadata and partitions. pub async fn pipeline_prepare_common<C, F>( consensus: &C, - prepare: C::ReplicateMessage, + prepare: C::Message<C::ReplicateHeader>, on_replicate: F, ) where C: Consensus, - C::ReplicateMessage: Clone, - F: AsyncFnOnce(C::ReplicateMessage) -> (), + C::Message<C::ReplicateHeader>: Clone, + F: AsyncFnOnce(C::Message<C::ReplicateHeader>) -> (), { assert!(!consensus.is_follower(), "on_request: primary only"); assert!(consensus.is_normal(), "on_request: status must be normal"); diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 9a81461af..4ff74f4e2 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -23,7 +23,7 @@ use consensus::{ replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ - header::{Command2, GenericHeader, PrepareHeader}, + header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader}, message::Message, }; use journal::{Journal, JournalHandle}; @@ -101,14 +101,10 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, J: JournalHandle, - J::Target: Journal< - J::Storage, - Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage, - Header = PrepareHeader, - >, + J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, M: StateMachine<Input = Message<PrepareHeader>>, { - async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::RequestMessage) { + async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); // TODO: Bunch of asserts. @@ -117,7 +113,10 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate(&self, message: <VsrConsensus<B, P> as Consensus>::ReplicateMessage) { + async fn on_replicate( + &self, + message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>, + ) { let consensus = self.consensus.as_ref().unwrap(); let journal = self.journal.as_ref().unwrap(); @@ -170,7 +169,7 @@ where } } - async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::AckMessage) { + async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -245,11 +244,7 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, J: JournalHandle, - J::Target: Journal< - J::Storage, - Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage, - Header = PrepareHeader, - >, + J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, M: StateMachine<Input = Message<PrepareHeader>>, { /// Replicate a prepare message to the next replica in the chain. diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 54891dfdd..16ed124ab 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -28,7 +28,7 @@ use consensus::{ use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, - header::{GenericHeader, Operation, PrepareHeader}, + header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader}, message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; @@ -333,7 +333,7 @@ impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, { - async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) { + async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); debug!("handling partition request"); @@ -341,7 +341,7 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) { + async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -383,7 +383,7 @@ where } } - async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) { + async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header();
