This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch plane_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit a7274f1c7b53304fade06d5d8f3a63c91f15de80
Author: numinex <[email protected]>
AuthorDate: Mon Feb 16 17:26:04 2026 +0100

    m
---
 core/consensus/src/lib.rs              |  15 ++++
 core/metadata/src/impls/metadata.rs    |  20 +----
 core/metadata/src/lib.rs               |   4 +-
 core/partitions/src/iggy_partition.rs  |  76 +++++++++++++++-
 core/partitions/src/iggy_partitions.rs | 153 ++++++++++++---------------------
 core/partitions/src/lib.rs             |  52 +++++++----
 core/simulator/src/lib.rs              |   3 +-
 7 files changed, 188 insertions(+), 135 deletions(-)

diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 5c3e152e6..9a7fd5bae 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -66,6 +66,21 @@ pub trait Consensus: Sized {
     fn is_syncing(&self) -> bool;
 }
 
+/// Shared consensus lifecycle interface for control/data planes.
+///
+/// This abstracts the VSR message flow:
+/// - request -> prepare
+/// - replicate (prepare)
+/// - ack (prepare_ok)
+pub trait Plane<C>
+where
+    C: Consensus,
+{
+    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
+    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
+    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+}
+
 mod impls;
 pub use impls::*;
 
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 77a174e39..c4a55fc54 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,7 +16,9 @@
 // under the License.
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
-use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer, 
Status, VsrConsensus};
+use consensus::{
+    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, Status, 
VsrConsensus,
+};
 use iggy_common::{
     header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
     message::Message,
@@ -79,20 +81,6 @@ impl Snapshot for IggySnapshot {
     }
 }
 
-pub trait Metadata<C>
-where
-    C: Consensus,
-{
-    /// Handle a request message.
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-
-    /// Handle a replicate message (Prepare in VSR).
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-
-    /// Handle an ack message (PrepareOk in VSR).
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
-}
-
 #[derive(Debug)]
 pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
@@ -105,7 +93,7 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
-impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for 
IggyMetadata<VsrConsensus<B, P>, J, S, M>
+impl<B, P, J, S, M> Plane<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, 
P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 08c137fff..7ae61f217 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -23,8 +23,8 @@ pub mod stm;
 
 mod stats;
 
-// Re-export IggyMetadata and Metadata trait for use in other modules
-pub use impls::metadata::{IggyMetadata, Metadata};
+// Re-export IggyMetadata for use in other modules
+pub use impls::metadata::IggyMetadata;
 
 // Re-export MuxStateMachine for use in other modules
 pub use stm::mux::MuxStateMachine;
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 13f55b0c5..47e71dea2 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -17,9 +17,14 @@
 
 use crate::journal::{Noop, PartitionJournal};
 use crate::log::SegmentedLog;
-use iggy_common::{ConsumerGroupOffsets, ConsumerOffsets, IggyTimestamp, 
PartitionStats};
+use crate::{AppendResult, Partition};
+use iggy_common::{
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
IggyMessagesBatchMut,
+    IggyTimestamp, PartitionStats,
+};
+use journal::Journal as _;
 use std::sync::Arc;
-use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicU64, Ordering};
 use tokio::sync::Mutex as TokioMutex;
 
 // This struct aliases in terms of the code contained the `LocalPartition from 
`core/server/src/streaming/partitions/local_partition.rs`.
@@ -56,3 +61,70 @@ impl IggyPartition {
         }
     }
 }
+
+impl Partition for IggyPartition {
+    async fn append_messages(
+        &mut self,
+        mut batch: IggyMessagesBatchMut,
+    ) -> Result<AppendResult, IggyError> {
+        if batch.count() == 0 {
+            return Ok(AppendResult::new(0, 0, 0));
+        }
+
+        let dirty_offset = if self.should_increment_offset {
+            self.dirty_offset.load(Ordering::Relaxed) + 1
+        } else {
+            0
+        };
+
+        let segment = self.log.active_segment();
+        let segment_start_offset = segment.start_offset;
+        let current_position = segment.current_position;
+
+        batch
+            .prepare_for_persistence(segment_start_offset, dirty_offset, 
current_position, None)
+            .await;
+
+        let batch_messages_count = batch.count();
+        let batch_messages_size = batch.size();
+
+        let last_dirty_offset = if batch_messages_count == 0 {
+            dirty_offset
+        } else {
+            dirty_offset + batch_messages_count as u64 - 1
+        };
+
+        if self.should_increment_offset {
+            self.dirty_offset
+                .store(last_dirty_offset, Ordering::Relaxed);
+        } else {
+            self.should_increment_offset = true;
+            self.dirty_offset
+                .store(last_dirty_offset, Ordering::Relaxed);
+        }
+
+        let segment_index = self.log.segments().len() - 1;
+        self.log.segments_mut()[segment_index].current_position += 
batch_messages_size;
+
+        let journal = self.log.journal_mut();
+        journal.info.messages_count += batch_messages_count;
+        journal.info.size += IggyByteSize::from(batch_messages_size as u64);
+        journal.info.current_offset = last_dirty_offset;
+        if let Some(ts) = batch.first_timestamp()
+            && journal.info.first_timestamp == 0
+        {
+            journal.info.first_timestamp = ts;
+        }
+        if let Some(ts) = batch.last_timestamp() {
+            journal.info.end_timestamp = ts;
+        }
+
+        journal.inner.append(batch).await;
+
+        Ok(AppendResult::new(
+            dirty_offset,
+            last_dirty_offset,
+            batch_messages_count,
+        ))
+    }
+}
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index b3cdf414c..305f2dfb2 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -18,9 +18,9 @@
 #![allow(dead_code)]
 
 use crate::IggyPartition;
-use crate::Partitions;
+use crate::Partition;
 use crate::types::PartitionsConfig;
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{Consensus, Plane, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
@@ -28,7 +28,6 @@ use iggy_common::{
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
-use journal::Journal as _;
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::HashMap;
@@ -326,7 +325,7 @@ impl<C> IggyPartitions<C> {
     }
 }
 
-impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
@@ -393,36 +392,7 @@ where
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
         let namespace = IggyNamespace::from_raw(header.namespace);
-        match header.operation {
-            Operation::SendMessages => {
-                let body = message.body_bytes();
-                let batch = Self::batch_from_body(&body);
-                self.append_batch(&namespace, batch).await;
-                debug!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    ?namespace,
-                    "on_replicate: batch appended to partition journal"
-                );
-            }
-            Operation::StoreConsumerOffset => {
-                // TODO: Deserialize consumer offset from prepare body
-                // and store in partition's consumer_offsets.
-                debug!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    "on_replicate: consumer offset stored"
-                );
-            }
-            _ => {
-                warn!(
-                    replica = consensus.replica(),
-                    op = header.op,
-                    "on_replicate: unexpected operation {:?}",
-                    header.operation
-                );
-            }
-        }
+        self.apply_replicated_operation(&message, &namespace).await;
 
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
@@ -566,6 +536,51 @@ where
         IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
     }
 
+    async fn apply_replicated_operation(
+        &self,
+        message: &Message<PrepareHeader>,
+        namespace: &IggyNamespace,
+    ) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let header = message.header();
+
+        match header.operation {
+            Operation::SendMessages => {
+                let body = message.body_bytes();
+                self.append_send_messages_to_journal(namespace, body.as_ref())
+                    .await;
+                debug!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    ?namespace,
+                    "on_replicate: send_messages appended to partition journal"
+                );
+            }
+            Operation::StoreConsumerOffset => {
+                // TODO: Deserialize consumer offset from prepare body
+                // and store in partition's consumer_offsets.
+                debug!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    "on_replicate: consumer offset stored"
+                );
+            }
+            _ => {
+                warn!(
+                    replica = consensus.replica(),
+                    op = header.op,
+                    "on_replicate: unexpected operation {:?}",
+                    header.operation
+                );
+            }
+        }
+    }
+
+    async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, 
body: &[u8]) {
+        let batch = Self::batch_from_body(body);
+        self.append_messages_to_journal(namespace, batch).await;
+    }
+
     /// Append a batch to a partition's journal with offset assignment.
     ///
     /// Updates `segment.current_position` (logical position for indexing) but
@@ -574,71 +589,15 @@ where
     ///
     /// Uses `dirty_offset` for offset assignment so that multiple prepares
     /// can be pipelined before any commit.
-    async fn append_batch(&self, namespace: &IggyNamespace, mut batch: 
IggyMessagesBatchMut) {
+    async fn append_messages_to_journal(
+        &self,
+        namespace: &IggyNamespace,
+        batch: IggyMessagesBatchMut,
+    ) {
         let partition = self
             .get_mut_by_ns(namespace)
-            .expect("append_batch: partition not found for namespace");
-
-        if batch.count() == 0 {
-            return;
-        }
-
-        let dirty_offset = if partition.should_increment_offset {
-            partition.dirty_offset.load(Ordering::Relaxed) + 1
-        } else {
-            0
-        };
-
-        let segment = partition.log.active_segment();
-        let segment_start_offset = segment.start_offset;
-        let current_position = segment.current_position;
-
-        batch
-            .prepare_for_persistence(segment_start_offset, dirty_offset, 
current_position, None)
-            .await;
-
-        let batch_messages_count = batch.count();
-        let batch_messages_size = batch.size();
-
-        // Advance dirty offset (committed offset is advanced in on_ack).
-        let last_dirty_offset = if batch_messages_count == 0 {
-            dirty_offset
-        } else {
-            dirty_offset + batch_messages_count as u64 - 1
-        };
-
-        if partition.should_increment_offset {
-            partition
-                .dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
-        } else {
-            partition.should_increment_offset = true;
-            partition
-                .dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
-        }
-
-        // Update segment.current_position for next prepare_for_persistence 
call.
-        // This is the logical position (includes unflushed journal data).
-        // segment.size is only updated after actual persist (in 
persist_frozen_batches_to_disk).
-        let segment_index = partition.log.segments().len() - 1;
-        partition.log.segments_mut()[segment_index].current_position += 
batch_messages_size;
-
-        // Update journal tracking metadata.
-        let journal = partition.log.journal_mut();
-        journal.info.messages_count += batch_messages_count;
-        journal.info.size += IggyByteSize::from(batch_messages_size as u64);
-        journal.info.current_offset = last_dirty_offset;
-        if let Some(ts) = batch.first_timestamp()
-            && journal.info.first_timestamp == 0
-        {
-            journal.info.first_timestamp = ts;
-        }
-        if let Some(ts) = batch.last_timestamp() {
-            journal.info.end_timestamp = ts;
-        }
-
-        journal.inner.append(batch).await;
+            .expect("append_messages_to_journal: partition not found for 
namespace");
+        let _ = partition.append_messages(batch).await;
     }
 
     async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 01ad8da1d..d92b078c4 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -21,7 +21,7 @@ mod journal;
 mod log;
 mod types;
 
-use consensus::Consensus;
+use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
 pub use types::{
@@ -29,20 +29,40 @@ pub use types::{
     SendMessagesResult,
 };
 
-// TODO: Figure out how this can be somehow merged with `Metadata` trait, in a 
sense, where the `Metadata` trait would be gone
-// and something more general purpose is put in the place.
-
-/// Consensus lifecycle for partition operations (mirrors `Metadata<C>`).
+/// Partition-level data plane operations.
 ///
-/// Handles the VSR replication flow for partition writes:
-/// - `on_request`: Primary receives a client write, projects to Prepare, 
pipelines it
-/// - `on_replicate`: Replica receives Prepare, appends to journal, sends 
PrepareOk
-/// - `on_ack`: Primary receives PrepareOk, checks quorum, commits
-pub trait Partitions<C>
-where
-    C: Consensus,
-{
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+/// `append_messages` MUST only append to the partition journal (prepare phase),
+/// without committing/persisting to disk.
+pub trait Partition {
+    fn append_messages(
+        &mut self,
+        batch: IggyMessagesBatchMut,
+    ) -> impl Future<Output = Result<AppendResult, IggyError>>;
+
+    fn poll_messages(
+        &self,
+        consumer: PollingConsumer,
+        args: PollingArgs,
+    ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> {
+        let _ = (consumer, args);
+        async { Err(IggyError::FeatureUnavailable) }
+    }
+
+    fn store_consumer_offset(
+        &self,
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        let _ = (consumer, offset);
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
+        let _ = consumer;
+        None
+    }
+
+    fn offsets(&self) -> PartitionOffsets {
+        PartitionOffsets::default()
+    }
 }
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index c808481a2..44e063f78 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,11 +21,10 @@ pub mod deps;
 pub mod replica;
 
 use bus::MemBus;
+use consensus::Plane;
 use iggy_common::header::{GenericHeader, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
 use message_bus::MessageBus;
-use metadata::Metadata;
-use partitions::Partitions;
 use replica::Replica;
 use std::sync::Arc;
 

Reply via email to