This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch plane_demuxer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f3ffba934c6ae8e8b15261a9390a1e134435343d Author: numinex <[email protected]> AuthorDate: Thu Feb 19 11:26:48 2026 +0100 type aliases --- core/consensus/src/lib.rs | 21 ++++++++++--------- core/consensus/src/plane_mux.rs | 37 +++++++++++++++++----------------- core/metadata/src/impls/metadata.rs | 3 +-- core/partitions/src/iggy_partitions.rs | 1 - 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index f021f3d11..d9fc41e6e 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -19,6 +19,7 @@ use iggy_common::header::ConsensusHeader; use iggy_common::message::ConsensusMessage; use message_bus::MessageBus; + pub trait Project<T, C: Consensus> { type Consensus: Consensus; fn project(self, consensus: &Self::Consensus) -> T; @@ -50,7 +51,10 @@ pub trait Pipeline { fn verify(&self); } -// TODO: Create type aliases for the Message types, both here and on the `Plane` trait. +pub type RequestMessage<C> = <C as Consensus>::Message<<C as Consensus>::RequestHeader>; +pub type ReplicateMessage<C> = <C as Consensus>::Message<<C as Consensus>::ReplicateHeader>; +pub type AckMessage<C> = <C as Consensus>::Message<<C as Consensus>::AckHeader>; + pub trait Consensus: Sized { type MessageBus: MessageBus; #[rustfmt::skip] // Scuffed formatter. @@ -84,16 +88,15 @@ pub trait Plane<C> where C: Consensus, { - fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl Future<Output = ()> + fn on_request(&self, message: RequestMessage<C>) -> impl Future<Output = ()> where - C::Message<C::RequestHeader>: - Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone; + RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone; - fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl Future<Output = ()> + fn on_replicate(&self, message: ReplicateMessage<C>) -> impl Future<Output = ()> where - C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone; + ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone; - fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output = ()>; + fn on_ack(&self, message: AckMessage<C>) -> impl Future<Output = ()>; } pub trait PlaneIdentity<C> @@ -101,9 +104,7 @@ where C: Consensus, { fn is_applicable<H>(&self, message: &C::Message<H>) -> bool - where - H: ConsensusHeader, - C::Message<H>: ConsensusMessage<H>; + where H: ConsensusHeader; } mod impls; diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs index 6e849be5b..1f32cb810 100644 --- a/core/consensus/src/plane_mux.rs +++ b/core/consensus/src/plane_mux.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::{Consensus, Plane, PlaneIdentity, Project}; +use crate::{ + AckMessage, Consensus, Plane, PlaneIdentity, Project, ReplicateMessage, RequestMessage, +}; use iggy_common::variadic; #[derive(Debug)] @@ -42,22 +44,21 @@ where C: Consensus, T: Plane<C>, { - async fn on_request(&self, message: C::Message<C::RequestHeader>) + async fn on_request(&self, message: RequestMessage<C>) where - C::Message<C::RequestHeader>: - Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone, { self.inner.on_request(message).await; } - async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) + async fn on_replicate(&self, message: ReplicateMessage<C>) where - C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone, { self.inner.on_replicate(message).await; } - async fn on_ack(&self, message: C::Message<C::AckHeader>) { + async fn on_ack(&self, message: AckMessage<C>) { self.inner.on_ack(message).await; } } @@ -66,20 +67,19 @@ impl<C> Plane<C> for () where C: Consensus, { - async fn on_request(&self, _message: C::Message<C::RequestHeader>) + async fn on_request(&self, _message: RequestMessage<C>) where - C::Message<C::RequestHeader>: - Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone, { } - async fn on_replicate(&self, _message: C::Message<C::ReplicateHeader>) + async fn on_replicate(&self, _message: ReplicateMessage<C>) where - C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone, { } - async fn on_ack(&self, _message: C::Message<C::AckHeader>) {} + async fn on_ack(&self, _message: AckMessage<C>) {} } impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail) @@ -88,10 +88,9 @@ where Head: Plane<C> + PlaneIdentity<C>, Tail: Plane<C>, { - async fn on_request(&self, message: C::Message<C::RequestHeader>) + async fn on_request(&self, message: RequestMessage<C>) where - C::Message<C::RequestHeader>: - Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone, + RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + Clone, { if self.0.is_applicable(&message) { self.0.on_request(message).await; @@ -100,9 +99,9 @@ where } } - async fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) + async fn on_replicate(&self, message: ReplicateMessage<C>) where - C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, Consensus = C> + Clone, + ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone, { if self.0.is_applicable(&message) { self.0.on_replicate(message).await; @@ -111,7 +110,7 @@ where } } - async fn on_ack(&self, message: C::Message<C::AckHeader>) { + async fn on_ack(&self, message: AckMessage<C>) { if self.0.is_applicable(&message) { self.0.on_ack(message).await; } else { diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index ce4e324c6..f60745faa 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -253,14 +253,13 @@ where fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as Consensus>::Message<H>) -> bool where H: ConsensusHeader, - <VsrConsensus<B, P> as Consensus>::Message<H>: ConsensusMessage<H>, { assert!(matches!( message.header().command(), Command2::Request | Command2::Prepare | Command2::PrepareOk )); let operation = message.header().operation(); - // TODO: Use better heuristic, smth like greater or equal based on op number. + // TODO: Use better selection, smth like greater or equal based on op number. matches!( operation, Operation::CreateStream diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 255a31fff..e3adb6e97 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -475,7 +475,6 @@ where fn is_applicable<H>(&self, message: &<VsrConsensus<B> as Consensus>::Message<H>) -> bool where H: ConsensusHeader, - <VsrConsensus<B> as Consensus>::Message<H>: ConsensusMessage<H>, { assert!(matches!( message.header().command(),
