This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch command-handler-impr
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0e4a9883864ee01603cf401c2051214d29bc4136
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Feb 12 16:43:00 2026 +0100

    fix(server): delete oldest segments instead of newest in DeleteSegments API
    
    delete_segments drained from the end of the segment vec,
    removing the newest (including active) segments. It then
    called init_log_in_local_partitions which reset the partition
    to offset 0, destroying surviving data and leaving consumers
    with stale offsets.
    
    Replace with delete_oldest_segments that removes sealed
    segments oldest-first via remove_segment_by_offset (same
    path as the message cleaner). A consumer offset barrier
    prevents deleting segments with unconsumed messages. The
    active segment and partition offset are never touched.
    
    Split purge_topic into in-memory (purge_topic) and disk
    (purge_topic_local) phases. The disk phase uses the new
    purge_all_segments which keeps the original drain-and-reset
    behavior appropriate for full topic resets.
---
 core/integration/tests/server/mod.rs               |    1 +
 core/integration/tests/server/purge_delete.rs      |   96 ++
 .../server/scenarios/delete_segments_scenario.rs   |  229 ----
 core/integration/tests/server/scenarios/mod.rs     |    2 +-
 .../server/scenarios/purge_delete_scenario.rs      | 1107 ++++++++++++++++++++
 core/integration/tests/server/specific.rs          |   12 +-
 .../handlers/segments/delete_segments_handler.rs   |    7 +-
 .../binary/handlers/users/delete_user_handler.rs   |    4 +-
 core/server/src/http/users.rs                      |    4 +-
 core/server/src/shard/execution.rs                 |   46 +-
 core/server/src/shard/handlers.rs                  |   15 +-
 core/server/src/shard/mod.rs                       |    2 +-
 core/server/src/shard/system/consumer_offsets.rs   |   44 +
 core/server/src/shard/system/segments.rs           |  156 ++-
 core/server/src/shard/system/streams.rs            |   17 +
 core/server/src/shard/system/topics.rs             |   74 +-
 core/server/src/shard/transmission/event.rs        |    6 +-
 core/server/src/shard/transmission/frame.rs        |    7 +-
 .../src/streaming/segments/indexes/index_writer.rs |    5 -
 .../streaming/segments/messages/messages_writer.rs |    5 -
 20 files changed, 1452 insertions(+), 387 deletions(-)

diff --git a/core/integration/tests/server/mod.rs 
b/core/integration/tests/server/mod.rs
index f3b7a584a..6ea1338af 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -21,5 +21,6 @@ mod concurrent_addition;
 mod general;
 mod message_cleanup;
 mod message_retrieval;
+mod purge_delete;
 mod scenarios;
 mod specific;
diff --git a/core/integration/tests/server/purge_delete.rs 
b/core/integration/tests/server/purge_delete.rs
new file mode 100644
index 000000000..43298956d
--- /dev/null
+++ b/core/integration/tests/server/purge_delete.rs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::purge_delete_scenario;
+use integration::iggy_harness;
+use test_case::test_matrix;
+
+#[iggy_harness(server(
+    segment.size = "5KiB",
+    segment.cache_indexes = ["all", "none", "open_segment"],
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = "true",
+))]
+#[test_matrix([restart_off(), restart_on()])]
+async fn should_delete_segments_and_validate_filesystem(
+    harness: &mut TestHarness,
+    restart_server: bool,
+) {
+    purge_delete_scenario::run(harness, restart_server).await;
+}
+
+#[iggy_harness(server(
+    segment.size = "5KiB",
+    segment.cache_indexes = ["all", "none", "open_segment"],
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = "true",
+))]
+#[test_matrix([restart_off(), restart_on()])]
+async fn should_delete_segments_without_consumers(harness: &mut TestHarness, 
restart_server: bool) {
+    purge_delete_scenario::run_no_consumers(harness, restart_server).await;
+}
+
+#[iggy_harness(server(
+    segment.size = "5KiB",
+    segment.cache_indexes = ["all", "none", "open_segment"],
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = "true",
+))]
+async fn should_delete_segments_with_consumer_group_barrier(harness: 
&TestHarness) {
+    let client = harness.tcp_root_client().await.unwrap();
+    let data_path = harness.server().data_path();
+
+    purge_delete_scenario::run_consumer_group_barrier(&client, 
&data_path).await;
+}
+
+#[iggy_harness(server(
+    segment.size = "5KiB",
+    segment.cache_indexes = ["all", "none", "open_segment"],
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = "true",
+))]
+#[test_matrix([restart_off(), restart_on()])]
+async fn should_block_deletion_until_all_consumers_pass_segment(
+    harness: &mut TestHarness,
+    restart_server: bool,
+) {
+    purge_delete_scenario::run_multi_consumer_barrier(harness, 
restart_server).await;
+}
+
+#[iggy_harness(server(
+    segment.size = "5KiB",
+    segment.cache_indexes = ["all", "none", "open_segment"],
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = "true",
+))]
+#[test_matrix([restart_off(), restart_on()])]
+async fn should_purge_topic_and_clear_consumer_offsets(
+    harness: &mut TestHarness,
+    restart_server: bool,
+) {
+    purge_delete_scenario::run_purge_topic(harness, restart_server).await;
+}
+
+fn restart_off() -> bool {
+    false
+}
+
+fn restart_on() -> bool {
+    true
+}
diff --git 
a/core/integration/tests/server/scenarios/delete_segments_scenario.rs 
b/core/integration/tests/server/scenarios/delete_segments_scenario.rs
deleted file mode 100644
index dad38fbe4..000000000
--- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use bytes::Bytes;
-use iggy::prelude::*;
-use std::fs::{DirEntry, read_dir};
-use std::path::Path;
-
-const STREAM_NAME: &str = "test_stream";
-const TOPIC_NAME: &str = "test_topic";
-const PARTITION_ID: u32 = 0;
-const LOG_EXTENSION: &str = "log";
-
-pub async fn run(client: &IggyClient, data_path: &Path) {
-    let stream = client.create_stream(STREAM_NAME).await.unwrap();
-    let stream_id = stream.id;
-
-    let topic = client
-        .create_topic(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            TOPIC_NAME,
-            1,
-            CompressionAlgorithm::None,
-            None,
-            IggyExpiry::NeverExpire,
-            MaxTopicSize::ServerDefault,
-        )
-        .await
-        .unwrap();
-    let topic_id = topic.id;
-
-    // Send 5 large messages to create multiple segments
-    let large_payload = "A".repeat(1024 * 1024);
-
-    for i in 0..5 {
-        let message = IggyMessage::builder()
-            .id(i as u128)
-            .payload(Bytes::from(large_payload.clone()))
-            .build()
-            .expect("Failed to create message");
-
-        let mut messages = vec![message];
-        client
-            .send_messages(
-                &Identifier::named(STREAM_NAME).unwrap(),
-                &Identifier::named(TOPIC_NAME).unwrap(),
-                &Partitioning::partition_id(PARTITION_ID),
-                &mut messages,
-            )
-            .await
-            .unwrap();
-    }
-
-    // Wait for segments to be persisted and closed
-    tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
-
-    // Check initial segment count on filesystem
-    let partition_path = data_path
-        .join(format!(
-            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
-        ))
-        .display()
-        .to_string();
-
-    let initial_segments = get_segment_paths_for_partition(&partition_path);
-    println!(
-        "Initial segments: {:?}",
-        initial_segments
-            .iter()
-            .map(|e| e.file_name())
-            .collect::<Vec<_>>()
-    );
-
-    assert!(
-        initial_segments.len() >= 3,
-        "Expected at least 3 segments but got {}. This might mean the segment 
size is too large or messages aren't being flushed.",
-        initial_segments.len()
-    );
-
-    let initial_count = initial_segments.len();
-
-    // Test delete segments command - keep only 2 segments
-    let segments_to_keep = 2u32;
-    let result = client
-        .delete_segments(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::named(TOPIC_NAME).unwrap(),
-            PARTITION_ID,
-            segments_to_keep,
-        )
-        .await;
-
-    assert!(
-        result.is_ok(),
-        "Delete segments command should succeed: {result:?}"
-    );
-
-    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
-
-    // Verify segments were deleted from filesystem
-    let remaining_segments = get_segment_paths_for_partition(&partition_path);
-    println!(
-        "Remaining segments: {:?}",
-        remaining_segments
-            .iter()
-            .map(|e| e.file_name())
-            .collect::<Vec<_>>()
-    );
-
-    // Should have at most segments_to_keep + 1 (closed + open segments)
-    assert!(
-        remaining_segments.len() <= (segments_to_keep + 1) as usize,
-        "Expected at most {} segments but got {}",
-        segments_to_keep + 1,
-        remaining_segments.len()
-    );
-
-    assert!(
-        remaining_segments.len() < initial_count,
-        "Expected fewer segments after deletion. Initial: {}, Remaining: {}",
-        initial_count,
-        remaining_segments.len()
-    );
-
-    // Test edge cases
-    let result = client
-        .delete_segments(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::named(TOPIC_NAME).unwrap(),
-            PARTITION_ID,
-            0,
-        )
-        .await;
-    assert!(
-        result.is_ok(),
-        "Delete segments with 0 count should succeed"
-    );
-
-    // Test error cases
-    let result = client
-        .delete_segments(
-            &Identifier::numeric(999).unwrap(),
-            &Identifier::named(TOPIC_NAME).unwrap(),
-            PARTITION_ID,
-            1,
-        )
-        .await;
-    assert!(
-        result.is_err(),
-        "Delete segments on non-existent stream should fail"
-    );
-
-    let result = client
-        .delete_segments(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::numeric(999).unwrap(),
-            PARTITION_ID,
-            1,
-        )
-        .await;
-    assert!(
-        result.is_err(),
-        "Delete segments on non-existent topic should fail"
-    );
-
-    let result = client
-        .delete_segments(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::named(TOPIC_NAME).unwrap(),
-            999,
-            1,
-        )
-        .await;
-    assert!(
-        result.is_err(),
-        "Delete segments on non-existent partition should fail"
-    );
-
-    client
-        .delete_topic(
-            &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::named(TOPIC_NAME).unwrap(),
-        )
-        .await
-        .unwrap();
-
-    client
-        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
-        .await
-        .unwrap();
-}
-
-fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> {
-    read_dir(partition_path)
-        .map(|read_dir| {
-            read_dir
-                .filter_map(|dir_entry| {
-                    dir_entry
-                        .map(|dir_entry| {
-                            match dir_entry
-                                .path()
-                                .extension()
-                                .is_some_and(|ext| ext == LOG_EXTENSION)
-                            {
-                                true => Some(dir_entry),
-                                false => None,
-                            }
-                        })
-                        .ok()
-                        .flatten()
-                })
-                .collect::<Vec<_>>()
-        })
-        .unwrap_or_default()
-}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index a8ce05e88..22b08f596 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -28,7 +28,6 @@ pub mod 
consumer_group_with_single_client_polling_messages_scenario;
 pub mod consumer_timestamp_polling_scenario;
 pub mod create_message_payload;
 pub mod cross_protocol_pat_scenario;
-pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod log_rotation_scenario;
 pub mod message_cleanup_scenario;
@@ -36,6 +35,7 @@ pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod offset_scenario;
 pub mod permissions_scenario;
+pub mod purge_delete_scenario;
 pub mod read_during_persistence_scenario;
 pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs 
b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
new file mode 100644
index 000000000..619a2af0d
--- /dev/null
+++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
@@ -0,0 +1,1107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use iggy_common::Credentials;
+use integration::harness::TestHarness;
+use std::fs::{metadata, read_dir};
+use std::path::Path;
+
+const STREAM_NAME: &str = "test_stream";
+const TOPIC_NAME: &str = "test_topic";
+const PARTITION_ID: u32 = 0;
+const LOG_EXTENSION: &str = "log";
+const INDEX_EXTENSION: &str = "index";
+
+/// Payload chosen so IGGY_MESSAGE_HEADER_SIZE + payload = 1000B per message 
on disk.
+///
+/// Rotation mechanics (with segment.size = 5KiB = 5120B, 
messages_required_to_save = 1):
+///   `is_full()` checks `size >= 5120` BEFORE persisting the current message.
+///   After 6 persisted messages (6000B >= 5120) the next arrival sees 
is_full=true,
+///   gets persisted into the same segment, then rotation fires.
+///   Result: 7 messages per sealed segment (7000B on disk).
+const PAYLOAD_SIZE: usize = 936;
+const MESSAGE_ON_DISK_SIZE: u64 = IGGY_MESSAGE_HEADER_SIZE as u64 + 
PAYLOAD_SIZE as u64;
+const INDEX_SIZE_PER_MSG: u64 = INDEX_SIZE as u64;
+const TOTAL_MESSAGES: u32 = 25;
+
+/// 3 sealed segments (7 msgs each) + 1 active (4 msgs at offsets 21-24).
+const EXPECTED_SEGMENT_OFFSETS: [u64; 4] = [0, 7, 14, 21];
+const MSGS_PER_SEALED_SEGMENT: u64 = 7;
+
+/// Single consumer barrier: oldest-first deletion, barrier advancement, and 
edge cases.
+///
+/// Covers: barrier blocks deletion, advancing barrier releases segments, 
delete(0) no-op,
+/// delete(u32::MAX) bulk, consumer not stuck after deletion, error cases for 
invalid IDs.
+pub async fn run(harness: &mut TestHarness, restart_server: bool) {
+    let client = build_root_client(harness);
+    client.connect().await.unwrap();
+    let data_path = harness.server().data_path().to_path_buf();
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let stream_ident = Identifier::named(STREAM_NAME).unwrap();
+    let topic_ident = Identifier::named(TOPIC_NAME).unwrap();
+
+    send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await;
+
+    let partition_path = partition_path(&data_path, stream_id, topic_id);
+
+    // --- Verify exact segment layout ---
+    let segment_offsets = get_sorted_segment_offsets(&partition_path);
+    assert_eq!(
+        segment_offsets, EXPECTED_SEGMENT_OFFSETS,
+        "Segment layout must match calculated offsets"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS);
+
+    let all_offsets = poll_all_offsets(&client, &stream_ident, 
&topic_ident).await;
+    let expected_offsets: Vec<u64> = (0..TOTAL_MESSAGES as u64).collect();
+    assert_eq!(all_offsets, expected_offsets);
+
+    // --- Consumer offset barrier ---
+    //
+    // stored_offset = 7 (start of segment 1). Segment 0 end_offset = 6 <= 7 → 
deletable.
+    // Segment 1 end_offset = 13 > 7 → protected by barrier.
+    let consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::numeric(1).unwrap(),
+    };
+    let stored_offset = EXPECTED_SEGMENT_OFFSETS[1]; // 7
+    let seg1_end_offset = EXPECTED_SEGMENT_OFFSETS[2] - 1; // 13
+    client
+        .store_consumer_offset(
+            &consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            stored_offset,
+        )
+        .await
+        .unwrap();
+
+    // --- Delete 1 oldest segment ---
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS[1..],
+        "Segment 0 deleted → [7, 14, 21]"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[1..]);
+    assert_eq!(
+        poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+        (MSGS_PER_SEALED_SEGMENT..TOTAL_MESSAGES as u64).collect::<Vec<_>>(),
+        "Messages 7..25 survive"
+    );
+
+    // --- Barrier prevents deletion ---
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS[1..],
+        "Segment 1 (end_offset={seg1_end_offset}) blocked: consumer at 
{stored_offset}"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[1..]);
+
+    // --- Advance consumer past segment 1, delete it ---
+    client
+        .store_consumer_offset(
+            &consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            seg1_end_offset,
+        )
+        .await
+        .unwrap();
+
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS[2..],
+        "Segment 1 deleted → [14, 21]"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[2..]);
+    assert_eq!(
+        poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+        (2 * MSGS_PER_SEALED_SEGMENT..TOTAL_MESSAGES as 
u64).collect::<Vec<_>>(),
+        "Messages 14..25 survive"
+    );
+
+    // --- Consumer not stuck ---
+    let polled_next = client
+        .poll_messages(
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::next(),
+            100,
+            false,
+        )
+        .await
+        .unwrap();
+    assert_eq!(
+        polled_next.messages[0].header.offset, EXPECTED_SEGMENT_OFFSETS[2],
+        "Next poll resumes at offset 14 (first message after stored_offset 13)"
+    );
+
+    // --- delete(0) is a no-op ---
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 0)
+        .await
+        .unwrap();
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS[2..]
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[2..]);
+
+    // --- delete(u32::MAX) with consumer past all sealed segments ---
+    client
+        .store_consumer_offset(
+            &consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            (TOTAL_MESSAGES - 1) as u64,
+        )
+        .await
+        .unwrap();
+
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [EXPECTED_SEGMENT_OFFSETS[3]],
+        "Only active segment at offset 21 survives"
+    );
+    assert_no_orphaned_segment_files(&partition_path, 1);
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[3..]);
+    assert_eq!(
+        poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+        (EXPECTED_SEGMENT_OFFSETS[3]..TOTAL_MESSAGES as 
u64).collect::<Vec<_>>(),
+        "Messages 21..25 survive in active segment"
+    );
+
+    // --- Error cases ---
+    assert!(
+        client
+            .delete_segments(
+                &Identifier::numeric(999).unwrap(),
+                &topic_ident,
+                PARTITION_ID,
+                1,
+            )
+            .await
+            .is_err(),
+        "Non-existent stream"
+    );
+    assert!(
+        client
+            .delete_segments(
+                &stream_ident,
+                &Identifier::numeric(999).unwrap(),
+                PARTITION_ID,
+                1,
+            )
+            .await
+            .is_err(),
+        "Non-existent topic"
+    );
+    assert!(
+        client
+            .delete_segments(&stream_ident, &topic_ident, 999, 1)
+            .await
+            .is_err(),
+        "Non-existent partition"
+    );
+
+    // Cleanup
+    client
+        .delete_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    client.delete_stream(&stream_ident).await.unwrap();
+}
+
+/// No consumers — no barrier: sealed segments are unconditionally deletable.
+///
+/// Deletes all 3 sealed segments one by one, verifying .log/.index file sizes 
and
+/// surviving message offsets after each. Active segment is never deleted.
+pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) 
{
+    let client = build_root_client(harness);
+    client.connect().await.unwrap();
+    let data_path = harness.server().data_path().to_path_buf();
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let stream_ident = Identifier::named(STREAM_NAME).unwrap();
+    let topic_ident = Identifier::named(TOPIC_NAME).unwrap();
+
+    send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await;
+
+    let partition_path = partition_path(&data_path, stream_id, topic_id);
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS,
+        "Segment layout must match calculated offsets"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS);
+    assert_eq!(
+        poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+        (0..TOTAL_MESSAGES as u64).collect::<Vec<_>>()
+    );
+
+    // Delete 3 sealed segments one by one
+    let sealed_count = EXPECTED_SEGMENT_OFFSETS.len() - 1;
+    for i in 0..sealed_count {
+        client
+            .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
+            .await
+            .unwrap();
+        maybe_restart(harness, restart_server).await;
+
+        let first_surviving = EXPECTED_SEGMENT_OFFSETS[i + 1];
+        assert_eq!(
+            get_sorted_segment_offsets(&partition_path),
+            EXPECTED_SEGMENT_OFFSETS[i + 1..],
+            "After deleting {n} sealed segment(s)",
+            n = i + 1
+        );
+        assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[i 
+ 1..]);
+        assert_eq!(
+            poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+            (first_surviving..TOTAL_MESSAGES as u64).collect::<Vec<_>>(),
+            "Messages from offset {first_surviving} onward survive"
+        );
+    }
+
+    // Only active segment remains — delete is a no-op
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [EXPECTED_SEGMENT_OFFSETS[3]],
+        "Only active segment at offset 21"
+    );
+    assert_no_orphaned_segment_files(&partition_path, 1);
+
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [EXPECTED_SEGMENT_OFFSETS[3]],
+        "No-op: active segment protected"
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[3..]);
+    assert_eq!(
+        poll_all_offsets(&client, &stream_ident, &topic_ident).await,
+        (EXPECTED_SEGMENT_OFFSETS[3]..TOTAL_MESSAGES as 
u64).collect::<Vec<_>>(),
+        "Active segment messages still pollable"
+    );
+
+    // Cleanup
+    client
+        .delete_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    client.delete_stream(&stream_ident).await.unwrap();
+}
+
+/// Single consumer group barrier with message-by-message progression.
+///
+/// Polls one message at a time (next + auto_commit), attempts 
delete_segments(u32::MAX) after
+/// each poll. Verifies that each sealed segment is released exactly when the 
committed offset
+/// reaches its end_offset — not one message earlier, not one later.
+///
+/// No restart_server variant — 25 delete_segments calls would mean 25 
restarts.
+pub async fn run_consumer_group_barrier(client: &IggyClient, data_path: &Path) 
{
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let stream_ident = Identifier::named(STREAM_NAME).unwrap();
+    let topic_ident = Identifier::named(TOPIC_NAME).unwrap();
+
+    send_messages(client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await;
+
+    let partition_path = partition_path(data_path, stream_id, topic_id);
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS
+    );
+    assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS);
+
+    // Use high-level consumer group API: auto-creates group, auto-joins, 
auto-commits
+    let mut consumer = client
+        .consumer_group("test_group", STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::ConsumingEachMessage))
+        .create_consumer_group_if_not_exists()
+        .auto_join_consumer_group()
+        .polling_strategy(PollingStrategy::next())
+        .batch_length(1)
+        .build();
+    consumer.init().await.unwrap();
+
+    let mut expected_segments = EXPECTED_SEGMENT_OFFSETS.to_vec();
+
+    for offset in 0..TOTAL_MESSAGES as u64 {
+        use futures::StreamExt;
+        let message = consumer
+            .next()
+            .await
+            .expect("stream ended prematurely")
+            .unwrap();
+
+        assert_eq!(
+            message.message.header.offset, offset,
+            "Expected message at offset {offset}"
+        );
+
+        while expected_segments.len() >= 2 {
+            let seg_end = expected_segments[1] - 1;
+            if segment_deletable(seg_end, offset) {
+                expected_segments.remove(0);
+            } else {
+                break;
+            }
+        }
+
+        client
+            .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 
u32::MAX)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            get_sorted_segment_offsets(&partition_path),
+            expected_segments,
+            "After consuming offset {offset}"
+        );
+        assert_segment_file_sizes(&partition_path, &expected_segments);
+    }
+
+    assert_eq!(
+        expected_segments,
+        [EXPECTED_SEGMENT_OFFSETS[3]],
+        "Only active segment remains after consuming all messages"
+    );
+    assert_no_orphaned_segment_files(&partition_path, 1);
+
+    // Cleanup: consumer group auto-managed, just delete stream resources
+    drop(consumer);
+    client
+        .delete_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    client.delete_stream(&stream_ident).await.unwrap();
+}
+
+/// Multiple consumers: the slowest consumer gates deletion for all.
+///
+/// A consumer group ("fast") has consumed everything. A standalone consumer ("slow") lags behind.
+/// The barrier is `min(fast, slow)`, so deletion is entirely gated by the slow consumer.
+/// Advances the slow consumer through each segment boundary, verifying that segments are
+/// released only when `segment_deletable(seg_end, barrier)` becomes true.
+pub async fn run_multi_consumer_barrier(harness: &mut TestHarness, restart_server: bool) {
+    let client = build_root_client(harness);
+    client.connect().await.unwrap();
+    let data_path = harness.server().data_path().to_path_buf();
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let stream_ident = Identifier::named(STREAM_NAME).unwrap();
+    let topic_ident = Identifier::named(TOPIC_NAME).unwrap();
+
+    send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await;
+
+    let partition_path = partition_path(&data_path, stream_id, topic_id);
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS
+    );
+
+    // --- Fast consumer group: poll all messages with auto_commit via high-level API ---
+    let mut fast_consumer = client
+        .consumer_group("fast_group", STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+        .create_consumer_group_if_not_exists()
+        .auto_join_consumer_group()
+        .polling_strategy(PollingStrategy::offset(0))
+        .batch_length(TOTAL_MESSAGES)
+        .build();
+    fast_consumer.init().await.unwrap();
+
+    {
+        use futures::StreamExt;
+        let mut consumed = 0u32;
+        while let Some(msg) = fast_consumer.next().await {
+            msg.unwrap();
+            consumed += 1;
+            if consumed >= TOTAL_MESSAGES {
+                break;
+            }
+        }
+        assert_eq!(consumed, TOTAL_MESSAGES);
+    }
+
+    // --- Set up slow standalone consumer: store offset at 0 ---
+    let slow_consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::numeric(1).unwrap(),
+    };
+    client
+        .store_consumer_offset(
+            &slow_consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            0,
+        )
+        .await
+        .unwrap();
+
+    // Phase 1: barrier=min(24,0)=0 → segment_deletable(6, 0)=false → nothing deleted
+    assert!(!segment_deletable(6, 0));
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS,
+        "Phase 1: segment_deletable(6, 0)=false, all segments protected"
+    );
+
+    // Phase 2: slow→6, barrier=6 → segment_deletable(6, 6)=true → seg0 released
+    assert!(segment_deletable(6, 6));
+    client
+        .store_consumer_offset(
+            &slow_consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            6,
+        )
+        .await
+        .unwrap();
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [7, 14, 21],
+        "Phase 2: segment_deletable(6, 6)=true, seg0 released"
+    );
+
+    // Phase 3: slow→10, barrier=10 → segment_deletable(13, 10)=false → seg1 protected
+    assert!(!segment_deletable(13, 10));
+    client
+        .store_consumer_offset(
+            &slow_consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            10,
+        )
+        .await
+        .unwrap();
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [7, 14, 21],
+        "Phase 3: segment_deletable(13, 10)=false, seg1 protected"
+    );
+
+    // Phase 4: slow→13, barrier=13 → segment_deletable(13, 13)=true → seg1 released
+    assert!(segment_deletable(13, 13));
+    client
+        .store_consumer_offset(
+            &slow_consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            13,
+        )
+        .await
+        .unwrap();
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [14, 21],
+        "Phase 4: segment_deletable(13, 13)=true, seg1 released"
+    );
+
+    // Phase 5: slow→20, barrier=20 → segment_deletable(20, 20)=true → seg2 released
+    assert!(segment_deletable(20, 20));
+    client
+        .store_consumer_offset(
+            &slow_consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            20,
+        )
+        .await
+        .unwrap();
+    client
+        .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        [21],
+        "Phase 5: segment_deletable(20, 20)=true, only active segment remains"
+    );
+    assert_no_orphaned_segment_files(&partition_path, 1);
+
+    // Cleanup: drop high-level consumer, delete stream resources
+    drop(fast_consumer);
+    client
+        .delete_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    client.delete_stream(&stream_ident).await.unwrap();
+}
+
+/// purge_topic is a full reset: consumer offsets wiped (memory + disk), segments deleted,
+/// partition restarted at offset 0.
+///
+/// Sets up both a standalone consumer offset and a consumer group offset, purges, then
+/// verifies: in-memory offsets return None, offset files deleted from disk, single empty
+/// segment at offset 0, new messages start at offset 0.
+pub async fn run_purge_topic(harness: &mut TestHarness, restart_server: bool) {
+    let client = build_root_client(harness);
+    client.connect().await.unwrap();
+    let data_path = harness.server().data_path().to_path_buf();
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let stream_ident = Identifier::named(STREAM_NAME).unwrap();
+    let topic_ident = Identifier::named(TOPIC_NAME).unwrap();
+
+    send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await;
+
+    let partition_path = partition_path(&data_path, stream_id, topic_id);
+
+    assert_eq!(
+        get_sorted_segment_offsets(&partition_path),
+        EXPECTED_SEGMENT_OFFSETS
+    );
+
+    // --- Store individual consumer offset at 13 ---
+    let consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::numeric(1).unwrap(),
+    };
+    client
+        .store_consumer_offset(
+            &consumer,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            13,
+        )
+        .await
+        .unwrap();
+
+    // --- Consumer group: poll 10 messages → group offset at 9 via high-level API ---
+    let mut group_consumer = client
+        .consumer_group("purge_group", STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+        .create_consumer_group_if_not_exists()
+        .auto_join_consumer_group()
+        .polling_strategy(PollingStrategy::offset(0))
+        .batch_length(10)
+        .build();
+    group_consumer.init().await.unwrap();
+
+    {
+        use futures::StreamExt;
+        let mut consumed = 0u32;
+        while let Some(msg) = group_consumer.next().await {
+            msg.unwrap();
+            consumed += 1;
+            if consumed >= 10 {
+                break;
+            }
+        }
+        assert_eq!(consumed, 10);
+    }
+
+    // Need the group ID for get_consumer_offset verification
+    let group_details = client
+        .get_consumer_group(
+            &stream_ident,
+            &topic_ident,
+            &Identifier::named("purge_group").unwrap(),
+        )
+        .await
+        .unwrap()
+        .expect("purge_group must exist");
+    let group_ident = Identifier::numeric(group_details.id).unwrap();
+    let group_consumer_ref = Consumer {
+        kind: ConsumerKind::ConsumerGroup,
+        id: group_ident.clone(),
+    };
+
+    // Verify both offsets are stored
+    let consumer_offset = client
+        .get_consumer_offset(&consumer, &stream_ident, &topic_ident, Some(PARTITION_ID))
+        .await
+        .unwrap();
+    assert!(consumer_offset.is_some(), "Consumer offset must be stored");
+    assert_eq!(consumer_offset.unwrap().stored_offset, 13);
+
+    let group_offset = client
+        .get_consumer_offset(
+            &group_consumer_ref,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+        )
+        .await
+        .unwrap();
+    assert!(
+        group_offset.is_some(),
+        "Consumer group offset must be stored"
+    );
+    assert_eq!(group_offset.unwrap().stored_offset, 9);
+
+    // Verify offset files exist on disk
+    let consumers_dir = format!("{partition_path}/offsets/consumers");
+    let groups_dir = format!("{partition_path}/offsets/groups");
+    assert!(
+        !is_dir_empty(&consumers_dir),
+        "Consumer offset file must exist before purge"
+    );
+    assert!(
+        !is_dir_empty(&groups_dir),
+        "Consumer group offset file must exist before purge"
+    );
+
+    // --- Purge topic ---
+    // Drop high-level consumer before purge to release group membership
+    drop(group_consumer);
+    client
+        .purge_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    maybe_restart(harness, restart_server).await;
+
+    // --- Verify consumer offsets cleared ---
+    let consumer_offset = client
+        .get_consumer_offset(&consumer, &stream_ident, &topic_ident, Some(PARTITION_ID))
+        .await
+        .unwrap();
+    assert!(
+        consumer_offset.is_none(),
+        "Consumer offset must be cleared after purge"
+    );
+
+    let group_offset = client
+        .get_consumer_offset(
+            &group_consumer_ref,
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+        )
+        .await
+        .unwrap();
+    assert!(
+        group_offset.is_none(),
+        "Consumer group offset must be cleared after purge"
+    );
+
+    // --- Verify offset files deleted from disk ---
+    let consumer_files: Vec<_> = read_dir(&consumers_dir)
+        .map(|e| e.filter_map(|e| e.ok().map(|e| e.file_name())).collect())
+        .unwrap_or_default();
+    let group_files: Vec<_> = read_dir(&groups_dir)
+        .map(|e| e.filter_map(|e| e.ok().map(|e| e.file_name())).collect())
+        .unwrap_or_default();
+    assert!(
+        consumer_files.is_empty(),
+        "Consumer offset files must be deleted after purge, found: {consumer_files:?}"
+    );
+    assert!(
+        group_files.is_empty(),
+        "Consumer group offset files must be deleted after purge, found: {group_files:?}"
+    );
+
+    // --- Verify partition reset: single empty segment at offset 0 ---
+    assert_fresh_empty_partition(&partition_path);
+
+    // --- Verify new messages start at offset 0 ---
+    let new_msg_count = 3u32;
+    send_messages(&client, &stream_ident, &topic_ident, new_msg_count).await;
+
+    let probe_consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::numeric(99).unwrap(),
+    };
+    let polled = client
+        .poll_messages(
+            &stream_ident,
+            &topic_ident,
+            Some(PARTITION_ID),
+            &probe_consumer,
+            &PollingStrategy::offset(0),
+            100,
+            false,
+        )
+        .await
+        .unwrap();
+    let offsets: Vec<u64> = polled.messages.iter().map(|m| m.header.offset).collect();
+    assert_eq!(
+        offsets,
+        (0..new_msg_count as u64).collect::<Vec<_>>(),
+        "New messages must start at offset 0 after purge"
+    );
+
+    // Cleanup: consumer group was dropped before purge, just delete stream resources
+    client
+        .delete_consumer_group(&stream_ident, &topic_ident, &group_ident)
+        .await
+        .unwrap();
+    client
+        .delete_topic(&stream_ident, &topic_ident)
+        .await
+        .unwrap();
+    client.delete_stream(&stream_ident).await.unwrap();
+}
+
+async fn maybe_restart(harness: &mut TestHarness, restart_server: bool) {
+    if restart_server {
+        harness.restart_server().await.unwrap();
+    }
+}
+
+/// Build a root client with SDK-level auto-reconnect and auto-sign-in.
+///
+/// Unlike `harness.tcp_root_client()` which does a one-shot `login_user()`, this embeds
+/// credentials in the transport config so the SDK re-authenticates on reconnect.
+fn build_root_client(harness: &TestHarness) -> IggyClient {
+    let addr = harness.server().tcp_addr().unwrap();
+    IggyClient::builder()
+        .with_tcp()
+        .with_server_address(addr.to_string())
+        .with_auto_sign_in(AutoLogin::Enabled(Credentials::UsernamePassword(
+            DEFAULT_ROOT_USERNAME.to_string(),
+            DEFAULT_ROOT_PASSWORD.to_string(),
+        )))
+        .build()
+        .unwrap()
+}
+
+fn partition_path(data_path: &Path, stream_id: u32, topic_id: u32) -> String {
+    data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string()
+}
+
+async fn send_messages(
+    client: &IggyClient,
+    stream_ident: &Identifier,
+    topic_ident: &Identifier,
+    count: u32,
+) {
+    for i in 0..count {
+        let payload = Bytes::from(format!("{i:04}").repeat(PAYLOAD_SIZE / 4));
+        let message = IggyMessage::builder()
+            .id(i as u128)
+            .payload(payload)
+            .build()
+            .expect("Failed to create message");
+
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                stream_ident,
+                topic_ident,
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+/// Polls all messages from offset 0 and returns their offsets in order.
+async fn poll_all_offsets(
+    client: &IggyClient,
+    stream_ident: &Identifier,
+    topic_ident: &Identifier,
+) -> Vec<u64> {
+    let consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::numeric(99).unwrap(),
+    };
+    let polled = client
+        .poll_messages(
+            stream_ident,
+            topic_ident,
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            TOTAL_MESSAGES * 2,
+            false,
+        )
+        .await
+        .unwrap();
+    polled.messages.iter().map(|m| m.header.offset).collect()
+}
+
+/// Asserts that each segment's `.log` and `.index` files have the exact expected size.
+/// Derives message count per segment from adjacent offsets and TOTAL_MESSAGES.
+fn assert_segment_file_sizes(partition_path: &str, offsets: &[u64]) {
+    for (i, &offset) in offsets.iter().enumerate() {
+        let msg_count = if i + 1 < offsets.len() {
+            offsets[i + 1] - offset
+        } else {
+            TOTAL_MESSAGES as u64 - offset
+        };
+
+        let log_path = format!("{partition_path}/{offset:0>20}.{LOG_EXTENSION}");
+        let index_path = format!("{partition_path}/{offset:0>20}.{INDEX_EXTENSION}");
+
+        let log_size = metadata(&log_path)
+            .unwrap_or_else(|e| panic!("{log_path}: {e}"))
+            .len();
+        let index_size = metadata(&index_path)
+            .unwrap_or_else(|e| panic!("{index_path}: {e}"))
+            .len();
+
+        let expected_log = msg_count * MESSAGE_ON_DISK_SIZE;
+        let expected_index = msg_count * INDEX_SIZE_PER_MSG;
+        assert_eq!(
+            log_size, expected_log,
+            "Segment {offset}: log {log_size}B != expected {expected_log}B ({msg_count} msgs)"
+        );
+        assert_eq!(
+            index_size, expected_index,
+            "Segment {offset}: index {index_size}B != expected 
{expected_index}B ({msg_count} msgs)"
+        );
+    }
+}
+
+fn get_sorted_segment_offsets(partition_path: &str) -> Vec<u64> {
+    let mut offsets: Vec<u64> = read_dir(partition_path)
+        .map(|entries| {
+            entries
+                .filter_map(|entry| {
+                    let entry = entry.ok()?;
+                    let path = entry.path();
+                    if path.extension().is_some_and(|ext| ext == LOG_EXTENSION) {
+                        path.file_stem()
+                            .and_then(|s| s.to_str())
+                            .and_then(|s| s.parse::<u64>().ok())
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        })
+        .unwrap_or_default();
+    offsets.sort();
+    offsets
+}
+
+fn is_dir_empty(dir: &str) -> bool {
+    read_dir(dir)
+        .map(|mut entries| entries.next().is_none())
+        .unwrap_or(true)
+}
+
+/// Asserts the partition directory contains exactly one .log and one .index file at offset 0,
+/// both with size 0 — the expected state after a full purge or segment reset.
+fn assert_fresh_empty_partition(partition_path: &str) {
+    assert_eq!(
+        get_sorted_segment_offsets(partition_path),
+        [0],
+        "Partition must contain a single segment at offset 0"
+    );
+    assert_eq!(
+        count_files_with_ext(partition_path, INDEX_EXTENSION),
+        1,
+        "Exactly one .index file must remain"
+    );
+
+    let log_path = format!("{partition_path}/{:0>20}.{LOG_EXTENSION}", 0);
+    let index_path = format!("{partition_path}/{:0>20}.{INDEX_EXTENSION}", 0);
+    assert_eq!(
+        metadata(&log_path).unwrap().len(),
+        0,
+        "Fresh .log must be empty"
+    );
+    assert_eq!(
+        metadata(&index_path).unwrap().len(),
+        0,
+        "Fresh .index must be empty"
+    );
+}
+
+/// Asserts no orphaned segment files remain after deletion.
+/// `get_sorted_segment_offsets` only checks .log files — this additionally verifies
+/// that the .index file count matches, catching stale .index files left behind.
+fn assert_no_orphaned_segment_files(partition_path: &str, expected_count: usize) {
+    let log_count = count_files_with_ext(partition_path, LOG_EXTENSION);
+    let index_count = count_files_with_ext(partition_path, INDEX_EXTENSION);
+    assert_eq!(
+        log_count, expected_count,
+        "Expected {expected_count} .log files, found {log_count}"
+    );
+    assert_eq!(
+        index_count, expected_count,
+        "Expected {expected_count} .index files, found {index_count}"
+    );
+}
+
+fn count_files_with_ext(dir: &str, ext: &str) -> usize {
+    read_dir(dir)
+        .map(|entries| {
+            entries
+                .filter_map(|e| e.ok())
+                .filter(|e| e.path().extension().is_some_and(|e| e == ext))
+                .count()
+        })
+        .unwrap_or(0)
+}
+
+/// Mirrors the server's deletion rule: a sealed segment is deletable when
+/// `seg.end_offset <= min_committed_offset` (see `delete_oldest_segments` in segments.rs).
+///
+/// `seg_end_offset` is the last offset stored in the segment (inclusive).
+/// `committed` is the minimum committed offset across all consumers/groups.
+fn segment_deletable(seg_end_offset: u64, committed: u64) -> bool {
+    seg_end_offset <= committed
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 4adaaf9bf..ad569b1bd 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -18,19 +18,11 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
-    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
 };
 use integration::iggy_harness;
 
-#[iggy_harness(server(segment.size = "1MiB"))]
-async fn should_delete_segments_and_validate_filesystem(harness: &TestHarness) {
-    let client = harness.tcp_root_client().await.unwrap();
-    let data_path = harness.server().data_path();
-
-    delete_segments_scenario::run(&client, &data_path).await;
-}
-
 #[iggy_harness(
     test_client_transport = TcpTlsGenerated,
     server(tls = generated)
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index 4a59a2d28..c8aa5ddc1 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -65,7 +65,12 @@ impl ServerCommandHandler for DeleteSegments {
         let request = ShardRequest::data_plane(namespace, payload);
 
         match shard.send_to_data_plane(request).await? {
-            ShardResponse::DeleteSegments => {
+            ShardResponse::DeleteSegments {
+                deleted_segments,
+                deleted_messages,
+            } => {
+                shard.metrics.decrement_segments(deleted_segments as u32);
+                shard.metrics.decrement_messages(deleted_messages);
                 sender.send_empty_ok_response().await?;
             }
             ShardResponse::ErrorResponse(err) => return Err(err),
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index a34a98fbd..7164aab70 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -52,11 +52,11 @@ impl ServerCommandHandler for DeleteUser {
         });
 
         match shard.send_to_control_plane(request).await? {
-            ShardResponse::DeletedUser(_) => {
+            ShardResponse::DeleteUserResponse(_) => {
                 sender.send_empty_ok_response().await?;
             }
             ShardResponse::ErrorResponse(err) => return Err(err),
-            _ => unreachable!("Expected DeletedUser"),
+            _ => unreachable!("Expected DeleteUserResponse"),
         }
 
         Ok(HandlerResult::Finished)
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index 2ce8636ff..ea0db9b9f 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -213,9 +213,9 @@ async fn delete_user(
     });
 
     match state.shard.send_to_control_plane(request).await? {
-        ShardResponse::DeletedUser(_) => Ok(StatusCode::NO_CONTENT),
+        ShardResponse::DeleteUserResponse(_) => Ok(StatusCode::NO_CONTENT),
         ShardResponse::ErrorResponse(err) => Err(err.into()),
-        _ => unreachable!("Expected DeletedUser"),
+        _ => unreachable!("Expected DeleteUserResponse"),
     }
 }
 
diff --git a/core/server/src/shard/execution.rs b/core/server/src/shard/execution.rs
index a71244dac..df76b80ee 100644
--- a/core/server/src/shard/execution.rs
+++ b/core/server/src/shard/execution.rs
@@ -46,8 +46,6 @@ use iggy_common::{
     purge_stream::PurgeStream, purge_topic::PurgeTopic, update_permissions::UpdatePermissions,
     update_stream::UpdateStream, update_topic::UpdateTopic, update_user::UpdateUser,
 };
-use std::rc::Rc;
-
 pub struct DeleteStreamResult {
     pub stream_id: usize,
 }
@@ -65,7 +63,7 @@ pub struct DeletePartitionsResult {
 }
 
 pub async fn execute_create_stream(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: CreateStream,
 ) -> Result<StreamResponseData, IggyError> {
@@ -98,7 +96,7 @@ pub async fn execute_create_stream(
 }
 
 pub async fn execute_update_stream(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: UpdateStream,
 ) -> Result<(), IggyError> {
@@ -116,7 +114,7 @@ pub async fn execute_update_stream(
 }
 
 pub async fn execute_delete_stream(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeleteStream,
 ) -> Result<DeleteStreamResult, IggyError> {
@@ -165,7 +163,7 @@ pub async fn execute_delete_stream(
 }
 
 pub async fn execute_purge_stream(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: PurgeStream,
 ) -> Result<(), IggyError> {
@@ -173,6 +171,7 @@ pub async fn execute_purge_stream(
     shard.metadata.perm_purge_stream(user_id, stream.id())?;
 
     shard.purge_stream(stream).await?;
+    shard.purge_stream_local(stream).await?;
 
     shard
         .state
@@ -191,7 +190,7 @@ pub async fn execute_purge_stream(
 }
 
 pub async fn execute_create_topic(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: CreateTopic,
 ) -> Result<TopicResponseData, IggyError> {
@@ -260,7 +259,7 @@ pub async fn execute_create_topic(
 }
 
 pub async fn execute_update_topic(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: UpdateTopic,
 ) -> Result<(), IggyError> {
@@ -287,7 +286,7 @@ pub async fn execute_update_topic(
 }
 
 pub async fn execute_delete_topic(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeleteTopic,
 ) -> Result<DeleteTopicResult, IggyError> {
@@ -327,7 +326,7 @@ pub async fn execute_delete_topic(
 }
 
 pub async fn execute_purge_topic(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: PurgeTopic,
 ) -> Result<(), IggyError> {
@@ -337,6 +336,7 @@ pub async fn execute_purge_topic(
         .perm_purge_topic(user_id, topic.stream_id, topic.topic_id)?;
 
     shard.purge_topic(topic).await?;
+    shard.purge_topic_local(topic).await?;
 
     shard
         .state
@@ -357,7 +357,7 @@ pub async fn execute_purge_topic(
 }
 
 pub async fn execute_create_partitions(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: CreatePartitions,
 ) -> Result<CreatePartitionsResult, IggyError> {
@@ -400,7 +400,7 @@ pub async fn execute_create_partitions(
 }
 
 pub async fn execute_delete_partitions(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeletePartitions,
 ) -> Result<DeletePartitionsResult, IggyError> {
@@ -446,7 +446,7 @@ pub async fn execute_delete_partitions(
 }
 
 pub async fn execute_create_consumer_group(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: CreateConsumerGroup,
 ) -> Result<ConsumerGroupResponseData, IggyError> {
@@ -482,7 +482,7 @@ pub async fn execute_create_consumer_group(
 }
 
 pub async fn execute_delete_consumer_group(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeleteConsumerGroup,
 ) -> Result<(), IggyError> {
@@ -513,7 +513,7 @@ pub async fn execute_delete_consumer_group(
 }
 
 pub fn execute_join_consumer_group(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     client_id: u32,
     command: JoinConsumerGroup,
@@ -530,7 +530,7 @@ pub fn execute_join_consumer_group(
 }
 
 pub fn execute_leave_consumer_group(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     client_id: u32,
     command: LeaveConsumerGroup,
@@ -547,7 +547,7 @@ pub fn execute_leave_consumer_group(
 }
 
 pub async fn execute_create_user(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: iggy_common::create_user::CreateUser,
 ) -> Result<User, IggyError> {
@@ -578,7 +578,7 @@ pub async fn execute_create_user(
 }
 
 pub async fn execute_delete_user(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeleteUser,
 ) -> Result<User, IggyError> {
@@ -595,7 +595,7 @@ pub async fn execute_delete_user(
 }
 
 pub async fn execute_update_user(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: UpdateUser,
 ) -> Result<User, IggyError> {
@@ -612,7 +612,7 @@ pub async fn execute_update_user(
 }
 
 pub async fn execute_change_password(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: ChangePassword,
 ) -> Result<(), IggyError> {
@@ -643,7 +643,7 @@ pub async fn execute_change_password(
 }
 
 pub async fn execute_update_permissions(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: UpdatePermissions,
 ) -> Result<(), IggyError> {
@@ -665,7 +665,7 @@ pub async fn execute_update_permissions(
 }
 
 pub async fn execute_create_personal_access_token(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: CreatePersonalAccessToken,
 ) -> Result<(PersonalAccessToken, String), IggyError> {
@@ -687,7 +687,7 @@ pub async fn execute_create_personal_access_token(
 }
 
 pub async fn execute_delete_personal_access_token(
-    shard: &Rc<IggyShard>,
+    shard: &IggyShard,
     user_id: u32,
     command: DeletePersonalAccessToken,
 ) -> Result<(), IggyError> {
diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs
index 89783476e..a2e14800c 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -120,15 +120,18 @@ async fn handle_request(
         }
         ShardRequestPayload::DeleteSegments { segments_count } => {
             let ns = namespace.expect("DeleteSegments requires routing namespace");
-            shard
-                .delete_segments(
+            let (deleted_segments, deleted_messages) = shard
+                .delete_oldest_segments(
                     ns.stream_id(),
                     ns.topic_id(),
                     ns.partition_id(),
                     segments_count,
                 )
                 .await?;
-            Ok(ShardResponse::DeleteSegments)
+            Ok(ShardResponse::DeleteSegments {
+                deleted_segments,
+                deleted_messages,
+            })
         }
         ShardRequestPayload::CleanTopicMessages {
             stream_id,
@@ -220,7 +223,7 @@ async fn handle_request(
                 "DeleteUserRequest should only be handled by shard0"
             );
             let user = execution::execute_delete_user(shard, user_id, command).await?;
-            Ok(ShardResponse::DeletedUser(user))
+            Ok(ShardResponse::DeleteUserResponse(user))
         }
         ShardRequestPayload::UpdateStreamRequest { user_id, command } => {
             assert_eq!(
@@ -486,7 +489,7 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: 
ShardEvent) -> Result<()
         }
         ShardEvent::PurgedStream { stream_id } => {
             let stream = shard.resolve_stream(&stream_id)?;
-            shard.purge_stream(stream).await?;
+            shard.purge_stream_local(stream).await?;
             Ok(())
         }
         ShardEvent::PurgedTopic {
@@ -494,7 +497,7 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: 
ShardEvent) -> Result<()
             topic_id,
         } => {
             let topic = shard.resolve_topic(&stream_id, &topic_id)?;
-            shard.purge_topic(topic).await?;
+            shard.purge_topic_local(topic).await?;
             Ok(())
         }
         ShardEvent::AddressBound { protocol, address } => {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6d89b1523..7e9f9ae6d 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -65,7 +65,7 @@ pub use communication::calculate_shard_assignment;
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
-pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(500);
+pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(20);
 
 pub struct IggyShard {
     pub id: u16,
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index 85c032d7f..e8f76cde5 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -391,4 +391,48 @@ impl IggyShard {
     pub async fn delete_consumer_offset_from_disk(&self, path: &str) -> 
Result<(), IggyError> {
         
crate::streaming::partitions::storage::delete_persisted_offset(path).await
     }
+
+    /// Enumerates and deletes all consumer/group offset files for a partition 
from disk.
+    /// Uses filesystem paths from config rather than in-memory state (which 
may already be cleared).
+    pub async fn delete_all_consumer_offset_files(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> Result<(), IggyError> {
+        let consumers_path =
+            self.config
+                .system
+                .get_consumer_offsets_path(stream_id, topic_id, partition_id);
+        let groups_path =
+            self.config
+                .system
+                .get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id);
+
+        Self::delete_all_files_in_dir(&consumers_path).await?;
+        Self::delete_all_files_in_dir(&groups_path).await?;
+        Ok(())
+    }
+
+    async fn delete_all_files_in_dir(dir: &str) -> Result<(), IggyError> {
+        let entries = match std::fs::read_dir(dir) {
+            Ok(entries) => entries,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return 
Ok(()),
+            Err(e) => {
+                return Err(IggyError::IoError(format!(
+                    "Failed to read directory {dir}: {e}"
+                )));
+            }
+        };
+        for entry in entries.flatten() {
+            let path = entry.path();
+            if path.is_file() {
+                crate::streaming::partitions::storage::delete_persisted_offset(
+                    &path.to_string_lossy(),
+                )
+                .await?;
+            }
+        }
+        Ok(())
+    }
 }
diff --git a/core/server/src/shard/system/segments.rs 
b/core/server/src/shard/system/segments.rs
index 8940350ff..e71be0d7a 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -270,101 +270,149 @@ impl IggyShard {
         Ok((1, messages_in_segment))
     }
 
-    pub(crate) async fn delete_segments(
+    /// Deletes the N oldest **sealed** segments from a partition, preserving 
the active segment
+    /// and partition offset. Reuses `remove_segment_by_offset` — same logic 
as the message cleaner.
+    ///
+    /// Segments containing unconsumed messages are protected: a segment is 
only eligible for
+    /// deletion if `end_offset <= min_committed_offset` across all consumers 
and consumer groups.
+    /// If no consumers exist, there is no barrier.
+    pub(crate) async fn delete_oldest_segments(
         &self,
-        stream: usize,
-        topic: usize,
+        stream_id: usize,
+        topic_id: usize,
         partition_id: usize,
         segments_count: u32,
+    ) -> Result<(u64, u64), IggyError> {
+        let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+
+        let sealed_offsets: Vec<u64> = {
+            let partitions = self.local_partitions.borrow();
+            let Some(partition) = partitions.get(&ns) else {
+                return Ok((0, 0));
+            };
+
+            let min_committed = Self::min_committed_offset(
+                &partition.consumer_offsets,
+                &partition.consumer_group_offsets,
+            );
+
+            let segments = partition.log.segments();
+            let last_idx = segments.len().saturating_sub(1);
+
+            segments
+                .iter()
+                .enumerate()
+                .filter(|(idx, seg)| {
+                    *idx != last_idx
+                        && seg.sealed
+                        && min_committed.is_none_or(|barrier| seg.end_offset 
<= barrier)
+                })
+                .map(|(_, seg)| seg.start_offset)
+                .take(segments_count as usize)
+                .collect()
+        };
+
+        let mut total_segments = 0u64;
+        let mut total_messages = 0u64;
+        for offset in sealed_offsets {
+            let (s, m) = self
+                .remove_segment_by_offset(stream_id, topic_id, partition_id, 
offset)
+                .await?;
+            total_segments += s;
+            total_messages += m;
+        }
+        Ok((total_segments, total_messages))
+    }
+
+    /// Returns the minimum committed offset across all consumers and consumer 
groups,
+    /// or `None` if no consumers exist (no barrier).
+    fn min_committed_offset(
+        consumer_offsets: 
&crate::streaming::partitions::consumer_offsets::ConsumerOffsets,
+        consumer_group_offsets: 
&crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets,
+    ) -> Option<u64> {
+        let co_guard = consumer_offsets.pin();
+        let cg_guard = consumer_group_offsets.pin();
+        let consumers = co_guard
+            .iter()
+            .map(|(_, co)| 
co.offset.load(std::sync::atomic::Ordering::Relaxed));
+        let groups = cg_guard
+            .iter()
+            .map(|(_, co)| 
co.offset.load(std::sync::atomic::Ordering::Relaxed));
+        consumers.chain(groups).min()
+    }
+
+    /// Drains all segments, deletes their files, and re-initializes the 
partition log at offset 0.
+    /// Used exclusively by `purge_topic_local` — a destructive full reset.
+    pub(crate) async fn purge_all_segments(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
     ) -> Result<(), IggyError> {
-        let namespace = IggyNamespace::new(stream, topic, partition_id);
+        let namespace = IggyNamespace::new(stream_id, topic_id, partition_id);
 
-        // Drain segments from local_partitions
         let (segments, storages, stats) = {
             let mut partitions = self.local_partitions.borrow_mut();
             let partition = partitions
                 .get_mut(&namespace)
-                .expect("delete_segments_base: partition must exist in 
local_partitions");
+                .expect("purge_all_segments: partition must exist in 
local_partitions");
 
             let upperbound = partition.log.segments().len();
-            let begin = upperbound.saturating_sub(segments_count as usize);
             let segments = partition
                 .log
                 .segments_mut()
-                .drain(begin..upperbound)
+                .drain(..upperbound)
                 .collect::<Vec<_>>();
             let storages = partition
                 .log
                 .storages_mut()
-                .drain(begin..upperbound)
+                .drain(..upperbound)
                 .collect::<Vec<_>>();
             let _ = partition
                 .log
                 .indexes_mut()
-                .drain(begin..upperbound)
+                .drain(..upperbound)
                 .collect::<Vec<_>>();
             (segments, storages, partition.stats.clone())
         };
 
         for (mut storage, segment) in 
storages.into_iter().zip(segments.into_iter()) {
             let (msg_writer, index_writer) = storage.shutdown();
-            if let Some(msg_writer) = msg_writer
-                && let Some(index_writer) = index_writer
-            {
-                // We need to fsync before closing to ensure all data is 
written to disk.
-                msg_writer.fsync().await?;
-                index_writer.fsync().await?;
+            let start_offset = segment.start_offset;
+
+            let log_path = if let Some(msg_writer) = msg_writer {
                 let path = msg_writer.path();
                 drop(msg_writer);
-                drop(index_writer);
-                // File might not exist if never actually written to disk 
(lazy creation)
-                match compio::fs::remove_file(&path).await {
-                    Ok(()) => {}
-                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
-                        tracing::debug!(
-                            "Segment file already gone or never created at 
path: {}",
-                            path
-                        );
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to delete segment file at path: {}, err: 
{}",
-                            path,
-                            e
-                        );
-                        return Err(IggyError::CannotDeleteFile);
-                    }
-                }
+                path
             } else {
-                let start_offset = segment.start_offset;
-                let path = self.config.system.get_messages_file_path(
-                    stream,
-                    topic,
+                self.config.system.get_messages_file_path(
+                    stream_id,
+                    topic_id,
                     partition_id,
                     start_offset,
-                );
-                // File might not exist if segment was never written to (lazy 
creation)
-                match compio::fs::remove_file(&path).await {
+                )
+            };
+            drop(index_writer);
+
+            let index_path =
+                self.config
+                    .system
+                    .get_index_path(stream_id, topic_id, partition_id, 
start_offset);
+
+            for path in [&log_path, &index_path] {
+                match compio::fs::remove_file(path).await {
                     Ok(()) => {}
                     Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
-                        tracing::debug!(
-                            "Segment file already gone or never created at 
path: {}",
-                            path
-                        );
+                        tracing::debug!("File already gone at path: {path}");
                     }
                     Err(e) => {
-                        tracing::error!(
-                            "Failed to delete segment file at path: {}, err: 
{}",
-                            path,
-                            e
-                        );
+                        tracing::error!("Failed to delete file at path: 
{path}, err: {e}");
                         return Err(IggyError::CannotDeleteFile);
                     }
                 }
             }
         }
 
-        // Add segment directly to local_partitions
         self.init_log_in_local_partitions(&namespace).await?;
         stats.increment_segments_count(1);
         Ok(())
@@ -374,9 +422,7 @@ impl IggyShard {
     ///
     /// The log is momentarily empty between `delete_segments`' drain and this 
call, which is
     /// safe because the message pump serializes all handlers — no concurrent 
operation can
-    /// observe the empty state. The new segment reuses the offset-0 file 
path; writers
-    /// open with `truncate(true)` when `!file_exists` to clear any stale data 
from a
-    /// previous incarnation at the same path.
+    /// observe the empty state.
     async fn init_log_in_local_partitions(
         &self,
         namespace: &IggyNamespace,
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index afccda732..408e8d2d4 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -146,6 +146,7 @@ impl IggyShard {
         Ok(stream_info)
     }
 
+    /// Clears in-memory state for all topics in a stream.
     pub async fn purge_stream(&self, stream: ResolvedStream) -> Result<(), 
IggyError> {
         let stream_id = stream.id();
         let topic_ids = self.metadata.get_topic_ids(stream_id);
@@ -160,4 +161,20 @@ impl IggyShard {
 
         Ok(())
     }
+
+    /// Disk cleanup for local partitions across all topics in a stream.
+    pub(crate) async fn purge_stream_local(&self, stream: ResolvedStream) -> 
Result<(), IggyError> {
+        let stream_id = stream.id();
+        let topic_ids = self.metadata.get_topic_ids(stream_id);
+
+        for topic_id in topic_ids {
+            let topic = ResolvedTopic {
+                stream_id,
+                topic_id,
+            };
+            self.purge_topic_local(topic).await?;
+        }
+
+        Ok(())
+    }
 }
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index dc87cda80..24d53d220 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -193,69 +193,59 @@ impl IggyShard {
         Ok(topic_info)
     }
 
+    /// Clears in-memory state for a topic: consumer offsets and stats.
+    /// Called on the control plane before broadcasting to other shards.
     pub async fn purge_topic(&self, topic: ResolvedTopic) -> Result<(), 
IggyError> {
         let stream = topic.stream_id;
         let topic_id = topic.topic_id;
-
         let partition_ids = self.metadata.get_partition_ids(stream, topic_id);
 
-        let mut all_consumer_paths = Vec::new();
-        let mut all_group_paths = Vec::new();
-
-        for partition_id in &partition_ids {
-            let ns = IggyNamespace::new(stream, topic_id, *partition_id);
-            if let Some(partition) = self.local_partitions.borrow().get(&ns) {
-                all_consumer_paths.extend(
-                    partition
-                        .consumer_offsets
-                        .pin()
-                        .iter()
-                        .map(|item| item.1.path.clone()),
-                );
-                all_group_paths.extend(
-                    partition
-                        .consumer_group_offsets
-                        .pin()
-                        .iter()
-                        .map(|item| item.1.path.clone()),
-                );
+        for &partition_id in &partition_ids {
+            if let Some(offsets) =
+                self.metadata
+                    .get_partition_consumer_offsets(stream, topic_id, 
partition_id)
+            {
+                offsets.pin().clear();
+            }
+            if let Some(offsets) =
+                self.metadata
+                    .get_partition_consumer_group_offsets(stream, topic_id, 
partition_id)
+            {
+                offsets.pin().clear();
             }
         }
 
-        for path in all_consumer_paths {
-            self.delete_consumer_offset_from_disk(&path).await?;
+        if let Some(topic_stats) = self.metadata.get_topic_stats(stream, 
topic_id) {
+            topic_stats.zero_out_all();
         }
-        for path in all_group_paths {
-            self.delete_consumer_offset_from_disk(&path).await?;
+
+        for &partition_id in &partition_ids {
+            let ns = IggyNamespace::new(stream, topic_id, partition_id);
+            if let Some(partition_stats) = 
self.metadata.get_partition_stats(&ns) {
+                partition_stats.zero_out_all();
+            }
         }
 
-        self.purge_topic_inner(topic).await
+        Ok(())
     }
 
-    pub(crate) async fn purge_topic_inner(&self, topic: ResolvedTopic) -> 
Result<(), IggyError> {
+    /// Disk cleanup for local partitions: deletes consumer offset files and 
purges segments.
+    /// Called on each shard (including shard 0) after in-memory state is 
cleared.
+    pub(crate) async fn purge_topic_local(&self, topic: ResolvedTopic) -> 
Result<(), IggyError> {
         let stream = topic.stream_id;
         let topic_id = topic.topic_id;
         let partition_ids = self.metadata.get_partition_ids(stream, topic_id);
 
         for &partition_id in &partition_ids {
             let ns = IggyNamespace::new(stream, topic_id, partition_id);
-
-            let has_partition = self.local_partitions.borrow().contains(&ns);
-            if has_partition {
-                self.delete_segments(stream, topic_id, partition_id, u32::MAX)
-                    .await?;
+            if !self.local_partitions.borrow().contains(&ns) {
+                continue;
             }
-        }
-
-        if let Some(topic_stats) = self.metadata.get_topic_stats(stream, 
topic_id) {
-            topic_stats.zero_out_all();
-        }
 
-        for &partition_id in &partition_ids {
-            let ns = IggyNamespace::new(stream, topic_id, partition_id);
-            if let Some(partition_stats) = 
self.metadata.get_partition_stats(&ns) {
-                partition_stats.zero_out_all();
-            }
+            self.delete_all_consumer_offset_files(stream, topic_id, 
partition_id)
+                .await?;
+            self.purge_all_segments(stream, topic_id, partition_id)
+                .await?;
         }
 
         Ok(())
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 64007a4ea..b243e26dd 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -42,13 +42,13 @@ pub enum ShardEvent {
         partition_id: usize,
         fsync: bool,
     },
-    /// Purge all messages from a stream (requires per-shard log truncation)
-    PurgedStream { stream_id: Identifier },
-    /// Purge all messages from a topic (requires per-shard log truncation)
+    /// Purge all messages, consumer groups and consumer group offsets from a 
topic
     PurgedTopic {
         stream_id: Identifier,
         topic_id: Identifier,
     },
+    /// Purge all messages from every topic in a stream
+    PurgedStream { stream_id: Identifier },
     /// New partitions created (requires per-shard log initialization)
     CreatedPartitions {
         stream_id: Identifier,
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index c8cc585a9..e6d630837 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -63,7 +63,10 @@ pub enum ShardResponse {
     FlushUnsavedBuffer {
         flushed_count: u32,
     },
-    DeleteSegments,
+    DeleteSegments {
+        deleted_segments: u64,
+        deleted_messages: u64,
+    },
     CleanTopicMessages {
         deleted_segments: u64,
         deleted_messages: u64,
@@ -75,7 +78,7 @@ pub enum ShardResponse {
     UpdateTopicResponse,
     DeleteTopicResponse(usize),
     CreateUserResponse(User),
-    DeletedUser(User),
+    DeleteUserResponse(User),
     GetStatsResponse(Stats),
     CreatePartitionsResponse(Vec<usize>),
     DeletePartitionsResponse(Vec<usize>),
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index fb5a0a533..bab61fe9a 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -50,11 +50,6 @@ impl IndexWriter {
     ) -> Result<Self, IggyError> {
         let mut opts = OpenOptions::new();
         opts.create(true).write(true);
-        if !file_exists {
-            // When creating a fresh segment at a reused path (e.g. offset 0 
after all segments
-            // were deleted), truncate to clear any stale data from a previous 
incarnation.
-            opts.truncate(true);
-        }
         let file = opts
             .open(file_path)
             .await
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 525f4292a..9b9f08c82 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -52,11 +52,6 @@ impl MessagesWriter {
     ) -> Result<Self, IggyError> {
         let mut opts = OpenOptions::new();
         opts.create(true).write(true);
-        if !file_exists {
-            // When creating a fresh segment at a reused path (e.g. offset 0 
after all segments
-            // were deleted), truncate to clear any stale data from a previous 
incarnation.
-            opts.truncate(true);
-        }
         let file = opts
             .open(file_path)
             .await

Reply via email to