This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 7db7eeae1 feat(cluster): add `Partitions` trait that unifies both
single-node and VSR operations (#2620)
7db7eeae1 is described below
commit 7db7eeae143e594b59abd5e923aa0e335c4ca4e4
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Feb 4 19:44:20 2026 +0530
feat(cluster): add `Partitions` trait that unifies both single-node and VSR
operations (#2620)
Add `Partitions` trait that unifies both single-node and VSR operations.
Added a `PartitionStorage` trait which can be used to mock the storage
for testing.
---------
Co-authored-by: Grzegorz Koszyk <[email protected]>
---
core/partitions/src/lib.rs | 127 +++++++++++++++++++++++++++++++++++++++++--
core/partitions/src/types.rs | 122 +++++++++++++++++++++++++++++++++++------
2 files changed, 227 insertions(+), 22 deletions(-)
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index b679e05a9..1e157cd03 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -19,15 +19,130 @@ mod iggy_partition;
mod iggy_partitions;
mod types;
+use iggy_common::sharding::IggyNamespace;
pub use iggy_partition::IggyPartition;
pub use iggy_partitions::IggyPartitions;
-pub use types::{PollMetadata, PollingArgs, PollingConsumer, SendMessagesResult};
+pub use types::{AppendResult, PartitionOffsets, PollingArgs, PollingConsumer, SendMessagesResult};
-/// The core abstraction for partition operations in clustering.
+/// High-level partition operations for request handlers.
///
-/// This trait defines the data-plane operations for partitions that
-/// need to be coordinated across a cluster using viewstamped replication.
-/// Implementations can vary between single-node and clustered deployments.
+/// This trait abstracts over both single-node and clustered modes.
+///
+/// # Implementation:
+///
+/// ## Single-node:
+///
+/// ```text
+/// send_messages() ──► storage.append_prepared()
+///                                 │
+///                                 ▼
+///                     storage.advance_commit()
+///                                 │
+///                                 ▼
+///                          Return success
+/// ```
+///
+/// ## Clustered
+///
+/// ```text
+/// send_messages() ──► Create Prepare(messages)
+///                                 │
+///                                 ▼
+///                       Broadcast to replicas
+///                                 │
+///                                 ▼
+///                Replicas: storage.append_prepared()
+///                                 │
+///                                 ▼
+///                     Wait for quorum PrepareOk
+///                                 │
+///                                 ▼
+///                     storage.advance_commit()
+///                                 │
+///                                 ▼
+///                          Return success
+/// ```
pub trait Partitions {
- // TODO(hubcio): define partition operations like poll, send, create, delete, etc.
+ /// The message batch type for write operations.
+ type MessageBatch;
+
+ /// The result type returned from poll operations.
+ type PollResult;
+
+ type Error;
+
+ /// Send messages to a partition.
+ ///
+ /// Messages are appended atomically as a batch with sequentially assigned
+ /// offsets. Returns only:
+ /// - Single-node: after durable local write (fsync)
+ /// - Clustered: after VSR quorum acknowledgment
+ fn send_messages(
+ &self,
+ namespace: IggyNamespace,
+ batch: Self::MessageBatch,
+ ) -> impl Future<Output = Result<SendMessagesResult, Self::Error>>;
+
+ /// Store consumer offset for progress tracking.
+ ///
+ /// Persists the consumer's position, enabling resumption after restarts.
+ fn store_consumer_offset(
+ &self,
+ consumer: PollingConsumer,
+ namespace: IggyNamespace,
+ offset: u64,
+ ) -> impl Future<Output = Result<(), Self::Error>>;
+
+ /// Poll messages from a partition.
+ ///
+ /// Returns only **committed** messages according to the polling strategy.
+ /// Messages that are prepared but not yet committed are not visible.
+ fn poll_messages(
+ &self,
+ consumer: PollingConsumer,
+ namespace: IggyNamespace,
+ args: PollingArgs,
+ ) -> impl Future<Output = Result<Self::PollResult, Self::Error>>;
+
+ /// Get stored consumer offset.
+ ///
+ /// Returns the last stored offset, or 0 if none exists.
+ fn get_consumer_offset(
+ &self,
+ consumer: PollingConsumer,
+ namespace: IggyNamespace,
+ ) -> impl Future<Output = Result<u64, Self::Error>>;
+
+ /// Get partition's current commit offset.
+ ///
+ /// Returns the highest committed offset (visible to consumers).
+ /// In clustered mode, may lag slightly behind the primary.
+ fn get_offset(
+ &self,
+ namespace: IggyNamespace,
+ ) -> impl Future<Output = Result<u64, Self::Error>>;
+
+ /// Initialize storage for a new partition.
+ ///
+ /// Sets up segments directory, initial segment, and offset tracking.
+ fn create_partition(
+ &self,
+ namespace: IggyNamespace,
+ ) -> impl Future<Output = Result<(), Self::Error>>;
+
+ /// Delete partition and all its data.
+ ///
+ /// Removes messages, segments, indexes, and consumer offsets.
+ fn delete_partition(
+ &self,
+ namespace: IggyNamespace,
+ ) -> impl Future<Output = Result<(), Self::Error>>;
+
+ /// Purge all messages, preserving partition structure.
+ ///
+ /// Resets offset to 0. Consumer offsets become invalid.
+ fn purge_partition(
+ &self,
+ namespace: IggyNamespace,
+ ) -> impl Future<Output = Result<(), Self::Error>>;
}
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index 7ff1a9498..db33f980a 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -35,22 +35,6 @@ impl PollingArgs {
}
}
-/// Metadata returned from a poll operation.
-#[derive(Debug, Clone)]
-pub struct PollMetadata {
- pub partition_id: u32,
- pub current_offset: u64,
-}
-
-impl PollMetadata {
- pub fn new(partition_id: u32, current_offset: u64) -> Self {
- Self {
- partition_id,
- current_offset,
- }
- }
-}
-
/// Result of sending messages.
#[derive(Debug)]
pub struct SendMessagesResult {
@@ -66,3 +50,109 @@ pub enum PollingConsumer {
/// Consumer group with (group_id, member_id)
ConsumerGroup(usize, usize),
}
+
+/// Result of appending messages during the prepare phase.
+///
+/// Indicates the offset range assigned to the appended messages.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AppendResult {
+ /// First offset assigned to the batch.
+ pub start_offset: u64,
+ /// Last offset assigned to the batch (inclusive).
+ pub end_offset: u64,
+ /// Number of messages in the batch.
+ pub messages_count: u32,
+}
+
+impl AppendResult {
+ pub fn new(start_offset: u64, end_offset: u64, messages_count: u32) -> Self {
+ Self {
+ start_offset,
+ end_offset,
+ messages_count,
+ }
+ }
+
+ /// Returns the number of offsets in the range.
+ #[inline]
+ pub fn offset_count(&self) -> u64 {
+ self.end_offset - self.start_offset + 1
+ }
+}
+
+/// Current offset state of a partition.
+///
+/// Tracks both the commit offset (visibility boundary) and write offset
+/// (highest written message). These may differ when there are prepared
+/// but uncommitted messages.
+///
+/// ```text
+/// Segment: [msg0][msg1][msg2][msg3][msg4][msg5][msg6][msg7]
+///                                    ▲                 ▲
+///                              commit_offset     write_offset
+///                                   (4)               (7)
+///
+/// - Messages 0-4: COMMITTED (visible to consumers)
+/// - Messages 5-7: PREPARED but not committed (invisible)
+/// ```
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct PartitionOffsets {
+ /// Highest offset visible to consumers.
+ ///
+ /// All messages with `offset <= commit_offset` can be read via
+ /// `read_committed()` or `poll_messages()`.
+ pub commit_offset: u64,
+
+ /// Highest offset written to storage.
+ ///
+ /// This may be greater than `commit_offset` when there are prepared
+ /// but uncommitted messages (during the window between prepare and
+ /// commit in VSR).
+ ///
+ /// Invariant: `write_offset >= commit_offset`
+ pub write_offset: u64,
+}
+
+impl PartitionOffsets {
+ pub fn new(commit_offset: u64, write_offset: u64) -> Self {
+ debug_assert!(
+ write_offset >= commit_offset,
+ "write_offset ({}) must be >= commit_offset ({})",
+ write_offset,
+ commit_offset
+ );
+ Self {
+ commit_offset,
+ write_offset,
+ }
+ }
+
+ /// Create offsets for an empty partition.
+ pub fn empty() -> Self {
+ Self {
+ commit_offset: 0,
+ write_offset: 0,
+ }
+ }
+
+ /// Returns true if there are uncommitted (prepared) messages.
+ pub fn has_uncommitted(&self) -> bool {
+ self.write_offset > self.commit_offset
+ }
+
+ /// Returns the number of uncommitted messages.
+ pub fn uncommitted_count(&self) -> u64 {
+ self.write_offset - self.commit_offset
+ }
+
+ /// Returns true if commit and write offsets are equal.
+ pub fn is_fully_committed(&self) -> bool {
+ self.write_offset == self.commit_offset
+ }
+}
+
+impl Default for PartitionOffsets {
+ fn default() -> Self {
+ Self::empty()
+ }
+}