This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 51cb96951 refactor(partitions): add plane trait and refactor 
partitions module (#2744)
51cb96951 is described below

commit 51cb96951f87991c1a652ffdaddfb41bc2f39301
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Feb 16 19:31:38 2026 +0100

    refactor(partitions): add plane trait and refactor partitions module (#2744)
    
    Adds `Plane` trait as replacement for `Metadata` and `Partitions`
    traits, refactors the logic by moving shared part into `plane_helpers`
    module. Refactors the `partitions` module by creating `Partition` trait
    and moving the `append_batch` logic to `append_messages` method from
    `Partition` trait.
---
 core/consensus/src/impls.rs            |   4 +
 core/consensus/src/lib.rs              |  18 ++
 core/consensus/src/plane_helpers.rs    | 271 +++++++++++++++++++++
 core/metadata/src/impls/metadata.rs    | 296 +++--------------------
 core/metadata/src/lib.rs               |   4 +-
 core/partitions/src/iggy_partition.rs  |  76 +++++-
 core/partitions/src/iggy_partitions.rs | 425 +++++++--------------------------
 core/partitions/src/lib.rs             |  52 ++--
 core/simulator/src/lib.rs              |   3 +-
 9 files changed, 522 insertions(+), 627 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 3fd146cd4..4b1e620c8 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -1213,6 +1213,10 @@ where
         !self.is_primary()
     }
 
+    fn is_normal(&self) -> bool {
+        self.status() == Status::Normal
+    }
+
     fn is_syncing(&self) -> bool {
         self.is_syncing()
     }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 5c3e152e6..17f2fb9ea 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -63,11 +63,29 @@ pub trait Consensus: Sized {
     fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
 
     fn is_follower(&self) -> bool;
+    fn is_normal(&self) -> bool;
     fn is_syncing(&self) -> bool;
 }
 
+/// Shared consensus lifecycle interface for control/data planes.
+///
+/// This abstracts the VSR message flow:
+/// - request -> prepare
+/// - replicate (prepare)
+/// - ack (prepare_ok)
+pub trait Plane<C>
+where
+    C: Consensus,
+{
+    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
+    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
+    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+}
+
 mod impls;
 pub use impls::*;
+mod plane_helpers;
+pub use plane_helpers::*;
 
 mod view_change_quorum;
 pub use view_change_quorum::*;
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
new file mode 100644
index 000000000..60ea1dedf
--- /dev/null
+++ b/core/consensus/src/plane_helpers.rs
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, 
VsrConsensus};
+use iggy_common::header::{Command2, GenericHeader, PrepareHeader, 
PrepareOkHeader, ReplyHeader};
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use std::ops::AsyncFnOnce;
+
+// TODO: Rework all of those helpers, once the boundries are more clear and we 
have a better picture of the commonalities between all of the planes.
+
+/// Shared pipeline-first request flow used by metadata and partitions.
+pub async fn pipeline_prepare_common<C, F>(
+    consensus: &C,
+    prepare: C::ReplicateMessage,
+    on_replicate: F,
+) where
+    C: Consensus,
+    C::ReplicateMessage: Clone,
+    F: AsyncFnOnce(C::ReplicateMessage) -> (),
+{
+    assert!(!consensus.is_follower(), "on_request: primary only");
+    assert!(consensus.is_normal(), "on_request: status must be normal");
+    assert!(!consensus.is_syncing(), "on_request: must not be syncing");
+
+    consensus.verify_pipeline();
+    consensus.pipeline_message(prepare.clone());
+    on_replicate(prepare.clone()).await;
+    consensus.post_replicate_verify(&prepare);
+}
+
+/// Shared commit-based old-prepare fence.
+pub fn fence_old_prepare_by_commit<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+) -> bool
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    header.op <= consensus.commit()
+}
+
+/// Shared chain-replication forwarding to the next replica.
+pub async fn replicate_to_next_in_chain<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    message: Message<PrepareHeader>,
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    let header = message.header();
+
+    assert_eq!(header.command, Command2::Prepare);
+    assert!(header.op > consensus.commit());
+
+    let next = (consensus.replica() + 1) % consensus.replica_count();
+    let primary = consensus.primary_index(header.view);
+
+    if next == primary {
+        return;
+    }
+
+    assert_ne!(next, consensus.replica());
+
+    consensus
+        .message_bus()
+        .send_to_replica(next, message.into_generic())
+        .await
+        .unwrap();
+}
+
+/// Shared preflight checks for `on_replicate`.
+///
+/// Returns current op on success.
+pub fn replicate_preflight<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+) -> Result<u64, &'static str>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    assert_eq!(header.command, Command2::Prepare);
+
+    if consensus.is_syncing() {
+        return Err("sync");
+    }
+
+    let current_op = consensus.sequencer().current_sequence();
+
+    if consensus.status() != Status::Normal {
+        return Err("not normal state");
+    }
+
+    if header.view > consensus.view() {
+        return Err("newer view");
+    }
+
+    if consensus.is_follower() {
+        consensus.advance_commit_number(header.commit);
+    }
+
+    Ok(current_op)
+}
+
+/// Shared preflight checks for `on_ack`.
+pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), 
&'static str>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    if !consensus.is_primary() {
+        return Err("not primary");
+    }
+
+    if consensus.status() != Status::Normal {
+        return Err("not normal");
+    }
+
+    Ok(())
+}
+
+/// Shared quorum + extraction flow for ack handling.
+pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: 
&PrepareOkHeader) -> bool
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    if !consensus.handle_prepare_ok(ack) {
+        return false;
+    }
+
+    consensus.advance_commit_number(ack.op);
+    true
+}
+
+/// Shared reply-message construction for committed prepare.
+pub fn build_reply_message<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    prepare_header: &PrepareHeader,
+) -> Message<ReplyHeader>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_,
 new| {
+        *new = ReplyHeader {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: consensus.cluster(),
+            size: std::mem::size_of::<ReplyHeader>() as u32,
+            view: consensus.view(),
+            release: 0,
+            command: Command2::Reply,
+            replica: consensus.replica(),
+            reserved_frame: [0; 66],
+            request_checksum: prepare_header.request_checksum,
+            context: 0,
+            op: prepare_header.op,
+            commit: consensus.commit(),
+            timestamp: prepare_header.timestamp,
+            request: prepare_header.request,
+            operation: prepare_header.operation,
+            ..Default::default()
+        };
+    })
+}
+
+/// Verify hash chain would not break if we add this header.
+pub fn panic_if_hash_chain_would_break_in_same_view(
+    previous: &PrepareHeader,
+    current: &PrepareHeader,
+) {
+    // If both headers are in the same view, parent must chain correctly.
+    if previous.view == current.view {
+        assert_eq!(
+            current.parent, previous.checksum,
+            "hash chain broken in same view: op={} parent={} expected={}",
+            current.op, current.parent, previous.checksum
+        );
+    }
+}
+
+// TODO: Figure out how to make this check the journal if it contains the 
prepare.
+pub async fn send_prepare_ok<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+    is_persisted: Option<bool>,
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    assert_eq!(header.command, Command2::Prepare);
+
+    if consensus.status() != Status::Normal {
+        return;
+    }
+
+    if consensus.is_syncing() {
+        return;
+    }
+
+    if let Some(false) = is_persisted {
+        return;
+    }
+
+    assert!(
+        header.view <= consensus.view(),
+        "send_prepare_ok: prepare view {} > our view {}",
+        header.view,
+        consensus.view()
+    );
+
+    if header.op > consensus.sequencer().current_sequence() {
+        return;
+    }
+
+    let prepare_ok_header = PrepareOkHeader {
+        command: Command2::PrepareOk,
+        cluster: consensus.cluster(),
+        replica: consensus.replica(),
+        view: consensus.view(),
+        op: header.op,
+        commit: consensus.commit(),
+        timestamp: header.timestamp,
+        parent: header.parent,
+        prepare_checksum: header.checksum,
+        request: header.request,
+        operation: header.operation,
+        namespace: header.namespace,
+        size: std::mem::size_of::<PrepareOkHeader>() as u32,
+        ..Default::default()
+    };
+
+    let message: Message<PrepareOkHeader> =
+        Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
+            .transmute_header(|_, new| *new = prepare_ok_header);
+    let generic_message = message.into_generic();
+    let primary = consensus.primary_index(consensus.view());
+
+    if primary == consensus.replica() {
+        // TODO: Queue for self-processing or call handle_prepare_ok directly.
+        // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
+        consensus
+            .message_bus()
+            .send_to_replica(primary, generic_message)
+            .await
+            .unwrap();
+    } else {
+        consensus
+            .message_bus()
+            .send_to_replica(primary, generic_message)
+            .await
+            .unwrap();
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 77a174e39..9a81461af 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,9 +16,14 @@
 // under the License.
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
-use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer, 
Status, VsrConsensus};
+use consensus::{
+    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, 
VsrConsensus, ack_preflight,
+    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, 
replicate_preflight,
+    replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+};
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
+    header::{Command2, GenericHeader, PrepareHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
@@ -79,20 +84,6 @@ impl Snapshot for IggySnapshot {
     }
 }
 
-pub trait Metadata<C>
-where
-    C: Consensus,
-{
-    /// Handle a request message.
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-
-    /// Handle a replicate message (Prepare in VSR).
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-
-    /// Handle an ack message (PrepareOk in VSR).
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
-}
-
 #[derive(Debug)]
 pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
@@ -105,7 +96,7 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
-impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for 
IggyMetadata<VsrConsensus<B, P>, J, S, M>
+impl<B, P, J, S, M> Plane<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, 
P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
@@ -123,7 +114,7 @@ where
         // TODO: Bunch of asserts.
         debug!("handling metadata request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare).await;
+        pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B, P> as 
Consensus>::ReplicateMessage) {
@@ -132,57 +123,35 @@ where
 
         let header = message.header();
 
-        assert_eq!(header.command, Command2::Prepare);
+        let current_op = match replicate_preflight(consensus, header) {
+            Ok(current_op) => current_op,
+            Err(reason) => {
+                warn!(
+                    replica = consensus.replica(),
+                    "on_replicate: ignoring ({reason})"
+                );
+                return;
+            }
+        };
 
-        if !self.fence_old_prepare(&message) {
+        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, header)
+            || journal.handle().header(header.op as usize).is_some();
+        if !is_old_prepare {
             self.replicate(message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
 
-        // If syncing, ignore the replicate message.
-        if consensus.is_syncing() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (sync)"
-            );
-            return;
-        }
-
-        let current_op = consensus.sequencer().current_sequence();
-
-        // If status is not normal, ignore the replicate.
-        if consensus.status() != Status::Normal {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (not normal state)"
-            );
-            return;
-        }
-
-        //if message from future view, we ignore the replicate.
-        if header.view > consensus.view() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (newer view)"
-            );
-            return;
-        }
-
         // TODO add assertions for valid state here.
 
-        // If we are a follower, we advance the commit number.
-        if consensus.is_follower() {
-            consensus.advance_commit_number(message.header().commit);
-        }
-
         // TODO verify that the current prepare fits in the WAL.
 
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
         if let Some(previous) = journal.handle().previous_header(header) {
-            self.panic_if_hash_chain_would_break_in_same_view(&previous, 
header);
+            panic_if_hash_chain_would_break_in_same_view(&previous, header);
         }
 
         assert_eq!(header.op, current_op + 1);
@@ -205,13 +174,8 @@ where
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
-        if !consensus.is_primary() {
-            warn!("on_ack: ignoring (not primary)");
-            return;
-        }
-
-        if consensus.status() != Status::Normal {
-            warn!("on_ack: ignoring (not normal)");
+        if let Err(reason) = ack_preflight(consensus) {
+            warn!("on_ack: ignoring ({reason})");
             return;
         }
 
@@ -226,12 +190,10 @@ where
             }
         }
 
-        // Let consensus handle the ack increment and quorum check
-        if consensus.handle_prepare_ok(header) {
+        if ack_quorum_reached(consensus, header) {
             let journal = self.journal.as_ref().unwrap();
 
             debug!("on_ack: quorum received for op={}", header.op);
-            consensus.advance_commit_number(header.op);
 
             // Extract the header from the pipeline, fetch the full message 
from journal
             // TODO: Commit from the head. ALWAYS
@@ -261,32 +223,8 @@ where
             let _result = self.mux_stm.update(prepare);
             debug!("on_ack: state applied for op={}", prepare_header.op);
 
-            // TODO: Figure out better infra for this, its messy.
-            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
-                .transmute_header(|_, new| {
-                    *new = ReplyHeader {
-                        checksum: 0,
-                        checksum_body: 0,
-                        cluster: consensus.cluster(),
-                        size: std::mem::size_of::<ReplyHeader>() as u32,
-                        view: consensus.view(),
-                        release: 0,
-                        command: Command2::Reply,
-                        replica: consensus.replica(),
-                        reserved_frame: [0; 66],
-                        request_checksum: prepare_header.request_checksum,
-                        context: 0,
-                        op: prepare_header.op,
-                        commit: consensus.commit(),
-                        timestamp: prepare_header.timestamp,
-                        request: prepare_header.request,
-                        operation: prepare_header.operation,
-                        ..Default::default()
-                    };
-                });
-
             // Send reply to client
-            let generic_reply = reply.into_generic();
+            let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
@@ -314,30 +252,6 @@ where
         >,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        debug!("inserting prepare into metadata pipeline");
-        consensus.verify_pipeline();
-        // Pipeline-first ordering is safe only because message
-        // processing is cooperative (single-task, RefCell-based).
-        // If on_replicate ever early-returns (syncing, status change)
-        // the entry would be in the pipeline without journal backing.
-        consensus.pipeline_message(prepare.clone());
-        self.on_replicate(prepare.clone()).await;
-
-        consensus.post_replicate_verify(&prepare);
-    }
-
-    fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let consensus = self.consensus.as_ref().unwrap();
-        let journal = self.journal.as_ref().unwrap();
-
-        let header = prepare.header();
-        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
-        header.op <= consensus.commit() || journal.handle().header(header.op 
as usize).is_some()
-    }
-
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication pattern:
@@ -357,51 +271,7 @@ where
             journal.handle().header(idx).is_none(),
             "replicate: must not already have prepare"
         );
-        assert!(header.op > consensus.commit());
-
-        let next = (consensus.replica() + 1) % consensus.replica_count();
-
-        let primary = consensus.primary_index(header.view);
-        if next == primary {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "replicate: not replicating (ring complete)"
-            );
-            return;
-        }
-
-        assert_ne!(next, consensus.replica());
-
-        debug!(
-            replica = consensus.replica(),
-            to = next,
-            op = header.op,
-            "replicate: forwarding"
-        );
-
-        let message = message.into_generic();
-        consensus
-            .message_bus()
-            .send_to_replica(next, message)
-            .await
-            .unwrap();
-    }
-
-    /// Verify hash chain would not break if we add this header.
-    fn panic_if_hash_chain_would_break_in_same_view(
-        &self,
-        previous: &PrepareHeader,
-        current: &PrepareHeader,
-    ) {
-        // If both headers are in the same view, parent must chain correctly
-        if previous.view == current.view {
-            assert_eq!(
-                current.parent, previous.checksum,
-                "hash chain broken in same view: op={} parent={} expected={}",
-                current.op, current.parent, previous.checksum
-            );
-        }
+        replicate_to_next_in_chain(consensus, message).await;
     }
 
     // TODO: Implement jump_to_newer_op
@@ -413,114 +283,10 @@ where
         // Apply each entry to the state machine
     }
 
-    /// Send a prepare_ok message to the primary.
-    /// Called after successfully writing a prepare to the journal.
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
-
-        assert_eq!(header.command, Command2::Prepare);
-
-        if consensus.status() != Status::Normal {
-            debug!(
-                replica = consensus.replica(),
-                status = ?consensus.status(),
-                "send_prepare_ok: not sending (not normal)"
-            );
-            return;
-        }
-
-        if consensus.is_syncing() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: not sending (syncing)"
-            );
-            return;
-        }
-
-        // Verify we have the prepare and it's persisted (not dirty).
-        if journal.handle().header(header.op as usize).is_none() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "send_prepare_ok: not sending (not persisted or missing)"
-            );
-            return;
-        }
-
-        assert!(
-            header.view <= consensus.view(),
-            "send_prepare_ok: prepare view {} > our view {}",
-            header.view,
-            consensus.view()
-        );
-
-        if header.op > consensus.sequencer().current_sequence() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                our_op = consensus.sequencer().current_sequence(),
-                "send_prepare_ok: not sending (op ahead)"
-            );
-            return;
-        }
-
-        debug!(
-            replica = consensus.replica(),
-            op = header.op,
-            checksum = header.checksum,
-            "send_prepare_ok: sending"
-        );
-
-        // Use current view, not the prepare's view.
-        let prepare_ok_header = PrepareOkHeader {
-            command: Command2::PrepareOk,
-            cluster: consensus.cluster(),
-            replica: consensus.replica(),
-            view: consensus.view(),
-            op: header.op,
-            commit: consensus.commit(),
-            timestamp: header.timestamp,
-            parent: header.parent,
-            prepare_checksum: header.checksum,
-            request: header.request,
-            operation: header.operation,
-            namespace: header.namespace,
-            size: std::mem::size_of::<PrepareOkHeader>() as u32,
-            ..Default::default()
-        };
-
-        let message: Message<PrepareOkHeader> =
-            
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
-                .transmute_header(|_, new| *new = prepare_ok_header);
-        let generic_message = message.into_generic();
-        let primary = consensus.primary_index(consensus.view());
-
-        if primary == consensus.replica() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: loopback to self"
-            );
-            // TODO: Queue for self-processing or call handle_prepare_ok 
directly
-            // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        } else {
-            debug!(
-                replica = consensus.replica(),
-                to = primary,
-                op = header.op,
-                "send_prepare_ok: sending to primary"
-            );
-
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        }
+        let persisted = journal.handle().header(header.op as usize).is_some();
+        send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 08c137fff..7ae61f217 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -23,8 +23,8 @@ pub mod stm;
 
 mod stats;
 
-// Re-export IggyMetadata and Metadata trait for use in other modules
-pub use impls::metadata::{IggyMetadata, Metadata};
+// Re-export IggyMetadata for use in other modules
+pub use impls::metadata::IggyMetadata;
 
 // Re-export MuxStateMachine for use in other modules
 pub use stm::mux::MuxStateMachine;
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 13f55b0c5..47e71dea2 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -17,9 +17,14 @@
 
 use crate::journal::{Noop, PartitionJournal};
 use crate::log::SegmentedLog;
-use iggy_common::{ConsumerGroupOffsets, ConsumerOffsets, IggyTimestamp, 
PartitionStats};
+use crate::{AppendResult, Partition};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
IggyMessagesBatchMut,
+    IggyTimestamp, PartitionStats,
+};
+use journal::Journal as _;
 use std::sync::Arc;
-use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicU64, Ordering};
 use tokio::sync::Mutex as TokioMutex;
 
 // This struct aliases in terms of the code contained the `LocalPartition from 
`core/server/src/streaming/partitions/local_partition.rs`.
@@ -56,3 +61,70 @@ impl IggyPartition {
         }
     }
 }
+
+impl Partition for IggyPartition {
+    async fn append_messages(
+        &mut self,
+        mut batch: IggyMessagesBatchMut,
+    ) -> Result<AppendResult, IggyError> {
+        if batch.count() == 0 {
+            return Ok(AppendResult::new(0, 0, 0));
+        }
+
+        let dirty_offset = if self.should_increment_offset {
+            self.dirty_offset.load(Ordering::Relaxed) + 1
+        } else {
+            0
+        };
+
+        let segment = self.log.active_segment();
+        let segment_start_offset = segment.start_offset;
+        let current_position = segment.current_position;
+
+        batch
+            .prepare_for_persistence(segment_start_offset, dirty_offset, 
current_position, None)
+            .await;
+
+        let batch_messages_count = batch.count();
+        let batch_messages_size = batch.size();
+
+        let last_dirty_offset = if batch_messages_count == 0 {
+            dirty_offset
+        } else {
+            dirty_offset + batch_messages_count as u64 - 1
+        };
+
+        if self.should_increment_offset {
+            self.dirty_offset
+                .store(last_dirty_offset, Ordering::Relaxed);
+        } else {
+            self.should_increment_offset = true;
+            self.dirty_offset
+                .store(last_dirty_offset, Ordering::Relaxed);
+        }
+
+        let segment_index = self.log.segments().len() - 1;
+        self.log.segments_mut()[segment_index].current_position += 
batch_messages_size;
+
+        let journal = self.log.journal_mut();
+        journal.info.messages_count += batch_messages_count;
+        journal.info.size += IggyByteSize::from(batch_messages_size as u64);
+        journal.info.current_offset = last_dirty_offset;
+        if let Some(ts) = batch.first_timestamp()
+            && journal.info.first_timestamp == 0
+        {
+            journal.info.first_timestamp = ts;
+        }
+        if let Some(ts) = batch.last_timestamp() {
+            journal.info.end_timestamp = ts;
+        }
+
+        journal.inner.append(batch).await;
+
+        Ok(AppendResult::new(
+            dirty_offset,
+            last_dirty_offset,
+            batch_messages_count,
+        ))
+    }
+}
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index b3cdf414c..54891dfdd 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -18,17 +18,20 @@
 #![allow(dead_code)]
 
 use crate::IggyPartition;
-use crate::Partitions;
+use crate::Partition;
 use crate::types::PartitionsConfig;
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{
+    Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, 
ack_preflight,
+    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, 
pipeline_prepare_common,
+    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as 
send_prepare_ok_common,
+};
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{Command2, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, ReplyHeader},
+    header::{GenericHeader, Operation, PrepareHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
-use journal::Journal as _;
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::HashMap;
@@ -326,7 +329,7 @@ impl<C> IggyPartitions<C> {
     }
 }
 
-impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
@@ -335,7 +338,7 @@ where
 
         debug!("handling partition request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare).await;
+        pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
@@ -343,48 +346,24 @@ where
 
         let header = message.header();
 
-        assert_eq!(header.command, Command2::Prepare);
+        let current_op = match replicate_preflight(consensus, header) {
+            Ok(current_op) => current_op,
+            Err(reason) => {
+                warn!(
+                    replica = consensus.replica(),
+                    "on_replicate: ignoring ({reason})"
+                );
+                return;
+            }
+        };
 
-        if !self.fence_old_prepare(&message) {
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+        if !is_old_prepare {
             self.replicate(message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
 
-        // If syncing, ignore the replicate message.
-        if consensus.is_syncing() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (sync)"
-            );
-            return;
-        }
-
-        let current_op = consensus.sequencer().current_sequence();
-
-        // If status is not normal, ignore the replicate.
-        if consensus.status() != Status::Normal {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (not normal state)"
-            );
-            return;
-        }
-
-        // If message from future view, we ignore the replicate.
-        if header.view > consensus.view() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (newer view)"
-            );
-            return;
-        }
-
-        // If we are a follower, we advance the commit number.
-        if consensus.is_follower() {
-            consensus.advance_commit_number(message.header().commit);
-        }
-
         // TODO: Make those assertions be toggleable through an feature flag, 
so they can be used only by simulator/tests.
         debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
@@ -393,36 +372,7 @@ where
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
         let namespace = IggyNamespace::from_raw(header.namespace);
-        match header.operation {
-            Operation::SendMessages => {
-                let body = message.body_bytes();
-                let batch = Self::batch_from_body(&body);
-                self.append_batch(&namespace, batch).await;
-                debug!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    ?namespace,
-                    "on_replicate: batch appended to partition journal"
-                );
-            }
-            Operation::StoreConsumerOffset => {
-                // TODO: Deserialize consumer offset from prepare body
-                // and store in partition's consumer_offsets.
-                debug!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    "on_replicate: consumer offset stored"
-                );
-            }
-            _ => {
-                warn!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    "on_replicate: unexpected operation {:?}",
-                    header.operation
-                );
-            }
-        }
+        self.apply_replicated_operation(&message, &namespace).await;
 
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
@@ -437,17 +387,11 @@ where
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
-        if !consensus.is_primary() {
-            warn!("on_ack: ignoring (not primary)");
-            return;
-        }
-
-        if consensus.status() != Status::Normal {
-            warn!("on_ack: ignoring (not normal)");
+        if let Err(reason) = ack_preflight(consensus) {
+            warn!("on_ack: ignoring ({reason})");
             return;
         }
 
-        // Verify checksum by checking pipeline entry exists
         {
             let pipeline = consensus.pipeline().borrow();
             let Some(entry) =
@@ -463,21 +407,21 @@ where
             }
         }
 
-        // Let consensus handle the ack increment and quorum check
-        if consensus.handle_prepare_ok(header) {
+        if ack_quorum_reached(consensus, header) {
             debug!("on_ack: quorum received for op={}", header.op);
-            consensus.advance_commit_number(header.op);
 
             // Extract the prepare message from the pipeline by op
             // TODO: Commit from the head. ALWAYS
             let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(entry) = entry else {
+            let Some(PipelineEntry {
+                header: prepare_header,
+                ..
+            }) = entry
+            else {
                 warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
                 return;
             };
 
-            let prepare_header = entry.header;
-
             // Data was already appended to the partition journal during
             // on_replicate. Now that quorum is reached, update the partition's
             // current offset and check whether the journal needs flushing.
@@ -503,32 +447,8 @@ where
                 }
             }
 
-            // TODO: Figure out better infra for this, its messy.
-            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
-                .transmute_header(|_, new| {
-                    *new = ReplyHeader {
-                        checksum: 0,
-                        checksum_body: 0,
-                        cluster: consensus.cluster(),
-                        size: std::mem::size_of::<ReplyHeader>() as u32,
-                        view: consensus.view(),
-                        release: 0,
-                        command: Command2::Reply,
-                        replica: consensus.replica(),
-                        reserved_frame: [0; 66],
-                        request_checksum: prepare_header.request_checksum,
-                        context: 0,
-                        op: prepare_header.op,
-                        commit: consensus.commit(),
-                        timestamp: prepare_header.timestamp,
-                        request: prepare_header.request,
-                        operation: prepare_header.operation,
-                        ..Default::default()
-                    };
-                });
-
             // Send reply to client
-            let generic_reply = reply.into_generic();
+            let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
@@ -566,6 +486,51 @@ where
         IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
     }
 
+    async fn apply_replicated_operation(
+        &self,
+        message: &Message<PrepareHeader>,
+        namespace: &IggyNamespace,
+    ) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let header = message.header();
+
+        match header.operation {
+            Operation::SendMessages => {
+                let body = message.body_bytes();
+                self.append_send_messages_to_journal(namespace, body.as_ref())
+                    .await;
+                debug!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    ?namespace,
+                    "on_replicate: send_messages appended to partition journal"
+                );
+            }
+            Operation::StoreConsumerOffset => {
+                // TODO: Deserialize consumer offset from prepare body
+                // and store in partition's consumer_offsets.
+                debug!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    "on_replicate: consumer offset stored"
+                );
+            }
+            _ => {
+                warn!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    "on_replicate: unexpected operation {:?}",
+                    header.operation
+                );
+            }
+        }
+    }
+
+    async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, 
body: &[u8]) {
+        let batch = Self::batch_from_body(body);
+        self.append_messages_to_journal(namespace, batch).await;
+    }
+
     /// Append a batch to a partition's journal with offset assignment.
     ///
     /// Updates `segment.current_position` (logical position for indexing) but
@@ -574,91 +539,15 @@ where
     ///
     /// Uses `dirty_offset` for offset assignment so that multiple prepares
     /// can be pipelined before any commit.
-    async fn append_batch(&self, namespace: &IggyNamespace, mut batch: 
IggyMessagesBatchMut) {
+    async fn append_messages_to_journal(
+        &self,
+        namespace: &IggyNamespace,
+        batch: IggyMessagesBatchMut,
+    ) {
         let partition = self
             .get_mut_by_ns(namespace)
-            .expect("append_batch: partition not found for namespace");
-
-        if batch.count() == 0 {
-            return;
-        }
-
-        let dirty_offset = if partition.should_increment_offset {
-            partition.dirty_offset.load(Ordering::Relaxed) + 1
-        } else {
-            0
-        };
-
-        let segment = partition.log.active_segment();
-        let segment_start_offset = segment.start_offset;
-        let current_position = segment.current_position;
-
-        batch
-            .prepare_for_persistence(segment_start_offset, dirty_offset, 
current_position, None)
-            .await;
-
-        let batch_messages_count = batch.count();
-        let batch_messages_size = batch.size();
-
-        // Advance dirty offset (committed offset is advanced in on_ack).
-        let last_dirty_offset = if batch_messages_count == 0 {
-            dirty_offset
-        } else {
-            dirty_offset + batch_messages_count as u64 - 1
-        };
-
-        if partition.should_increment_offset {
-            partition
-                .dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
-        } else {
-            partition.should_increment_offset = true;
-            partition
-                .dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
-        }
-
-        // Update segment.current_position for next prepare_for_persistence 
call.
-        // This is the logical position (includes unflushed journal data).
-        // segment.size is only updated after actual persist (in 
persist_frozen_batches_to_disk).
-        let segment_index = partition.log.segments().len() - 1;
-        partition.log.segments_mut()[segment_index].current_position += 
batch_messages_size;
-
-        // Update journal tracking metadata.
-        let journal = partition.log.journal_mut();
-        journal.info.messages_count += batch_messages_count;
-        journal.info.size += IggyByteSize::from(batch_messages_size as u64);
-        journal.info.current_offset = last_dirty_offset;
-        if let Some(ts) = batch.first_timestamp()
-            && journal.info.first_timestamp == 0
-        {
-            journal.info.first_timestamp = ts;
-        }
-        if let Some(ts) = batch.last_timestamp() {
-            journal.info.end_timestamp = ts;
-        }
-
-        journal.inner.append(batch).await;
-    }
-
-    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        debug!("inserting prepare into partition pipeline");
-        consensus.verify_pipeline();
-        consensus.pipeline_message(prepare.clone());
-
-        self.on_replicate(prepare.clone()).await;
-        consensus.post_replicate_verify(&prepare);
-    }
-
-    fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        let header = prepare.header();
-        // TODO: Check per-partition journal once namespace extraction is 
possible.
-        // For now, only check if the op is already committed.
-        header.op <= consensus.commit()
+            .expect("append_messages_to_journal: partition not found for 
namespace");
+        let _ = partition.append_messages(batch).await;
     }
 
     /// Replicate a prepare message to the next replica in the chain.
@@ -669,55 +558,7 @@ where
     /// - Stops when we would forward back to primary
     async fn replicate(&self, message: Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
-
-        let header = message.header();
-
-        assert_eq!(header.command, Command2::Prepare);
-        assert!(header.op > consensus.commit());
-
-        let next = (consensus.replica() + 1) % consensus.replica_count();
-
-        let primary = consensus.primary_index(header.view);
-        if next == primary {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "replicate: not replicating (ring complete)"
-            );
-            return;
-        }
-
-        assert_ne!(next, consensus.replica());
-
-        debug!(
-            replica = consensus.replica(),
-            to = next,
-            op = header.op,
-            "replicate: forwarding"
-        );
-
-        let message = message.into_generic();
-        consensus
-            .message_bus()
-            .send_to_replica(next, message)
-            .await
-            .unwrap();
-    }
-
-    /// Verify hash chain would not break if we add this header.
-    fn panic_if_hash_chain_would_break_in_same_view(
-        &self,
-        previous: &PrepareHeader,
-        current: &PrepareHeader,
-    ) {
-        // If both headers are in the same view, parent must chain correctly
-        if previous.view == current.view {
-            assert_eq!(
-                current.parent, previous.checksum,
-                "hash chain broken in same view: op={} parent={} expected={}",
-                current.op, current.parent, previous.checksum
-            );
-        }
+        replicate_to_next_in_chain(consensus, message).await;
     }
 
     fn commit_journal(&self) {
@@ -945,107 +786,11 @@ where
         debug!(?namespace, start_offset, "rotated to new segment");
     }
 
-    /// Send a prepare_ok message to the primary.
-    /// Called after successfully writing a prepare to the journal.
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
-
-        assert_eq!(header.command, Command2::Prepare);
-
-        if consensus.status() != Status::Normal {
-            debug!(
-                replica = consensus.replica(),
-                status = ?consensus.status(),
-                "send_prepare_ok: not sending (not normal)"
-            );
-            return;
-        }
-
-        if consensus.is_syncing() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: not sending (syncing)"
-            );
-            return;
-        }
-
         // TODO: Verify the prepare is persisted in the partition journal.
         // The partition journal uses MessageLookup headers, so we cannot
         // check by PrepareHeader.op directly. For now, skip this check.
-
-        assert!(
-            header.view <= consensus.view(),
-            "send_prepare_ok: prepare view {} > our view {}",
-            header.view,
-            consensus.view()
-        );
-
-        if header.op > consensus.sequencer().current_sequence() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                our_op = consensus.sequencer().current_sequence(),
-                "send_prepare_ok: not sending (op ahead)"
-            );
-            return;
-        }
-
-        debug!(
-            replica = consensus.replica(),
-            op = header.op,
-            checksum = header.checksum,
-            "send_prepare_ok: sending"
-        );
-
-        // Use current view, not the prepare's view.
-        let prepare_ok_header = PrepareOkHeader {
-            command: Command2::PrepareOk,
-            cluster: consensus.cluster(),
-            replica: consensus.replica(),
-            view: consensus.view(),
-            op: header.op,
-            commit: consensus.commit(),
-            timestamp: header.timestamp,
-            parent: header.parent,
-            prepare_checksum: header.checksum,
-            request: header.request,
-            operation: header.operation,
-            namespace: header.namespace,
-            size: std::mem::size_of::<PrepareOkHeader>() as u32,
-            ..Default::default()
-        };
-
-        let message: Message<PrepareOkHeader> =
-            
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
-                .transmute_header(|_, new| *new = prepare_ok_header);
-        let generic_message = message.into_generic();
-        let primary = consensus.primary_index(consensus.view());
-
-        if primary == consensus.replica() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: loopback to self"
-            );
-            // TODO: Queue for self-processing or call handle_prepare_ok 
directly
-            // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        } else {
-            debug!(
-                replica = consensus.replica(),
-                to = primary,
-                op = header.op,
-                "send_prepare_ok: sending to primary"
-            );
-
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        }
+        send_prepare_ok_common(consensus, header, None).await;
     }
 }
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 01ad8da1d..d92b078c4 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -21,7 +21,7 @@ mod journal;
 mod log;
 mod types;
 
-use consensus::Consensus;
+use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
 pub use types::{
@@ -29,20 +29,40 @@ pub use types::{
     SendMessagesResult,
 };
 
-// TODO: Figure out how this can be somehow merged with `Metadata` trait, in a 
sense, where the `Metadata` trait would be gone
-// and something more general purpose is put in the place.
-
-/// Consensus lifecycle for partition operations (mirrors `Metadata<C>`).
+/// Partition-level data plane operations.
 ///
-/// Handles the VSR replication flow for partition writes:
-/// - `on_request`: Primary receives a client write, projects to Prepare, 
pipelines it
-/// - `on_replicate`: Replica receives Prepare, appends to journal, sends 
PrepareOk
-/// - `on_ack`: Primary receives PrepareOk, checks quorum, commits
-pub trait Partitions<C>
-where
-    C: Consensus,
-{
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+/// `send_messages` MUST only append to the partition journal (prepare phase),
+/// without committing/persisting to disk.
+pub trait Partition {
+    fn append_messages(
+        &mut self,
+        batch: IggyMessagesBatchMut,
+    ) -> impl Future<Output = Result<AppendResult, IggyError>>;
+
+    fn poll_messages(
+        &self,
+        consumer: PollingConsumer,
+        args: PollingArgs,
+    ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> {
+        let _ = (consumer, args);
+        async { Err(IggyError::FeatureUnavailable) }
+    }
+
+    fn store_consumer_offset(
+        &self,
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let _ = (consumer, offset);
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
+        let _ = consumer;
+        None
+    }
+
+    fn offsets(&self) -> PartitionOffsets {
+        PartitionOffsets::default()
+    }
 }
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index c808481a2..44e063f78 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,11 +21,10 @@ pub mod deps;
 pub mod replica;
 
 use bus::MemBus;
+use consensus::Plane;
 use iggy_common::header::{GenericHeader, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
 use message_bus::MessageBus;
-use metadata::Metadata;
-use partitions::Partitions;
 use replica::Replica;
 use std::sync::Arc;
 

Reply via email to