This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch command-handler-impr
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d521d529b13aa405bcc2006bd0340f91a639ae3f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Feb 12 00:43:11 2026 +0100

    refactor(server): serialize all partition mutations through message pump
    
    Per-partition write locks (TokioMutex) masked a deeper issue:
    periodic tasks (message_saver, message_cleaner) could interleave
    with append handlers across async suspension points, creating
    subtle TOCTOU races on segment indices and storage vectors.
    
    The fix routes every partition mutation — appends, flushes,
    segment deletion — exclusively through the message pump's
    single-threaded loop. Periodic tasks now enqueue requests via
    the data-plane channel and await responses, replacing direct
    local_partitions access. This eliminates write locks entirely
    and guarantees strict ordering.
    
    Also: pump drains in-flight frames and performs flush+fsync on
    shutdown (replacing message_saver's on_shutdown hook), writers
    truncate when reusing segment paths after full deletion, and
    partition loading creates an initial segment when none exist
    on disk.
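
    For reference, a minimal sketch of the enqueue-and-await pattern the
    periodic tasks now follow. The type and method names are taken from the
    diff below; the helper function itself is hypothetical, and imports and
    error handling are elided, so treat this as a sketch rather than the
    exact code.

        // Hypothetical helper: ask the pump to flush one partition's buffer.
        // The pump picks this frame up only after the frame in flight
        // completes, so the flush cannot interleave with an append.
        async fn flush_via_pump(
            shard: &Rc<IggyShard>,
            ns: IggyNamespace,
        ) -> Result<u32, IggyError> {
            let payload = ShardRequestPayload::FlushUnsavedBuffer { fsync: false };
            let request = ShardRequest::data_plane(ns, payload);
            match shard.send_to_data_plane(request).await? {
                ShardResponse::FlushUnsavedBuffer { flushed_count } => Ok(flushed_count),
                ShardResponse::ErrorResponse(err) => Err(err),
                _ => unreachable!("Expected FlushUnsavedBuffer response"),
            }
        }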
---
 Cargo.lock                                         |   1 -
 core/server/Cargo.toml                             |   1 -
 core/server/src/shard/handlers.rs                  |  39 +-
 core/server/src/shard/mod.rs                       |  26 +-
 core/server/src/shard/system/messages.rs           | 104 +-----
 core/server/src/shard/system/segments.rs           | 267 +++++++++++++-
 .../src/shard/tasks/continuous/message_pump.rs     |  81 +++-
 .../src/shard/tasks/periodic/message_cleaner.rs    | 408 ++-------------------
 .../src/shard/tasks/periodic/message_saver.rs      |  74 +---
 core/server/src/shard/transmission/frame.rs        |   8 +-
 core/server/src/shard/transmission/message.rs      |   5 +
 .../src/streaming/partitions/local_partition.rs    |   4 -
 .../src/streaming/segments/indexes/index_writer.rs |  11 +-
 .../streaming/segments/messages/messages_writer.rs |  11 +-
 14 files changed, 488 insertions(+), 552 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d40a0c839..7f33c0c32 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8321,7 +8321,6 @@ dependencies = [
  "sysinfo 0.38.1",
  "tempfile",
  "thiserror 2.0.18",
- "tokio",
  "toml 0.9.11+spec-1.1.0",
  "tower-http",
  "tracing",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 433a1d7a8..15f7978f8 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -98,7 +98,6 @@ strum = { workspace = true }
 sysinfo = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
-tokio = { workspace = true, features = ["sync"] }
 toml = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs
index a125f6441..89783476e 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -75,10 +75,26 @@ async fn handle_request(
             Ok(ShardResponse::SendMessages)
         }
         ShardRequestPayload::PollMessages { args, consumer } => {
-            let auto_commit = args.auto_commit;
-
             let namespace = namespace.expect("PollMessages requires routing namespace");
 
+            if args.count == 0 {
+                let current_offset = shard
+                    .local_partitions
+                    .borrow()
+                    .get(&namespace)
+                    .map(|p| p.offset.load(std::sync::atomic::Ordering::Relaxed))
+                    .unwrap_or(0);
+                return Ok(ShardResponse::PollMessages((
+                    iggy_common::IggyPollMetadata::new(
+                        namespace.partition_id() as u32,
+                        current_offset,
+                    ),
+                    crate::streaming::segments::IggyMessagesBatchSet::empty(),
+                )));
+            }
+
+            let auto_commit = args.auto_commit;
+
             shard.ensure_partition(&namespace).await?;
 
             let (poll_metadata, batches) = shard
@@ -97,10 +113,10 @@ async fn handle_request(
         }
         ShardRequestPayload::FlushUnsavedBuffer { fsync } => {
             let ns = namespace.expect("FlushUnsavedBuffer requires routing namespace");
-            shard
-                .flush_unsaved_buffer_base(ns.stream_id(), ns.topic_id(), ns.partition_id(), fsync)
+            let flushed_count = shard
+                .flush_unsaved_buffer_from_local_partitions(&ns, fsync)
                 .await?;
-            Ok(ShardResponse::FlushUnsavedBuffer)
+            Ok(ShardResponse::FlushUnsavedBuffer { flushed_count })
         }
         ShardRequestPayload::DeleteSegments { segments_count } => {
             let ns = namespace.expect("DeleteSegments requires routing namespace");
@@ -114,6 +130,19 @@ async fn handle_request(
                 .await?;
             Ok(ShardResponse::DeleteSegments)
         }
+        ShardRequestPayload::CleanTopicMessages {
+            stream_id,
+            topic_id,
+            partition_ids,
+        } => {
+            let (deleted_segments, deleted_messages) = shard
+                .clean_topic_messages(stream_id, topic_id, &partition_ids)
+                .await?;
+            Ok(ShardResponse::CleanTopicMessages {
+                deleted_segments,
+                deleted_messages,
+            })
+        }
         ShardRequestPayload::CreatePartitionsRequest { user_id, command } => {
             assert_eq!(
                 shard.id, 0,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index ea4e14cfa..6d89b1523 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -269,7 +269,31 @@ impl IggyShard {
                 )
                 .await
                 {
-                    Ok(loaded_log) => {
+                    Ok(mut loaded_log) => {
+                        if !loaded_log.has_segments() {
+                            info!(
+                                "No segments found on disk for partition ID: {} for topic ID: {} for stream ID: {}, creating initial segment",
+                                partition_id, topic_id, stream_id
+                            );
+                            let segment = crate::streaming::segments::Segment::new(
+                                0,
+                                self.config.system.segment.size,
+                            );
+                            let storage =
+                                crate::streaming::segments::storage::create_segment_storage(
+                                    &self.config.system,
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    0,
+                                    0,
+                                    0,
+                                )
+                                .await?;
+                            loaded_log.add_persisted_segment(segment, storage);
+                            stats.increment_segments_count(1);
+                        }
+
                         let current_offset = loaded_log.active_segment().end_offset;
                         stats.set_current_offset(current_offset);
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 5307b5c04..bc07ea457 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -54,30 +54,14 @@ impl IggyShard {
             partition.partition_id,
         );
 
-        // Fast path: if partition exists locally, append directly without channel routing
-        let is_local = self.local_partitions.borrow().contains(&namespace);
-        if is_local {
-            let batch = self.maybe_encrypt_messages(batch)?;
-            let messages_count = batch.count();
-
-            self.append_messages_to_local_partition(&namespace, batch, &self.config.system)
-                .await?;
-
-            self.metrics.increment_messages(messages_count as u64);
-            return Ok(());
-        }
-
-        // Slow path: route through data-plane channel to target shard
         let payload = ShardRequestPayload::SendMessages { batch };
         let request = ShardRequest::data_plane(namespace, payload);
 
         match self.send_to_data_plane(request).await? {
-            ShardResponse::SendMessages => {}
-            ShardResponse::ErrorResponse(err) => return Err(err),
+            ShardResponse::SendMessages => Ok(()),
+            ShardResponse::ErrorResponse(err) => Err(err),
             _ => unreachable!("Expected SendMessages response"),
         }
-
-        Ok(())
     }
 
     /// Polls messages from partition. Permission must be checked by caller via
@@ -103,45 +87,6 @@ impl IggyShard {
 
         let namespace = IggyNamespace::new(topic.stream_id, topic.topic_id, partition_id);
 
-        // Fast path: if partition exists locally, poll directly without channel routing
-        if self.local_partitions.borrow().contains(&namespace) {
-            if args.count == 0 {
-                let current_offset = self
-                    .local_partitions
-                    .borrow()
-                    .get(&namespace)
-                    .map(|data| data.offset.load(Ordering::Relaxed))
-                    .unwrap_or(0);
-                return Ok((
-                    IggyPollMetadata::new(partition_id as u32, current_offset),
-                    IggyMessagesBatchSet::empty(),
-                ));
-            }
-
-            let auto_commit = args.auto_commit;
-
-            let (poll_metadata, batches) = self
-                .poll_messages_from_local_partition(&namespace, consumer, args)
-                .await?;
-
-            if auto_commit && !batches.is_empty() {
-                let offset = batches
-                    .last_offset()
-                    .expect("Batch set should have at least one batch");
-                self.auto_commit_consumer_offset_from_local_partition(&namespace, consumer, offset)
-                    .await?;
-            }
-
-            let batches = if let Some(encryptor) = &self.encryptor {
-                self.decrypt_messages(batches, encryptor).await?
-            } else {
-                batches
-            };
-
-            return Ok((poll_metadata, batches));
-        }
-
-        // Slow path: route through data-plane channel to target shard
         let payload = ShardRequestPayload::PollMessages { consumer, args };
         let request = ShardRequest::data_plane(namespace, payload);
 
@@ -181,24 +126,12 @@ impl IggyShard {
         let request = ShardRequest::data_plane(namespace, payload);
 
         match self.send_to_data_plane(request).await? {
-            ShardResponse::FlushUnsavedBuffer => Ok(()),
+            ShardResponse::FlushUnsavedBuffer { .. } => Ok(()),
             ShardResponse::ErrorResponse(err) => Err(err),
             _ => unreachable!("Expected FlushUnsavedBuffer response"),
         }
     }
 
-    pub(crate) async fn flush_unsaved_buffer_base(
-        &self,
-        stream: usize,
-        topic: usize,
-        partition_id: usize,
-        fsync: bool,
-    ) -> Result<u32, IggyError> {
-        let namespace = IggyNamespace::new(stream, topic, partition_id);
-        self.flush_unsaved_buffer_from_local_partitions(&namespace, fsync)
-            .await
-    }
-
     /// Flushes unsaved messages from the partition store to disk.
     /// Returns the number of messages saved.
     pub(crate) async fn flush_unsaved_buffer_from_local_partitions(
@@ -206,16 +139,6 @@ impl IggyShard {
         namespace: &IggyNamespace,
         fsync: bool,
     ) -> Result<u32, IggyError> {
-        let write_lock = {
-            let partitions = self.local_partitions.borrow();
-            let Some(partition) = partitions.get(namespace) else {
-                return Ok(0);
-            };
-            partition.write_lock.clone()
-        };
-
-        let _write_guard = write_lock.lock().await;
-
         let frozen_batches = {
             let mut partitions = self.local_partitions.borrow_mut();
             let Some(partition) = partitions.get_mut(namespace) else {
@@ -361,22 +284,17 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn append_messages_to_local_partition(
+    /// Appends a batch to the active segment, flushing to disk and rotating if needed.
+    ///
+    /// Safety: called exclusively from the message pump — segment indices captured before
+    /// internal `.await` points (prepare_for_persistence, persist, rotate) remain valid
+    /// because no other handler can modify the segment vec while this frame is in progress.
+    pub(crate) async fn append_messages_to_local_partition(
         &self,
         namespace: &IggyNamespace,
         mut batch: IggyMessagesBatchMut,
         config: &crate::configs::system::SystemConfig,
     ) -> Result<(), IggyError> {
-        let write_lock = {
-            let partitions = self.local_partitions.borrow();
-            let partition = partitions
-                .get(namespace)
-                .expect("local_partitions: partition must exist");
-            partition.write_lock.clone()
-        };
-
-        let _write_guard = write_lock.lock().await;
-
         let (
             current_offset,
             current_position,
@@ -576,9 +494,7 @@ impl IggyShard {
                 .get_mut(namespace)
                 .expect("local_partitions: partition must exist");
 
-            // Recalculate index: segment deletion during async I/O shifts indices
             let segment_index = partition.log.segments().len() - 1;
-
             let indexes = partition.log.indexes_mut()[segment_index]
                 .as_mut()
                 .expect("indexes must exist for segment being persisted");
@@ -594,7 +510,7 @@ impl IggyShard {
         Ok(batch_count)
     }
 
-    pub async fn poll_messages_from_local_partition(
+    pub(crate) async fn poll_messages_from_local_partition(
         &self,
         namespace: &IggyNamespace,
         consumer: crate::streaming::polling_consumer::PollingConsumer,
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index bf15b7b39..8940350ff 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -18,10 +18,258 @@
 use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::shard::IggyShard;
 use crate::streaming::segments::Segment;
-use iggy_common::IggyError;
 use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
 
 impl IggyShard {
+    /// Performs all cleanup for a topic's partitions: time-based expiry then size-based trimming.
+    ///
+    /// Runs entirely inside the message pump's serialized loop — reads partition state and
+    /// deletes segments atomically with no TOCTOU window.
+    pub(crate) async fn clean_topic_messages(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_ids: &[usize],
+    ) -> Result<(u64, u64), IggyError> {
+        let (expiry, max_topic_size) = self
+            .metadata
+            .get_topic_config(stream_id, topic_id)
+            .unwrap_or((
+                self.config.system.topic.message_expiry,
+                MaxTopicSize::Unlimited,
+            ));
+
+        let mut total_segments = 0u64;
+        let mut total_messages = 0u64;
+
+        // Phase 1: time-based expiry
+        if !matches!(expiry, IggyExpiry::NeverExpire) {
+            let now = IggyTimestamp::now();
+            for &partition_id in partition_ids {
+                let (s, m) = self
+                    .delete_expired_segments_for_partition(
+                        stream_id,
+                        topic_id,
+                        partition_id,
+                        now,
+                        expiry,
+                    )
+                    .await?;
+                total_segments += s;
+                total_messages += m;
+            }
+        }
+
+        // Phase 2: size-based trimming
+        if !matches!(max_topic_size, MaxTopicSize::Unlimited) {
+            let max_bytes = max_topic_size.as_bytes_u64();
+            let threshold = max_bytes * 9 / 10;
+
+            loop {
+                let current_size = self
+                    .metadata
+                    .with_metadata(|m| {
+                        m.streams
+                            .get(stream_id)
+                            .and_then(|s| s.topics.get(topic_id))
+                            .map(|t| t.stats.size_bytes_inconsistent())
+                    })
+                    .unwrap_or(0);
+
+                if current_size < threshold {
+                    break;
+                }
+
+                let Some((target_partition_id, target_offset)) =
+                    self.find_oldest_sealed_segment(stream_id, topic_id, partition_ids)
+                else {
+                    break;
+                };
+
+                let (s, m) = self
+                    .remove_segment_by_offset(
+                        stream_id,
+                        topic_id,
+                        target_partition_id,
+                        target_offset,
+                    )
+                    .await?;
+                if s == 0 {
+                    break;
+                }
+                total_segments += s;
+                total_messages += m;
+            }
+        }
+
+        Ok((total_segments, total_messages))
+    }
+
+    /// Deletes all expired sealed segments from a single partition.
+    async fn delete_expired_segments_for_partition(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        now: IggyTimestamp,
+        expiry: IggyExpiry,
+    ) -> Result<(u64, u64), IggyError> {
+        let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+
+        let expired_offsets: Vec<u64> = {
+            let partitions = self.local_partitions.borrow();
+            let Some(partition) = partitions.get(&ns) else {
+                return Ok((0, 0));
+            };
+            let segments = partition.log.segments();
+            let last_idx = segments.len().saturating_sub(1);
+
+            segments
+                .iter()
+                .enumerate()
+                .filter(|(idx, seg)| *idx != last_idx && seg.is_expired(now, expiry))
+                .map(|(_, seg)| seg.start_offset)
+                .collect()
+        };
+
+        let mut total_segments = 0u64;
+        let mut total_messages = 0u64;
+        for offset in expired_offsets {
+            let (s, m) = self
+                .remove_segment_by_offset(stream_id, topic_id, partition_id, offset)
+                .await?;
+            total_segments += s;
+            total_messages += m;
+        }
+        Ok((total_segments, total_messages))
+    }
+
+    /// Finds the oldest sealed segment across the given partitions, comparing by timestamp.
+    /// Returns `(partition_id, start_offset)` or `None` if no deletable segments exist.
+    fn find_oldest_sealed_segment(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_ids: &[usize],
+    ) -> Option<(usize, u64)> {
+        let partitions = self.local_partitions.borrow();
+        let mut oldest: Option<(usize, u64, u64)> = None;
+
+        for &partition_id in partition_ids {
+            let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+            let Some(partition) = partitions.get(&ns) else {
+                continue;
+            };
+
+            let segments = partition.log.segments();
+            if segments.len() <= 1 {
+                continue;
+            }
+
+            let first = &segments[0];
+            if !first.sealed {
+                continue;
+            }
+
+            match &oldest {
+                None => oldest = Some((partition_id, first.start_offset, first.start_timestamp)),
+                Some((_, _, ts)) if first.start_timestamp < *ts => {
+                    oldest = Some((partition_id, first.start_offset, first.start_timestamp));
+                }
+                _ => {}
+            }
+        }
+
+        oldest.map(|(pid, offset, _)| (pid, offset))
+    }
+
+    /// Removes a single segment identified by its start_offset from the given partition.
+    /// Skips if the segment no longer exists or is the active (last) segment.
+    async fn remove_segment_by_offset(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+    ) -> Result<(u64, u64), IggyError> {
+        let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+
+        let removed = {
+            let mut partitions = self.local_partitions.borrow_mut();
+            let Some(partition) = partitions.get_mut(&ns) else {
+                return Ok((0, 0));
+            };
+
+            let log = &mut partition.log;
+            let last_idx = log.segments().len().saturating_sub(1);
+
+            let Some(idx) = log
+                .segments()
+                .iter()
+                .position(|s| s.start_offset == start_offset)
+            else {
+                return Ok((0, 0));
+            };
+
+            if idx == last_idx {
+                tracing::warn!(
+                    "Refusing to delete active segment (start_offset: {start_offset}) \
+                     for partition ID: {partition_id}"
+                );
+                return Ok((0, 0));
+            }
+
+            let segment = log.segments_mut().remove(idx);
+            let storage = log.storages_mut().remove(idx);
+            log.indexes_mut().remove(idx);
+
+            Some((segment, storage, partition.stats.clone()))
+        };
+
+        let Some((segment, mut storage, stats)) = removed else {
+            return Ok((0, 0));
+        };
+
+        let segment_size = segment.size.as_bytes_u64();
+        let end_offset = segment.end_offset;
+        let messages_in_segment = if start_offset == end_offset {
+            0
+        } else {
+            (end_offset - start_offset) + 1
+        };
+
+        let _ = storage.shutdown();
+        let (messages_path, index_path) = storage.segment_and_index_paths();
+
+        if let Some(path) = messages_path
+            && let Err(e) = compio::fs::remove_file(&path).await
+        {
+            tracing::error!("Failed to delete messages file {}: {}", path, e);
+        }
+
+        if let Some(path) = index_path
+            && let Err(e) = compio::fs::remove_file(&path).await
+        {
+            tracing::error!("Failed to delete index file {}: {}", path, e);
+        }
+
+        stats.decrement_size_bytes(segment_size);
+        stats.decrement_segments_count(1);
+        stats.decrement_messages_count(messages_in_segment);
+
+        tracing::info!(
+            "Deleted segment (start: {}, end: {}, size: {}, messages: {}) from partition {}",
+            start_offset,
+            end_offset,
+            segment_size,
+            messages_in_segment,
+            partition_id
+        );
+
+        Ok((1, messages_in_segment))
+    }
+
     pub(crate) async fn delete_segments(
         &self,
         stream: usize,
@@ -122,6 +370,13 @@ impl IggyShard {
         Ok(())
     }
 
+    /// Creates a fresh segment at offset 0 after all segments have been drained.
+    ///
+    /// The log is momentarily empty between `delete_segments`' drain and this call, which is
+    /// safe because the message pump serializes all handlers — no concurrent operation can
+    /// observe the empty state. The new segment reuses the offset-0 file path; writers
+    /// open with `truncate(true)` when `!file_exists` to clear any stale data from a
+    /// previous incarnation at the same path.
     async fn init_log_in_local_partitions(
         &self,
         namespace: &IggyNamespace,
@@ -157,6 +412,10 @@ impl IggyShard {
     /// Rotate to a new segment when the current segment is full.
     /// The new segment starts at the next offset after the current segment's end.
     /// Seals the old segment so it becomes eligible for expiry-based cleanup.
+    ///
+    /// Safety: called exclusively from the message pump (via append handler) — the captured
+    /// `old_segment_index` remains valid across the `create_segment_storage` await because
+    /// no other handler can modify the segment vec while this frame is in progress.
     pub(crate) async fn rotate_segment_in_local_partitions(
         &self,
         namespace: &IggyNamespace,
@@ -206,9 +465,11 @@ impl IggyShard {
             partition.log.add_persisted_segment(segment, storage);
             partition.stats.increment_segments_count(1);
             tracing::info!(
-                "Rotated to new segment at offset {} for partition {}",
+                "Rotated to new segment at offset {} for partition {} (stream {}, topic {})",
                 start_offset,
-                namespace.partition_id()
+                namespace.partition_id(),
+                namespace.stream_id(),
+                namespace.topic_id()
             );
         }
         Ok(())
diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs b/core/server/src/shard/tasks/continuous/message_pump.rs
index 97e1f341d..85ed29c25 100644
--- a/core/server/src/shard/tasks/continuous/message_pump.rs
+++ b/core/server/src/shard/tasks/continuous/message_pump.rs
@@ -21,7 +21,7 @@ use crate::shard::transmission::frame::ShardFrame;
 use crate::shard::{IggyShard, handlers::handle_shard_message};
 use futures::FutureExt;
 use std::rc::Rc;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
 
 pub fn spawn_message_pump(shard: Rc<IggyShard>) {
     let shard_clone = shard.clone();
@@ -33,6 +33,23 @@ pub fn spawn_message_pump(shard: Rc<IggyShard>) {
         .spawn();
 }
 
+/// Single serialization point for all partition mutations on this shard.
+///
+/// Every operation that mutates `local_partitions` — appends, segment rotation, flush,
+/// segment deletion — is dispatched exclusively through this pump. The loop awaits each
+/// `process_frame` to completion before dequeuing the next message, so handlers never
+/// interleave even across internal `.await` points (disk I/O, fsync).
+///
+/// Periodic tasks (message_saver, message_cleaner) run as separate futures on the same
+/// compio thread but **cannot** mutate partitions directly. They read partition metadata
+/// via `borrow()` and enqueue mutation requests back into this pump's channel. Those
+/// requests block on a response that is only sent after the current frame completes,
+/// guaranteeing strict ordering.
+///
+/// This invariant replaces per-partition write locks and eliminates TOCTOU races between
+/// concurrent handlers. All `pub(crate)` mutation methods on `IggyShard` (e.g.
+/// `append_messages_to_local_partition`, `delete_expired_segments`,
+/// `rotate_segment_in_local_partitions`) assume they are called from within this pump.
 async fn message_pump(
     shard: Rc<IggyShard>,
     shutdown: ShutdownToken,
@@ -44,24 +61,17 @@ async fn message_pump(
 
     info!("Starting message passing task");
 
-    // Get the inner flume receiver directly
     let receiver = messages_receiver.inner;
 
     loop {
         futures::select! {
             _ = shutdown.wait().fuse() => {
-                debug!("Message receiver shutting down");
+                debug!("Message pump shutting down");
                 break;
             }
             frame = receiver.recv_async().fuse() => {
                 match frame {
-                    Ok(ShardFrame { message, response_sender }) => {
-                        if let (Some(response), Some(tx)) =
-                            (handle_shard_message(&shard, message).await, response_sender)
-                        {
-                             let _ = tx.send(response).await;
-                        }
-                    }
+                    Ok(frame) => process_frame(&shard, frame).await,
                     Err(_) => {
                         debug!("Message receiver closed; exiting pump");
                         break;
@@ -71,5 +81,56 @@ async fn message_pump(
         }
     }
 
+    // Drain remaining frames before flushing — any in-flight appends must
+    // complete so their data lands in the journal before we flush to disk.
+    while let Ok(frame) = receiver.try_recv() {
+        process_frame(&shard, frame).await;
+    }
+
+    flush_and_fsync_all_partitions(&shard).await;
+
     Ok(())
 }
+
+async fn process_frame(shard: &Rc<IggyShard>, frame: ShardFrame) {
+    let ShardFrame {
+        message,
+        response_sender,
+    } = frame;
+    if let (Some(response), Some(tx)) =
+        (handle_shard_message(shard, message).await, response_sender)
+    {
+        let _ = tx.send(response).await;
+    }
+}
+
+/// Final flush + fsync of all local partitions. Runs inside the pump after
+/// the main loop exits, so no other pump frame can interleave.
+async fn flush_and_fsync_all_partitions(shard: &Rc<IggyShard>) {
+    let namespaces = shard.get_current_shard_namespaces();
+    if namespaces.is_empty() {
+        return;
+    }
+
+    let mut flushed = 0u32;
+    for ns in &namespaces {
+        match shard
+            .flush_unsaved_buffer_from_local_partitions(ns, false)
+            .await
+        {
+            Ok(saved) if saved > 0 => flushed += 1,
+            Ok(_) => {}
+            Err(e) => error!("Shutdown flush failed for partition {:?}: {}", ns, e),
+        }
+    }
+    if flushed > 0 {
+        info!("Shutdown: flushed {flushed} partitions.");
+    }
+
+    for ns in &namespaces {
+        if let Err(e) = shard.fsync_all_messages_from_local_partitions(ns).await {
+            error!("Shutdown fsync failed for partition {:?}: {}", ns, e);
+        }
+    }
+    info!("Shutdown: fsync complete for all partitions.");
+}
diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs b/core/server/src/shard/tasks/periodic/message_cleaner.rs
index 5e33428c2..c0443da6e 100644
--- a/core/server/src/shard/tasks/periodic/message_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs
@@ -17,10 +17,12 @@
  */
 
 use crate::shard::IggyShard;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
+use iggy_common::IggyError;
 use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
 use std::rc::Rc;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{error, info, trace};
 
 pub fn spawn_message_cleaner(shard: Rc<IggyShard>) {
     if !shard.config.data_maintenance.messages.cleaner_enabled {
@@ -43,24 +45,23 @@ pub fn spawn_message_cleaner(shard: Rc<IggyShard>) {
         .task_registry
         .periodic("clean_messages")
         .every(period)
-        .tick(move |_shutdown| clean_expired_messages(shard_clone.clone()))
+        .tick(move |_shutdown| clean_messages(shard_clone.clone()))
         .spawn();
 }
 
-async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+/// Groups namespaces by topic and sends a single `CleanTopicMessages` per topic to the pump.
+/// All segment inspection and deletion happens inside the pump handler — no TOCTOU.
+async fn clean_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     trace!("Cleaning expired messages...");
 
     let namespaces = shard.get_current_shard_namespaces();
-    let now = IggyTimestamp::now();
 
     let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> =
         std::collections::HashMap::new();
 
     for ns in namespaces {
-        let stream_id = ns.stream_id();
-        let topic_id = ns.topic_id();
         topics
-            .entry((stream_id, topic_id))
+            .entry((ns.stream_id(), ns.topic_id()))
             .or_default()
             .push(ns.partition_id());
     }
@@ -69,63 +70,44 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     let mut total_deleted_messages = 0u64;
 
     for ((stream_id, topic_id), partition_ids) in topics {
-        let mut topic_deleted_segments = 0u64;
-        let mut topic_deleted_messages = 0u64;
-
-        // Phase 1: Time-based expiry cleanup per partition
-        for &partition_id in &partition_ids {
-            let expired_result =
-                handle_expired_segments(&shard, stream_id, topic_id, partition_id, now).await;
-
-            match expired_result {
-                Ok(deleted) => {
-                    topic_deleted_segments += deleted.segments_count;
-                    topic_deleted_messages += deleted.messages_count;
-                }
-                Err(err) => {
-                    error!(
-                        "Failed to clean expired segments for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
-                        stream_id, topic_id, partition_id, err
+        let ns = IggyNamespace::new(stream_id, topic_id, partition_ids[0]);
+        let payload = ShardRequestPayload::CleanTopicMessages {
+            stream_id,
+            topic_id,
+            partition_ids,
+        };
+        let request = ShardRequest::data_plane(ns, payload);
+
+        match shard.send_to_data_plane(request).await {
+            Ok(ShardResponse::CleanTopicMessages {
+                deleted_segments,
+                deleted_messages,
+            }) => {
+                if deleted_segments > 0 {
+                    info!(
+                        "Deleted {} segments and {} messages for stream {}, topic {}",
+                        deleted_segments, deleted_messages, stream_id, topic_id
                     );
+                    shard.metrics.decrement_segments(deleted_segments as u32);
+                    shard.metrics.decrement_messages(deleted_messages);
+                    total_deleted_segments += deleted_segments;
+                    total_deleted_messages += deleted_messages;
                 }
             }
-        }
-
-        // Phase 2: Size-based cleanup at topic level (fair across partitions)
-        let size_result =
-            handle_size_based_cleanup(&shard, stream_id, topic_id, &partition_ids).await;
-
-        match size_result {
-            Ok(deleted) => {
-                topic_deleted_segments += deleted.segments_count;
-                topic_deleted_messages += deleted.messages_count;
+            Ok(ShardResponse::ErrorResponse(err)) => {
+                error!(
+                    "Failed to clean messages for stream {}, topic {}: {}",
+                    stream_id, topic_id, err
+                );
             }
+            Ok(_) => unreachable!("Expected CleanTopicMessages response"),
             Err(err) => {
                 error!(
-                    "Failed to clean segments by size for stream ID: {}, topic ID: {}. Error: {}",
+                    "Failed to send CleanTopicMessages for stream {}, topic {}: {}",
                     stream_id, topic_id, err
                 );
             }
         }
-
-        if topic_deleted_segments > 0 {
-            info!(
-                "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}",
-                topic_deleted_segments, topic_deleted_messages, stream_id, topic_id
-            );
-            total_deleted_segments += topic_deleted_segments;
-            total_deleted_messages += topic_deleted_messages;
-
-            shard
-                .metrics
-                .decrement_segments(topic_deleted_segments as u32);
-            shard.metrics.decrement_messages(topic_deleted_messages);
-        } else {
-            trace!(
-                "No segments were deleted for stream ID: {}, topic ID: {}",
-                stream_id, topic_id
-            );
-        }
     }
 
     if total_deleted_segments > 0 {
@@ -137,319 +119,3 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
 
     Ok(())
 }
-
-#[derive(Debug, Default)]
-struct DeletedSegments {
-    pub segments_count: u64,
-    pub messages_count: u64,
-}
-
-impl DeletedSegments {
-    fn add(&mut self, other: &DeletedSegments) {
-        self.segments_count += other.segments_count;
-        self.messages_count += other.messages_count;
-    }
-}
-
-async fn handle_expired_segments(
-    shard: &Rc<IggyShard>,
-    stream_id: usize,
-    topic_id: usize,
-    partition_id: usize,
-    now: IggyTimestamp,
-) -> Result<DeletedSegments, IggyError> {
-    let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
-
-    let expiry = shard
-        .metadata
-        .get_topic_config(stream_id, topic_id)
-        .map(|(exp, _)| exp)
-        .unwrap_or(shard.config.system.topic.message_expiry);
-
-    if matches!(expiry, IggyExpiry::NeverExpire) {
-        return Ok(DeletedSegments::default());
-    }
-
-    let expired_segment_offsets: Vec<u64> = {
-        let partitions = shard.local_partitions.borrow();
-        let Some(partition) = partitions.get(&ns) else {
-            return Ok(DeletedSegments::default());
-        };
-        let segments = partition.log.segments();
-        let last_idx = segments.len().saturating_sub(1);
-
-        segments
-            .iter()
-            .enumerate()
-            .filter(|(idx, segment)| *idx != last_idx && segment.is_expired(now, expiry))
-            .map(|(_, segment)| segment.start_offset)
-            .collect()
-    };
-
-    if expired_segment_offsets.is_empty() {
-        return Ok(DeletedSegments::default());
-    }
-
-    debug!(
-        "Found {} expired segments for stream ID: {}, topic ID: {}, partition ID: {}",
-        expired_segment_offsets.len(),
-        stream_id,
-        topic_id,
-        partition_id
-    );
-
-    delete_segments(
-        shard,
-        stream_id,
-        topic_id,
-        partition_id,
-        &expired_segment_offsets,
-    )
-    .await
-}
-
-/// Handles size-based cleanup at the topic level.
-/// Deletes the globally oldest sealed segment across all partitions until topic size is below 90% threshold.
-async fn handle_size_based_cleanup(
-    shard: &Rc<IggyShard>,
-    stream_id: usize,
-    topic_id: usize,
-    partition_ids: &[usize],
-) -> Result<DeletedSegments, IggyError> {
-    let Some((max_size, _)) = shard.metadata.with_metadata(|m| {
-        m.streams
-            .get(stream_id)
-            .and_then(|s| s.topics.get(topic_id))
-            .map(|t| (t.max_topic_size, t.stats.size_bytes_inconsistent()))
-    }) else {
-        return Ok(DeletedSegments::default());
-    };
-
-    if matches!(max_size, MaxTopicSize::Unlimited) {
-        return Ok(DeletedSegments::default());
-    }
-
-    let max_bytes = max_size.as_bytes_u64();
-    let threshold = max_bytes * 9 / 10;
-
-    let mut total_deleted = DeletedSegments::default();
-
-    loop {
-        let current_size = shard
-            .metadata
-            .with_metadata(|m| {
-                m.streams
-                    .get(stream_id)
-                    .and_then(|s| s.topics.get(topic_id))
-                    .map(|t| t.stats.size_bytes_inconsistent())
-            })
-            .unwrap_or(0);
-
-        if current_size < threshold {
-            break;
-        }
-
-        let Some((target_partition_id, target_offset, target_timestamp)) =
-            find_oldest_segment_in_shard(shard, stream_id, topic_id, partition_ids)
-        else {
-            debug!(
-                "No deletable segments found for stream ID: {}, topic ID: {} (all partitions have only active segment)",
-                stream_id, topic_id
-            );
-            break;
-        };
-
-        info!(
-            "Deleting oldest segment (start_offset: {}, timestamp: {}) from partition {} for stream ID: {}, topic ID: {}",
-            target_offset, target_timestamp, target_partition_id, stream_id, topic_id
-        );
-
-        let deleted = delete_segments(
-            shard,
-            stream_id,
-            topic_id,
-            target_partition_id,
-            &[target_offset],
-        )
-        .await?;
-        total_deleted.add(&deleted);
-
-        if deleted.segments_count == 0 {
-            break;
-        }
-    }
-
-    Ok(total_deleted)
-}
-
-/// Finds the oldest sealed segment across partitions owned by this shard.
-/// For each partition, the first segment in the vector is the oldest (segments are ordered).
-/// Compares first segments across partitions by timestamp to ensure fair deletion.
-/// Returns (partition_id, start_offset, start_timestamp) or None if no deletable segments exist.
-fn find_oldest_segment_in_shard(
-    shard: &Rc<IggyShard>,
-    stream_id: usize,
-    topic_id: usize,
-    partition_ids: &[usize],
-) -> Option<(usize, u64, u64)> {
-    let partitions = shard.local_partitions.borrow();
-
-    let mut oldest: Option<(usize, u64, u64)> = None;
-
-    for &partition_id in partition_ids {
-        let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
-        let Some(partition) = partitions.get(&ns) else {
-            continue;
-        };
-
-        let segments = partition.log.segments();
-        if segments.len() <= 1 {
-            continue;
-        }
-
-        // First segment is the oldest in this partition (segments are ordered chronologically)
-        let first_segment = &segments[0];
-        if !first_segment.sealed {
-            continue;
-        }
-
-        let candidate = (
-            partition_id,
-            first_segment.start_offset,
-            first_segment.start_timestamp,
-        );
-        match &oldest {
-            None => oldest = Some(candidate),
-            Some((_, _, oldest_ts)) if first_segment.start_timestamp < *oldest_ts => {
-                oldest = Some(candidate);
-            }
-            _ => {}
-        }
-    }
-
-    oldest
-}
-
-async fn delete_segments(
-    shard: &Rc<IggyShard>,
-    stream_id: usize,
-    topic_id: usize,
-    partition_id: usize,
-    segment_offsets: &[u64],
-) -> Result<DeletedSegments, IggyError> {
-    if segment_offsets.is_empty() {
-        return Ok(DeletedSegments::default());
-    }
-
-    info!(
-        "Deleting {} segments for stream ID: {}, topic ID: {}, partition ID: {}...",
-        segment_offsets.len(),
-        stream_id,
-        topic_id,
-        partition_id
-    );
-
-    let mut segments_count = 0u64;
-    let mut messages_count = 0u64;
-
-    let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
-
-    let (stats, segments_to_delete, mut storages_to_delete) = {
-        let mut partitions = shard.local_partitions.borrow_mut();
-        let Some(partition) = partitions.get_mut(&ns) else {
-            return Ok(DeletedSegments::default());
-        };
-
-        let log = &mut partition.log;
-        let mut segments_to_remove = Vec::new();
-        let mut storages_to_remove = Vec::new();
-
-        let mut indices_to_remove: Vec<usize> = Vec::new();
-        for &start_offset in segment_offsets {
-            if let Some(idx) = log
-                .segments()
-                .iter()
-                .position(|s| s.start_offset == start_offset)
-            {
-                indices_to_remove.push(idx);
-            }
-        }
-
-        indices_to_remove.sort_by(|a, b| b.cmp(a));
-        for idx in indices_to_remove {
-            let segment = log.segments_mut().remove(idx);
-            let storage = log.storages_mut().remove(idx);
-            log.indexes_mut().remove(idx);
-
-            segments_to_remove.push(segment);
-            storages_to_remove.push(storage);
-        }
-
-        (
-            partition.stats.clone(),
-            segments_to_remove,
-            storages_to_remove,
-        )
-    };
-
-    for (segment, storage) in segments_to_delete
-        .into_iter()
-        .zip(storages_to_delete.iter_mut())
-    {
-        let segment_size = segment.size.as_bytes_u64();
-        let start_offset = segment.start_offset;
-        let end_offset = segment.end_offset;
-
-        let messages_in_segment = if start_offset == end_offset {
-            0
-        } else {
-            (end_offset - start_offset) + 1
-        };
-
-        let _ = storage.shutdown();
-        let (messages_path, index_path) = storage.segment_and_index_paths();
-
-        if let Some(path) = messages_path {
-            if let Err(e) = compio::fs::remove_file(&path).await {
-                error!("Failed to delete messages file {}: {}", path, e);
-            } else {
-                trace!("Deleted messages file: {}", path);
-            }
-        } else {
-            warn!(
-                "Messages writer path not found for segment starting at offset {}",
-                start_offset
-            );
-        }
-
-        if let Some(path) = index_path {
-            if let Err(e) = compio::fs::remove_file(&path).await {
-                error!("Failed to delete index file {}: {}", path, e);
-            } else {
-                trace!("Deleted index file: {}", path);
-            }
-        } else {
-            warn!(
-                "Index writer path not found for segment starting at offset {}",
-                start_offset
-            );
-        }
-
-        stats.decrement_size_bytes(segment_size);
-        stats.decrement_segments_count(1);
-        stats.decrement_messages_count(messages_in_segment);
-
-        info!(
-            "Deleted segment with start offset {} (end: {}, size: {}, messages: {}) from partition ID: {}",
-            start_offset, end_offset, segment_size, messages_in_segment, partition_id
-        );
-
-        segments_count += 1;
-        messages_count += messages_in_segment;
-    }
-
-    Ok(DeletedSegments {
-        segments_count,
-        messages_count,
-    })
-}
diff --git a/core/server/src/shard/tasks/periodic/message_saver.rs b/core/server/src/shard/tasks/periodic/message_saver.rs
index d959ca8c7..0ebb5cdfc 100644
--- a/core/server/src/shard/tasks/periodic/message_saver.rs
+++ b/core/server/src/shard/tasks/periodic/message_saver.rs
@@ -17,6 +17,8 @@
  */
 
 use crate::shard::IggyShard;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
 use iggy_common::IggyError;
 use std::rc::Rc;
 use tracing::{error, info, trace};
@@ -29,16 +31,13 @@ pub fn spawn_message_saver(shard: Rc<IggyShard>) {
         period
     );
     let shard_clone = shard.clone();
-    let shard_for_shutdown = shard.clone();
     shard
         .task_registry
         .periodic("save_messages")
         .every(period)
-        .last_tick_on_shutdown(true)
+        // No last_tick_on_shutdown — the pump handles final flush + fsync
+        // during its own shutdown (see message_pump.rs).
         .tick(move |_shutdown| save_messages(shard_clone.clone()))
-        .on_shutdown(move |result| {
-            fsync_all_segments_on_shutdown(shard_for_shutdown.clone(), result)
-        })
         .spawn();
 }
 
@@ -46,63 +45,28 @@ async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     trace!("Saving buffered messages...");
 
     let namespaces = shard.get_current_shard_namespaces();
-    let mut total_saved_messages = 0u64;
     let mut partitions_flushed = 0u32;
 
     for ns in namespaces {
-        if shard.local_partitions.borrow().get(&ns).is_some() {
-            match shard
-                .flush_unsaved_buffer_from_local_partitions(&ns, false)
-                .await
-            {
-                Ok(saved) => {
-                    if saved > 0 {
-                        total_saved_messages += saved as u64;
-                        partitions_flushed += 1;
-                    }
-                }
-                Err(err) => {
-                    error!("Failed to save messages for partition {:?}: {}", ns, err);
-                }
+        let payload = ShardRequestPayload::FlushUnsavedBuffer { fsync: false };
+        let request = ShardRequest::data_plane(ns, payload);
+        match shard.send_to_data_plane(request).await {
+            Ok(ShardResponse::FlushUnsavedBuffer { flushed_count }) if flushed_count > 0 => {
+                partitions_flushed += 1;
             }
+            Ok(ShardResponse::FlushUnsavedBuffer { .. }) => {}
+            Ok(ShardResponse::ErrorResponse(err)) => {
+                error!("Failed to save messages for partition {:?}: {}", ns, err);
+            }
+            Err(err) => {
+                error!("Failed to save messages for partition {:?}: {}", ns, err);
+            }
+            _ => {}
         }
     }
 
-    if total_saved_messages > 0 {
-        info!("Saved {total_saved_messages} messages from {partitions_flushed} partitions.");
+    if partitions_flushed > 0 {
+        info!("Flushed {partitions_flushed} partitions.");
     }
     Ok(())
 }
-
-async fn fsync_all_segments_on_shutdown(shard: Rc<IggyShard>, result: Result<(), IggyError>) {
-    if result.is_err() {
-        error!(
-            "Last save_messages tick failed, skipping fsync: {:?}",
-            result
-        );
-        return;
-    }
-
-    trace!("Performing fsync on all segments during shutdown...");
-
-    let namespaces = shard.get_current_shard_namespaces();
-
-    for ns in namespaces {
-        if shard.local_partitions.borrow().get(&ns).is_some() {
-            match shard.fsync_all_messages_from_local_partitions(&ns).await {
-                Ok(()) => {
-                    trace!(
-                        "Successfully fsynced segment for partition {:?} during shutdown",
-                        ns
-                    );
-                }
-                Err(err) => {
-                    error!(
-                        "Failed to fsync segment for partition {:?} during shutdown: {}",
-                        ns, err
-                    );
-                }
-            }
-        }
-    }
-}
diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs
index 4576221cf..c8cc585a9 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -60,8 +60,14 @@ pub struct ConsumerGroupResponseData {
 pub enum ShardResponse {
     PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
     SendMessages,
-    FlushUnsavedBuffer,
+    FlushUnsavedBuffer {
+        flushed_count: u32,
+    },
     DeleteSegments,
+    CleanTopicMessages {
+        deleted_segments: u64,
+        deleted_messages: u64,
+    },
     Event,
     CreateStreamResponse(StreamResponseData),
     DeleteStreamResponse(usize),
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 4f04fc13e..6fe952b92 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -115,6 +115,11 @@ pub enum ShardRequestPayload {
     DeleteSegments {
         segments_count: u32,
     },
+    CleanTopicMessages {
+        stream_id: usize,
+        topic_id: usize,
+        partition_ids: Vec<usize>,
+    },
     SocketTransfer {
         fd: OwnedFd,
         from_shard: u16,
diff --git a/core/server/src/streaming/partitions/local_partition.rs b/core/server/src/streaming/partitions/local_partition.rs
index 4eeba5dbb..14f5e63d9 100644
--- a/core/server/src/streaming/partitions/local_partition.rs
+++ b/core/server/src/streaming/partitions/local_partition.rs
@@ -27,7 +27,6 @@ use super::{
 use crate::streaming::{deduplication::MessageDeduplicator, stats::PartitionStats};
 use iggy_common::IggyTimestamp;
 use std::sync::{Arc, atomic::AtomicU64};
-use tokio::sync::Mutex as TokioMutex;
 
 /// Per-shard partition data - mutable, single-threaded access.
 #[derive(Debug)]
@@ -41,7 +40,6 @@ pub struct LocalPartition {
     pub created_at: IggyTimestamp,
     pub revision_id: u64,
     pub should_increment_offset: bool,
-    pub write_lock: Arc<TokioMutex<()>>,
 }
 
 impl LocalPartition {
@@ -67,7 +65,6 @@ impl LocalPartition {
             created_at,
             revision_id,
             should_increment_offset,
-            write_lock: Arc::new(TokioMutex::new(())),
         }
     }
 
@@ -94,7 +91,6 @@ impl LocalPartition {
             created_at,
             revision_id,
             should_increment_offset,
-            write_lock: Arc::new(TokioMutex::new(())),
         }
     }
 }
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs
index 2022b411d..fb5a0a533 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -48,9 +48,14 @@ impl IndexWriter {
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
-        let file = OpenOptions::new()
-            .create(true)
-            .write(true)
+        let mut opts = OpenOptions::new();
+        opts.create(true).write(true);
+        if !file_exists {
+            // When creating a fresh segment at a reused path (e.g. offset 0 after all segments
+            // were deleted), truncate to clear any stale data from a previous incarnation.
+            opts.truncate(true);
+        }
+        let file = opts
             .open(file_path)
             .await
             .error(|e: &std::io::Error| format!("Failed to open index file: {file_path}. {e}"))
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs
index 439e30a80..525f4292a 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -50,9 +50,14 @@ impl MessagesWriter {
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
-        let file = OpenOptions::new()
-            .create(true)
-            .write(true)
+        let mut opts = OpenOptions::new();
+        opts.create(true).write(true);
+        if !file_exists {
+            // When creating a fresh segment at a reused path (e.g. offset 0 after all segments
+            // were deleted), truncate to clear any stale data from a previous incarnation.
+            opts.truncate(true);
+        }
+        let file = opts
             .open(file_path)
             .await
             .error(|err: &std::io::Error| {