This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7db7eeae1 feat(cluster): add `Partitions` trait that unifies both 
single-node and VSR operations (#2620)
7db7eeae1 is described below

commit 7db7eeae143e594b59abd5e923aa0e335c4ca4e4
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Feb 4 19:44:20 2026 +0530

    feat(cluster): add `Partitions` trait that unifies both single-node and VSR 
operations (#2620)
    
    Add `Partitions` trait that unifies both single-node and VSR operations.
    Added a `PartitionStorage` trait which can be used to mock the storage
    for testing.
    
    ---------
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
---
 core/partitions/src/lib.rs   | 127 +++++++++++++++++++++++++++++++++++++++++--
 core/partitions/src/types.rs | 122 +++++++++++++++++++++++++++++++++++------
 2 files changed, 227 insertions(+), 22 deletions(-)

diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index b679e05a9..1e157cd03 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -19,15 +19,130 @@ mod iggy_partition;
 mod iggy_partitions;
 mod types;
 
+use iggy_common::sharding::IggyNamespace;
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
-pub use types::{PollMetadata, PollingArgs, PollingConsumer, 
SendMessagesResult};
+pub use types::{AppendResult, PartitionOffsets, PollingArgs, PollingConsumer, 
SendMessagesResult};
 
-/// The core abstraction for partition operations in clustering.
+/// High-level partition operations for request handlers.
 ///
-/// This trait defines the data-plane operations for partitions that
-/// need to be coordinated across a cluster using viewstamped replication.
-/// Implementations can vary between single-node and clustered deployments.
+/// This trait abstracts over both single-node and clustered modes.
+///
+/// # Implementation
+///
+/// ## Single-node
+///
+/// ```text
+/// send_messages() ──► storage.append_prepared()
+///                           │
+///                           ▼
+///                     storage.advance_commit()
+///                           │
+///                           ▼
+///                     Return success
+/// ```
+///
+/// ## Clustered
+///
+/// ```text
+/// send_messages() ──► Create Prepare(messages)
+///                           │
+///                           ▼
+///                     Broadcast to replicas
+///                           │
+///                           ▼
+///                     Replicas: storage.append_prepared()
+///                           │
+///                           ▼
+///                     Wait for quorum PrepareOk
+///                           │
+///                           ▼
+///                     storage.advance_commit()
+///                           │
+///                           ▼
+///                     Return success
+/// ```
 pub trait Partitions {
-    // TODO(hubcio): define partition operations like poll, send, create, delete, etc.
+    /// The message batch type for write operations.
+    type MessageBatch;
+
+    /// The result type returned from poll operations.
+    type PollResult;
+    /// The error type returned by every operation of this trait.
+    type Error;
+
+    /// Send messages to a partition.
+    ///
+    /// Messages are appended atomically as a batch with sequentially assigned
+    /// offsets. Returns only after:
+    /// - Single-node: a durable local write (fsync)
+    /// - Clustered: VSR quorum acknowledgment
+    fn send_messages(
+        &self,
+        namespace: IggyNamespace,
+        batch: Self::MessageBatch,
+    ) -> impl Future<Output = Result<SendMessagesResult, Self::Error>>;
+
+    /// Store consumer offset for progress tracking.
+    ///
+    /// Persists the consumer's position, enabling resumption after restarts.
+    fn store_consumer_offset(
+        &self,
+        consumer: PollingConsumer,
+        namespace: IggyNamespace,
+        offset: u64,
+    ) -> impl Future<Output = Result<(), Self::Error>>;
+
+    /// Poll messages from a partition.
+    ///
+    /// Returns only **committed** messages according to the polling strategy.
+    /// Messages that are prepared but not yet committed are not visible.
+    fn poll_messages(
+        &self,
+        consumer: PollingConsumer,
+        namespace: IggyNamespace,
+        args: PollingArgs,
+    ) -> impl Future<Output = Result<Self::PollResult, Self::Error>>;
+
+    /// Get stored consumer offset.
+    ///
+    /// Returns the last stored offset, or 0 if none exists (indistinguishable from a stored 0).
+    fn get_consumer_offset(
+        &self,
+        consumer: PollingConsumer,
+        namespace: IggyNamespace,
+    ) -> impl Future<Output = Result<u64, Self::Error>>;
+
+    /// Get partition's current commit offset.
+    ///
+    /// Returns the highest committed offset (visible to consumers).
+    /// In clustered mode, may lag slightly behind the primary.
+    fn get_offset(
+        &self,
+        namespace: IggyNamespace,
+    ) -> impl Future<Output = Result<u64, Self::Error>>;
+
+    /// Initialize storage for a new partition.
+    ///
+    /// Sets up the segments directory, the initial segment, and offset tracking.
+    fn create_partition(
+        &self,
+        namespace: IggyNamespace,
+    ) -> impl Future<Output = Result<(), Self::Error>>;
+
+    /// Delete partition and all its data.
+    ///
+    /// Removes messages, segments, indexes, and consumer offsets.
+    fn delete_partition(
+        &self,
+        namespace: IggyNamespace,
+    ) -> impl Future<Output = Result<(), Self::Error>>;
+
+    /// Purge all messages, preserving partition structure.
+    ///
+    /// Resets the partition offset to 0; previously stored consumer offsets become invalid.
+    fn purge_partition(
+        &self,
+        namespace: IggyNamespace,
+    ) -> impl Future<Output = Result<(), Self::Error>>;
 }
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index 7ff1a9498..db33f980a 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -35,22 +35,6 @@ impl PollingArgs {
     }
 }
 
-/// Metadata returned from a poll operation.
-#[derive(Debug, Clone)]
-pub struct PollMetadata {
-    pub partition_id: u32,
-    pub current_offset: u64,
-}
-
-impl PollMetadata {
-    pub fn new(partition_id: u32, current_offset: u64) -> Self {
-        Self {
-            partition_id,
-            current_offset,
-        }
-    }
-}
-
 /// Result of sending messages.
 #[derive(Debug)]
 pub struct SendMessagesResult {
@@ -66,3 +50,109 @@ pub enum PollingConsumer {
     /// Consumer group with (group_id, member_id)
     ConsumerGroup(usize, usize),
 }
+
+/// Offset range assigned to messages appended during the prepare phase.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AppendResult {
+    /// First offset assigned to the batch.
+    pub start_offset: u64,
+    /// Last offset assigned to the batch (inclusive).
+    pub end_offset: u64,
+    /// Number of messages in the batch.
+    pub messages_count: u32,
+}
+
+impl AppendResult {
+    /// Creates a result for the inclusive offset range `start_offset..=end_offset`.
+    pub fn new(start_offset: u64, end_offset: u64, messages_count: u32) -> Self {
+        Self {
+            start_offset,
+            end_offset,
+            messages_count,
+        }
+    }
+
+    /// Returns the number of offsets in the inclusive range, or 0 when
+    /// `end_offset < start_offset` (rather than underflowing).
+    #[inline]
+    pub fn offset_count(&self) -> u64 {
+        (self.end_offset + 1).saturating_sub(self.start_offset)
+    }
+}
+
+/// Current offset state of a partition.
+///
+/// Tracks both the commit offset (visibility boundary) and write offset
+/// (highest written message). These may differ when there are prepared
+/// but uncommitted messages.
+///
+/// ```text
+/// Segment: [msg0][msg1][msg2][msg3][msg4][msg5][msg6][msg7]
+///                                     ▲              ▲
+///                               commit_offset   write_offset
+///                                   (4)             (7)
+///
+/// - Messages 0-4: COMMITTED (visible to consumers)
+/// - Messages 5-7: PREPARED but not committed (invisible)
+/// ```
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct PartitionOffsets {
+    /// Highest offset visible to consumers.
+    ///
+    /// All messages with `offset <= commit_offset` can be read via
+    /// `read_committed()` or `poll_messages()`.
+    pub commit_offset: u64,
+
+    /// Highest offset written to storage.
+    ///
+    /// This may be greater than `commit_offset` when there are prepared
+    /// but uncommitted messages (during the window between prepare and
+    /// commit in VSR).
+    ///
+    /// Invariant: `write_offset >= commit_offset`
+    pub write_offset: u64,
+}
+
+impl PartitionOffsets {
+    /// Creates offsets from a commit offset and a write offset.
+    ///
+    /// Panics in debug builds if `write_offset < commit_offset`.
+    pub fn new(commit_offset: u64, write_offset: u64) -> Self {
+        debug_assert!(
+            write_offset >= commit_offset,
+            "write_offset ({write_offset}) must be >= commit_offset ({commit_offset})"
+        );
+        Self {
+            commit_offset,
+            write_offset,
+        }
+    }
+
+    /// Create offsets for an empty partition.
+    ///
+    /// Both offsets are 0, which is also the state of a partition whose only
+    /// message (offset 0) is committed; the two are indistinguishable here.
+    pub fn empty() -> Self {
+        Self {
+            commit_offset: 0,
+            write_offset: 0,
+        }
+    }
+
+    /// Returns true if there are uncommitted (prepared) messages.
+    pub fn has_uncommitted(&self) -> bool {
+        self.write_offset > self.commit_offset
+    }
+
+    /// Returns the number of uncommitted messages.
+    ///
+    /// Returns 0 instead of underflowing if `write_offset < commit_offset`.
+    pub fn uncommitted_count(&self) -> u64 {
+        self.write_offset.saturating_sub(self.commit_offset)
+    }
+
+    /// Returns true if commit and write offsets are equal.
+    pub fn is_fully_committed(&self) -> bool {
+        self.write_offset == self.commit_offset
+    }
+}

Reply via email to