This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch consensus_trait_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d32c815529b5d3e47d6c4a4559e5ac5d5edaa707
Author: numinex <[email protected]>
AuthorDate: Tue Feb 17 22:08:05 2026 +0100

    refactor(consensus): refactor assoc types for messages
---
 core/consensus/src/impls.rs            | 27 +++++++++++++++++---------
 core/consensus/src/lib.rs              | 35 +++++++++++++++++++++++-----------
 core/consensus/src/plane_helpers.rs    |  6 +++---
 core/metadata/src/impls/metadata.rs    | 23 +++++++++-------------
 core/partitions/src/iggy_partitions.rs |  8 ++++----
 5 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 4b1e620c8..5a9abed7c 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -17,12 +17,13 @@
 
 use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
 use crate::{
-    Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count, 
dvc_max_commit,
+    Consensus, DvcQuorumArray, Pipeline, Project, StoredDvc, dvc_count,
+    dvc_max_commit,
     dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
 };
 use bit_set::BitSet;
 use iggy_common::header::{
-    Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader,
+    Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, 
PrepareOkHeader, RequestHeader,
     StartViewChangeHeader, StartViewHeader,
 };
 use iggy_common::message::Message;
@@ -1093,7 +1094,9 @@ where
     B: MessageBus,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
 {
-    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> 
{
+    type Consensus = VsrConsensus<B, P>;
+
+    fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
         let op = consensus.sequencer.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
@@ -1124,7 +1127,9 @@ where
     B: MessageBus,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
 {
-    fn project(self, consensus: &VsrConsensus<B, P>) -> 
Message<PrepareOkHeader> {
+    type Consensus = VsrConsensus<B, P>;
+
+    fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
@@ -1154,9 +1159,13 @@ where
 {
     type MessageBus = B;
 
-    type RequestMessage = Message<RequestHeader>;
-    type ReplicateMessage = Message<PrepareHeader>;
-    type AckMessage = Message<PrepareOkHeader>;
+    type Message<H>
+        = iggy_common::message::Message<H>
+    where
+        H: ConsensusHeader;
+    type RequestHeader = RequestHeader;
+    type ReplicateHeader = PrepareHeader;
+    type AckHeader = PrepareOkHeader;
     type Sequencer = LocalSequencer;
     type Pipeline = P;
 
@@ -1166,7 +1175,7 @@ where
     // This avoids serialization/queuing overhead and would also allow
     // reordering to WAL-first (on_replicate before pipeline_message)
     // without risking lost self-acks from dispatch timing.
-    fn pipeline_message(&self, message: Self::ReplicateMessage) {
+    fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
         assert!(self.is_primary(), "only primary can pipeline messages");
 
         let mut pipeline = self.pipeline.borrow_mut();
@@ -1178,7 +1187,7 @@ where
         pipeline.verify();
     }
 
-    fn post_replicate_verify(&self, message: &Self::ReplicateMessage) {
+    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>) {
         let header = message.header();
 
         // verify the message belongs to our cluster
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 17f2fb9ea..f908df007 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iggy_common::header::ConsensusHeader;
 use message_bus::MessageBus;
 
 pub trait Project<T, C: Consensus> {
-    fn project(self, consensus: &C) -> T;
+    type Consensus: Consensus;
+    fn project(self, consensus: &Self::Consensus) -> T;
 }
 
 pub trait Pipeline {
@@ -49,18 +51,21 @@ pub trait Pipeline {
 
 pub trait Consensus: Sized {
     type MessageBus: MessageBus;
-    // I am wondering, whether we should create a dedicated trait for cloning, 
so it's explicit that we do ref counting.
-    type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
-    type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
-    type AckMessage;
+    #[rustfmt::skip] // Scuffed formatter.
+    type Message<H> where H: ConsensusHeader;
+
+    type RequestHeader: ConsensusHeader;
+    type ReplicateHeader: ConsensusHeader;
+    type AckHeader: ConsensusHeader;
+
     type Sequencer: Sequencer;
-    type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
+    type Pipeline: Pipeline<Message = Self::Message<Self::ReplicateHeader>>;
 
-    fn pipeline_message(&self, message: Self::ReplicateMessage);
+    fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>);
     fn verify_pipeline(&self);
 
     // TODO: Figure out how we can achieve that without exposing such methods 
in the Consensus trait.
-    fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
+    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>);
 
     fn is_follower(&self) -> bool;
     fn is_normal(&self) -> bool;
@@ -77,9 +82,17 @@ pub trait Plane<C>
 where
     C: Consensus,
 {
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+    fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl 
Future<Output = ()>
+    where
+        C::Message<C::RequestHeader>:
+            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone;
+
+    fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl 
Future<Output = ()>
+    where
+        C::Message<C::ReplicateHeader>:
+            Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone;
+
+    fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output 
= ()>;
 }
 
 mod impls;
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index 60ea1dedf..4b3986067 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -26,12 +26,12 @@ use std::ops::AsyncFnOnce;
 /// Shared pipeline-first request flow used by metadata and partitions.
 pub async fn pipeline_prepare_common<C, F>(
     consensus: &C,
-    prepare: C::ReplicateMessage,
+    prepare: C::Message<C::ReplicateHeader>,
     on_replicate: F,
 ) where
     C: Consensus,
-    C::ReplicateMessage: Clone,
-    F: AsyncFnOnce(C::ReplicateMessage) -> (),
+    C::Message<C::ReplicateHeader>: Clone,
+    F: AsyncFnOnce(C::Message<C::ReplicateHeader>) -> (),
 {
     assert!(!consensus.is_follower(), "on_request: primary only");
     assert!(consensus.is_normal(), "on_request: status must be normal");
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 9a81461af..4ff74f4e2 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -23,7 +23,7 @@ use consensus::{
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader},
+    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
@@ -101,14 +101,10 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
-    J::Target: Journal<
-            J::Storage,
-            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
-            Header = PrepareHeader,
-        >,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B, P> as 
Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B, P> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
@@ -117,7 +113,10 @@ where
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B, P> as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(
+        &self,
+        message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>,
+    ) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
@@ -170,7 +169,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B, P> as 
Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B, P> as 
Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
@@ -245,11 +244,7 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
-    J::Target: Journal<
-            J::Storage,
-            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
-            Header = PrepareHeader,
-        >,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
     /// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 54891dfdd..16ed124ab 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -28,7 +28,7 @@ use consensus::{
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{GenericHeader, Operation, PrepareHeader},
+    header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, 
RequestHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -333,7 +333,7 @@ impl<B> Plane<VsrConsensus<B>> for 
IggyPartitions<VsrConsensus<B>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         debug!("handling partition request");
@@ -341,7 +341,7 @@ where
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         let header = message.header();
@@ -383,7 +383,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 

Reply via email to