This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch plane_demuxer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f3ffba934c6ae8e8b15261a9390a1e134435343d
Author: numinex <[email protected]>
AuthorDate: Thu Feb 19 11:26:48 2026 +0100

    type aliases
---
 core/consensus/src/lib.rs              | 21 ++++++++++---------
 core/consensus/src/plane_mux.rs        | 37 +++++++++++++++++-----------------
 core/metadata/src/impls/metadata.rs    |  3 +--
 core/partitions/src/iggy_partitions.rs |  1 -
 4 files changed, 30 insertions(+), 32 deletions(-)

diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index f021f3d11..d9fc41e6e 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -19,6 +19,7 @@ use iggy_common::header::ConsensusHeader;
 use iggy_common::message::ConsensusMessage;
 use message_bus::MessageBus;
 
+
 pub trait Project<T, C: Consensus> {
     type Consensus: Consensus;
     fn project(self, consensus: &Self::Consensus) -> T;
@@ -50,7 +51,10 @@ pub trait Pipeline {
     fn verify(&self);
 }
 
-// TODO: Create type aliases for the Message types, both here and on the `Plane` trait.
+pub type RequestMessage<C> = <C as Consensus>::Message<<C as Consensus>::RequestHeader>;
+pub type ReplicateMessage<C> = <C as Consensus>::Message<<C as Consensus>::ReplicateHeader>;
+pub type AckMessage<C> = <C as Consensus>::Message<<C as Consensus>::AckHeader>;
+
 pub trait Consensus: Sized {
     type MessageBus: MessageBus;
     #[rustfmt::skip] // Scuffed formatter.
@@ -84,16 +88,15 @@ pub trait Plane<C>
 where
     C: Consensus,
 {
-    fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl Future<Output = ()>
+    fn on_request(&self, message: RequestMessage<C>) -> impl Future<Output = ()>
     where
-        C::Message<C::RequestHeader>:
-            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone;
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone;
 
-    fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl Future<Output = ()>
+    fn on_replicate(&self, message: ReplicateMessage<C>) -> impl Future<Output = ()>
     where
-        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone;
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone;
 
-    fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>;
+    fn on_ack(&self, message: AckMessage<C>) -> impl Future<Output = ()>;
 }
 
 pub trait PlaneIdentity<C>
@@ -101,9 +104,7 @@ where
     C: Consensus,
 {
     fn is_applicable<H>(&self, message: &C::Message<H>) -> bool
-    where
-        H: ConsensusHeader,
-        C::Message<H>: ConsensusMessage<H>;
+    where H: ConsensusHeader;
 }
 
 mod impls;
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
index 6e849be5b..1f32cb810 100644
--- a/core/consensus/src/plane_mux.rs
+++ b/core/consensus/src/plane_mux.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{Consensus, Plane, PlaneIdentity, Project};
+use crate::{
+    AckMessage, Consensus, Plane, PlaneIdentity, Project, ReplicateMessage, RequestMessage,
+};
 use iggy_common::variadic;
 
 #[derive(Debug)]
@@ -42,22 +44,21 @@ where
     C: Consensus,
     T: Plane<C>,
 {
-    async fn on_request(&self, message: C::Message<C::RequestHeader>)
+    async fn on_request(&self, message: RequestMessage<C>)
     where
-        C::Message<C::RequestHeader>:
-            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone,
     {
         self.inner.on_request(message).await;
     }
 
-    async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>)
+    async fn on_replicate(&self, message: ReplicateMessage<C>)
     where
-        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
     {
         self.inner.on_replicate(message).await;
     }
 
-    async fn on_ack(&self, message: C::Message<C::AckHeader>) {
+    async fn on_ack(&self, message: AckMessage<C>) {
         self.inner.on_ack(message).await;
     }
 }
@@ -66,20 +67,19 @@ impl<C> Plane<C> for ()
 where
     C: Consensus,
 {
-    async fn on_request(&self, _message: C::Message<C::RequestHeader>)
+    async fn on_request(&self, _message: RequestMessage<C>)
     where
-        C::Message<C::RequestHeader>:
-            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone,
     {
     }
 
-    async fn on_replicate(&self, _message: C::Message<C::ReplicateHeader>)
+    async fn on_replicate(&self, _message: ReplicateMessage<C>)
     where
-        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
     {
     }
 
-    async fn on_ack(&self, _message: C::Message<C::AckHeader>) {}
+    async fn on_ack(&self, _message: AckMessage<C>) {}
 }
 
 impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
@@ -88,10 +88,9 @@ where
     Head: Plane<C> + PlaneIdentity<C>,
     Tail: Plane<C>,
 {
-    async fn on_request(&self, message: C::Message<C::RequestHeader>)
+    async fn on_request(&self, message: RequestMessage<C>)
     where
-        C::Message<C::RequestHeader>:
-            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone,
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone,
     {
         if self.0.is_applicable(&message) {
             self.0.on_request(message).await;
@@ -100,9 +99,9 @@ where
         }
     }
 
-    async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>)
+    async fn on_replicate(&self, message: ReplicateMessage<C>)
     where
-        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone,
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
     {
         if self.0.is_applicable(&message) {
             self.0.on_replicate(message).await;
@@ -111,7 +110,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: C::Message<C::AckHeader>) {
+    async fn on_ack(&self, message: AckMessage<C>) {
         if self.0.is_applicable(&message) {
             self.0.on_ack(message).await;
         } else {
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index ce4e324c6..f60745faa 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -253,14 +253,13 @@ where
     fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as Consensus>::Message<H>) -> bool
     where
         H: ConsensusHeader,
-        <VsrConsensus<B, P> as Consensus>::Message<H>: ConsensusMessage<H>,
     {
         assert!(matches!(
             message.header().command(),
             Command2::Request | Command2::Prepare | Command2::PrepareOk
         ));
         let operation = message.header().operation();
-        // TODO: Use better heuristic, smth like greater or equal based on op number.
+        // TODO: Use better selection, smth like greater or equal based on op number.
         matches!(
             operation,
             Operation::CreateStream
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 255a31fff..e3adb6e97 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -475,7 +475,6 @@ where
     fn is_applicable<H>(&self, message: &<VsrConsensus<B> as Consensus>::Message<H>) -> bool
     where
         H: ConsensusHeader,
-        <VsrConsensus<B> as Consensus>::Message<H>: ConsensusMessage<H>,
     {
         assert!(matches!(
             message.header().command(),

Reply via email to