This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch command-handler-impr in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0e4a9883864ee01603cf401c2051214d29bc4136 Author: Hubert Gruszecki <[email protected]> AuthorDate: Thu Feb 12 16:43:00 2026 +0100 fix(server): delete oldest segments instead of newest in DeleteSegments API delete_segments drained from the end of the segment vec, removing the newest (including active) segments. It then called init_log_in_local_partitions which reset the partition to offset 0, destroying surviving data and leaving consumers with stale offsets. Replace with delete_oldest_segments that removes sealed segments oldest-first via remove_segment_by_offset (same path as the message cleaner). A consumer offset barrier prevents deleting segments with unconsumed messages. The active segment and partition offset are never touched. Split purge_topic into in-memory (purge_topic) and disk (purge_topic_local) phases. The disk phase uses the new purge_all_segments which keeps the original drain-and-reset behavior appropriate for full topic resets. --- core/integration/tests/server/mod.rs | 1 + core/integration/tests/server/purge_delete.rs | 96 ++ .../server/scenarios/delete_segments_scenario.rs | 229 ---- core/integration/tests/server/scenarios/mod.rs | 2 +- .../server/scenarios/purge_delete_scenario.rs | 1107 ++++++++++++++++++++ core/integration/tests/server/specific.rs | 12 +- .../handlers/segments/delete_segments_handler.rs | 7 +- .../binary/handlers/users/delete_user_handler.rs | 4 +- core/server/src/http/users.rs | 4 +- core/server/src/shard/execution.rs | 46 +- core/server/src/shard/handlers.rs | 15 +- core/server/src/shard/mod.rs | 2 +- core/server/src/shard/system/consumer_offsets.rs | 44 + core/server/src/shard/system/segments.rs | 156 ++- core/server/src/shard/system/streams.rs | 17 + core/server/src/shard/system/topics.rs | 74 +- core/server/src/shard/transmission/event.rs | 6 +- core/server/src/shard/transmission/frame.rs | 7 +- .../src/streaming/segments/indexes/index_writer.rs | 5 - .../streaming/segments/messages/messages_writer.rs | 5 - 20 
files changed, 1452 insertions(+), 387 deletions(-) diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index f3b7a584a..6ea1338af 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -21,5 +21,6 @@ mod concurrent_addition; mod general; mod message_cleanup; mod message_retrieval; +mod purge_delete; mod scenarios; mod specific; diff --git a/core/integration/tests/server/purge_delete.rs b/core/integration/tests/server/purge_delete.rs new file mode 100644 index 000000000..43298956d --- /dev/null +++ b/core/integration/tests/server/purge_delete.rs @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::server::scenarios::purge_delete_scenario; +use integration::iggy_harness; +use test_case::test_matrix; + +#[iggy_harness(server( + segment.size = "5KiB", + segment.cache_indexes = ["all", "none", "open_segment"], + partition.messages_required_to_save = "1", + partition.enforce_fsync = "true", +))] +#[test_matrix([restart_off(), restart_on()])] +async fn should_delete_segments_and_validate_filesystem( + harness: &mut TestHarness, + restart_server: bool, +) { + purge_delete_scenario::run(harness, restart_server).await; +} + +#[iggy_harness(server( + segment.size = "5KiB", + segment.cache_indexes = ["all", "none", "open_segment"], + partition.messages_required_to_save = "1", + partition.enforce_fsync = "true", +))] +#[test_matrix([restart_off(), restart_on()])] +async fn should_delete_segments_without_consumers(harness: &mut TestHarness, restart_server: bool) { + purge_delete_scenario::run_no_consumers(harness, restart_server).await; +} + +#[iggy_harness(server( + segment.size = "5KiB", + segment.cache_indexes = ["all", "none", "open_segment"], + partition.messages_required_to_save = "1", + partition.enforce_fsync = "true", +))] +async fn should_delete_segments_with_consumer_group_barrier(harness: &TestHarness) { + let client = harness.tcp_root_client().await.unwrap(); + let data_path = harness.server().data_path(); + + purge_delete_scenario::run_consumer_group_barrier(&client, &data_path).await; +} + +#[iggy_harness(server( + segment.size = "5KiB", + segment.cache_indexes = ["all", "none", "open_segment"], + partition.messages_required_to_save = "1", + partition.enforce_fsync = "true", +))] +#[test_matrix([restart_off(), restart_on()])] +async fn should_block_deletion_until_all_consumers_pass_segment( + harness: &mut TestHarness, + restart_server: bool, +) { + purge_delete_scenario::run_multi_consumer_barrier(harness, restart_server).await; +} + +#[iggy_harness(server( + segment.size = "5KiB", + segment.cache_indexes = ["all", "none", 
"open_segment"], + partition.messages_required_to_save = "1", + partition.enforce_fsync = "true", +))] +#[test_matrix([restart_off(), restart_on()])] +async fn should_purge_topic_and_clear_consumer_offsets( + harness: &mut TestHarness, + restart_server: bool, +) { + purge_delete_scenario::run_purge_topic(harness, restart_server).await; +} + +fn restart_off() -> bool { + false +} + +fn restart_on() -> bool { + true +} diff --git a/core/integration/tests/server/scenarios/delete_segments_scenario.rs b/core/integration/tests/server/scenarios/delete_segments_scenario.rs deleted file mode 100644 index dad38fbe4..000000000 --- a/core/integration/tests/server/scenarios/delete_segments_scenario.rs +++ /dev/null @@ -1,229 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use bytes::Bytes; -use iggy::prelude::*; -use std::fs::{DirEntry, read_dir}; -use std::path::Path; - -const STREAM_NAME: &str = "test_stream"; -const TOPIC_NAME: &str = "test_topic"; -const PARTITION_ID: u32 = 0; -const LOG_EXTENSION: &str = "log"; - -pub async fn run(client: &IggyClient, data_path: &Path) { - let stream = client.create_stream(STREAM_NAME).await.unwrap(); - let stream_id = stream.id; - - let topic = client - .create_topic( - &Identifier::named(STREAM_NAME).unwrap(), - TOPIC_NAME, - 1, - CompressionAlgorithm::None, - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await - .unwrap(); - let topic_id = topic.id; - - // Send 5 large messages to create multiple segments - let large_payload = "A".repeat(1024 * 1024); - - for i in 0..5 { - let message = IggyMessage::builder() - .id(i as u128) - .payload(Bytes::from(large_payload.clone())) - .build() - .expect("Failed to create message"); - - let mut messages = vec![message]; - client - .send_messages( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - &Partitioning::partition_id(PARTITION_ID), - &mut messages, - ) - .await - .unwrap(); - } - - // Wait for segments to be persisted and closed - tokio::time::sleep(std::time::Duration::from_millis(2000)).await; - - // Check initial segment count on filesystem - let partition_path = data_path - .join(format!( - "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}" - )) - .display() - .to_string(); - - let initial_segments = get_segment_paths_for_partition(&partition_path); - println!( - "Initial segments: {:?}", - initial_segments - .iter() - .map(|e| e.file_name()) - .collect::<Vec<_>>() - ); - - assert!( - initial_segments.len() >= 3, - "Expected at least 3 segments but got {}. 
This might mean the segment size is too large or messages aren't being flushed.", - initial_segments.len() - ); - - let initial_count = initial_segments.len(); - - // Test delete segments command - keep only 2 segments - let segments_to_keep = 2u32; - let result = client - .delete_segments( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - PARTITION_ID, - segments_to_keep, - ) - .await; - - assert!( - result.is_ok(), - "Delete segments command should succeed: {result:?}" - ); - - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - - // Verify segments were deleted from filesystem - let remaining_segments = get_segment_paths_for_partition(&partition_path); - println!( - "Remaining segments: {:?}", - remaining_segments - .iter() - .map(|e| e.file_name()) - .collect::<Vec<_>>() - ); - - // Should have at most segments_to_keep + 1 (closed + open segments) - assert!( - remaining_segments.len() <= (segments_to_keep + 1) as usize, - "Expected at most {} segments but got {}", - segments_to_keep + 1, - remaining_segments.len() - ); - - assert!( - remaining_segments.len() < initial_count, - "Expected fewer segments after deletion. 
Initial: {}, Remaining: {}", - initial_count, - remaining_segments.len() - ); - - // Test edge cases - let result = client - .delete_segments( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - PARTITION_ID, - 0, - ) - .await; - assert!( - result.is_ok(), - "Delete segments with 0 count should succeed" - ); - - // Test error cases - let result = client - .delete_segments( - &Identifier::numeric(999).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - PARTITION_ID, - 1, - ) - .await; - assert!( - result.is_err(), - "Delete segments on non-existent stream should fail" - ); - - let result = client - .delete_segments( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::numeric(999).unwrap(), - PARTITION_ID, - 1, - ) - .await; - assert!( - result.is_err(), - "Delete segments on non-existent topic should fail" - ); - - let result = client - .delete_segments( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - 999, - 1, - ) - .await; - assert!( - result.is_err(), - "Delete segments on non-existent partition should fail" - ); - - client - .delete_topic( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), - ) - .await - .unwrap(); - - client - .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) - .await - .unwrap(); -} - -fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> { - read_dir(partition_path) - .map(|read_dir| { - read_dir - .filter_map(|dir_entry| { - dir_entry - .map(|dir_entry| { - match dir_entry - .path() - .extension() - .is_some_and(|ext| ext == LOG_EXTENSION) - { - true => Some(dir_entry), - false => None, - } - }) - .ok() - .flatten() - }) - .collect::<Vec<_>>() - }) - .unwrap_or_default() -} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index a8ce05e88..22b08f596 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ 
b/core/integration/tests/server/scenarios/mod.rs @@ -28,7 +28,6 @@ pub mod consumer_group_with_single_client_polling_messages_scenario; pub mod consumer_timestamp_polling_scenario; pub mod create_message_payload; pub mod cross_protocol_pat_scenario; -pub mod delete_segments_scenario; pub mod encryption_scenario; pub mod log_rotation_scenario; pub mod message_cleanup_scenario; @@ -36,6 +35,7 @@ pub mod message_headers_scenario; pub mod message_size_scenario; pub mod offset_scenario; pub mod permissions_scenario; +pub mod purge_delete_scenario; pub mod read_during_persistence_scenario; pub mod segment_rotation_race_scenario; pub mod single_message_per_batch_scenario; diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs b/core/integration/tests/server/scenarios/purge_delete_scenario.rs new file mode 100644 index 000000000..619a2af0d --- /dev/null +++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs @@ -0,0 +1,1107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use bytes::Bytes; +use iggy::prelude::*; +use iggy_common::Credentials; +use integration::harness::TestHarness; +use std::fs::{metadata, read_dir}; +use std::path::Path; + +const STREAM_NAME: &str = "test_stream"; +const TOPIC_NAME: &str = "test_topic"; +const PARTITION_ID: u32 = 0; +const LOG_EXTENSION: &str = "log"; +const INDEX_EXTENSION: &str = "index"; + +/// Payload chosen so IGGY_MESSAGE_HEADER_SIZE + payload = 1000B per message on disk. +/// +/// Rotation mechanics (with segment.size = 5KiB = 5120B, messages_required_to_save = 1): +/// `is_full()` checks `size >= 5120` BEFORE persisting the current message. +/// After 6 persisted messages (6000B >= 5120) the next arrival sees is_full=true, +/// gets persisted into the same segment, then rotation fires. +/// Result: 7 messages per sealed segment (7000B on disk). +const PAYLOAD_SIZE: usize = 936; +const MESSAGE_ON_DISK_SIZE: u64 = IGGY_MESSAGE_HEADER_SIZE as u64 + PAYLOAD_SIZE as u64; +const INDEX_SIZE_PER_MSG: u64 = INDEX_SIZE as u64; +const TOTAL_MESSAGES: u32 = 25; + +/// 3 sealed segments (7 msgs each) + 1 active (4 msgs at offsets 21-24). +const EXPECTED_SEGMENT_OFFSETS: [u64; 4] = [0, 7, 14, 21]; +const MSGS_PER_SEALED_SEGMENT: u64 = 7; + +/// Single consumer barrier: oldest-first deletion, barrier advancement, and edge cases. +/// +/// Covers: barrier blocks deletion, advancing barrier releases segments, delete(0) no-op, +/// delete(u32::MAX) bulk, consumer not stuck after deletion, error cases for invalid IDs. 
+pub async fn run(harness: &mut TestHarness, restart_server: bool) { + let client = build_root_client(harness); + client.connect().await.unwrap(); + let data_path = harness.server().data_path().to_path_buf(); + + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; + + let topic = client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + let topic_id = topic.id; + + let stream_ident = Identifier::named(STREAM_NAME).unwrap(); + let topic_ident = Identifier::named(TOPIC_NAME).unwrap(); + + send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await; + + let partition_path = partition_path(&data_path, stream_id, topic_id); + + // --- Verify exact segment layout --- + let segment_offsets = get_sorted_segment_offsets(&partition_path); + assert_eq!( + segment_offsets, EXPECTED_SEGMENT_OFFSETS, + "Segment layout must match calculated offsets" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS); + + let all_offsets = poll_all_offsets(&client, &stream_ident, &topic_ident).await; + let expected_offsets: Vec<u64> = (0..TOTAL_MESSAGES as u64).collect(); + assert_eq!(all_offsets, expected_offsets); + + // --- Consumer offset barrier --- + // + // stored_offset = 7 (start of segment 1). Segment 0 end_offset = 6 <= 7 → deletable. + // Segment 1 end_offset = 13 > 7 → protected by barrier. 
+ let consumer = Consumer { + kind: ConsumerKind::Consumer, + id: Identifier::numeric(1).unwrap(), + }; + let stored_offset = EXPECTED_SEGMENT_OFFSETS[1]; // 7 + let seg1_end_offset = EXPECTED_SEGMENT_OFFSETS[2] - 1; // 13 + client + .store_consumer_offset( + &consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + stored_offset, + ) + .await + .unwrap(); + + // --- Delete 1 oldest segment --- + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS[1..], + "Segment 0 deleted → [7, 14, 21]" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[1..]); + assert_eq!( + poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (MSGS_PER_SEALED_SEGMENT..TOTAL_MESSAGES as u64).collect::<Vec<_>>(), + "Messages 7..25 survive" + ); + + // --- Barrier prevents deletion --- + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS[1..], + "Segment 1 (end_offset={seg1_end_offset}) blocked: consumer at {stored_offset}" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[1..]); + + // --- Advance consumer past segment 1, delete it --- + client + .store_consumer_offset( + &consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + seg1_end_offset, + ) + .await + .unwrap(); + + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS[2..], + "Segment 1 deleted → [14, 21]" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[2..]); + assert_eq!( + 
poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (2 * MSGS_PER_SEALED_SEGMENT..TOTAL_MESSAGES as u64).collect::<Vec<_>>(), + "Messages 14..25 survive" + ); + + // --- Consumer not stuck --- + let polled_next = client + .poll_messages( + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + &consumer, + &PollingStrategy::next(), + 100, + false, + ) + .await + .unwrap(); + assert_eq!( + polled_next.messages[0].header.offset, EXPECTED_SEGMENT_OFFSETS[2], + "Next poll resumes at offset 14 (first message after stored_offset 13)" + ); + + // --- delete(0) is a no-op --- + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 0) + .await + .unwrap(); + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS[2..] + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[2..]); + + // --- delete(u32::MAX) with consumer past all sealed segments --- + client + .store_consumer_offset( + &consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + (TOTAL_MESSAGES - 1) as u64, + ) + .await + .unwrap(); + + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [EXPECTED_SEGMENT_OFFSETS[3]], + "Only active segment at offset 21 survives" + ); + assert_no_orphaned_segment_files(&partition_path, 1); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[3..]); + assert_eq!( + poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (EXPECTED_SEGMENT_OFFSETS[3]..TOTAL_MESSAGES as u64).collect::<Vec<_>>(), + "Messages 21..25 survive in active segment" + ); + + // --- Error cases --- + assert!( + client + .delete_segments( + &Identifier::numeric(999).unwrap(), + &topic_ident, + PARTITION_ID, + 1, + ) + .await + .is_err(), + "Non-existent stream" + ); + assert!( + client + .delete_segments( + &stream_ident, + 
&Identifier::numeric(999).unwrap(), + PARTITION_ID, + 1, + ) + .await + .is_err(), + "Non-existent topic" + ); + assert!( + client + .delete_segments(&stream_ident, &topic_ident, 999, 1) + .await + .is_err(), + "Non-existent partition" + ); + + // Cleanup + client + .delete_topic(&stream_ident, &topic_ident) + .await + .unwrap(); + client.delete_stream(&stream_ident).await.unwrap(); +} + +/// No consumers — no barrier: sealed segments are unconditionally deletable. +/// +/// Deletes all 3 sealed segments one by one, verifying .log/.index file sizes and +/// surviving message offsets after each. Active segment is never deleted. +pub async fn run_no_consumers(harness: &mut TestHarness, restart_server: bool) { + let client = build_root_client(harness); + client.connect().await.unwrap(); + let data_path = harness.server().data_path().to_path_buf(); + + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; + + let topic = client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + let topic_id = topic.id; + + let stream_ident = Identifier::named(STREAM_NAME).unwrap(); + let topic_ident = Identifier::named(TOPIC_NAME).unwrap(); + + send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await; + + let partition_path = partition_path(&data_path, stream_id, topic_id); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS, + "Segment layout must match calculated offsets" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS); + assert_eq!( + poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (0..TOTAL_MESSAGES as u64).collect::<Vec<_>>() + ); + + // Delete 3 sealed segments one by one + let sealed_count = EXPECTED_SEGMENT_OFFSETS.len() - 1; + for i in 0..sealed_count { + client + 
.delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + let first_surviving = EXPECTED_SEGMENT_OFFSETS[i + 1]; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS[i + 1..], + "After deleting {n} sealed segment(s)", + n = i + 1 + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[i + 1..]); + assert_eq!( + poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (first_surviving..TOTAL_MESSAGES as u64).collect::<Vec<_>>(), + "Messages from offset {first_surviving} onward survive" + ); + } + + // Only active segment remains — delete is a no-op + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [EXPECTED_SEGMENT_OFFSETS[3]], + "Only active segment at offset 21" + ); + assert_no_orphaned_segment_files(&partition_path, 1); + + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1) + .await + .unwrap(); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [EXPECTED_SEGMENT_OFFSETS[3]], + "No-op: active segment protected" + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS[3..]); + assert_eq!( + poll_all_offsets(&client, &stream_ident, &topic_ident).await, + (EXPECTED_SEGMENT_OFFSETS[3]..TOTAL_MESSAGES as u64).collect::<Vec<_>>(), + "Active segment messages still pollable" + ); + + // Cleanup + client + .delete_topic(&stream_ident, &topic_ident) + .await + .unwrap(); + client.delete_stream(&stream_ident).await.unwrap(); +} + +/// Single consumer group barrier with message-by-message progression. +/// +/// Polls one message at a time (next + auto_commit), attempts delete_segments(u32::MAX) after +/// each poll. Verifies that each sealed segment is released exactly when the committed offset +/// reaches its end_offset — not one message earlier, not one later. +/// +/// No restart_server variant — 25 delete_segments calls would mean 25 restarts. 
+pub async fn run_consumer_group_barrier(client: &IggyClient, data_path: &Path) { + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; + + let topic = client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + let topic_id = topic.id; + + let stream_ident = Identifier::named(STREAM_NAME).unwrap(); + let topic_ident = Identifier::named(TOPIC_NAME).unwrap(); + + send_messages(client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await; + + let partition_path = partition_path(data_path, stream_id, topic_id); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS + ); + assert_segment_file_sizes(&partition_path, &EXPECTED_SEGMENT_OFFSETS); + + // Use high-level consumer group API: auto-creates group, auto-joins, auto-commits + let mut consumer = client + .consumer_group("test_group", STREAM_NAME, TOPIC_NAME) + .unwrap() + .auto_commit(AutoCommit::When(AutoCommitWhen::ConsumingEachMessage)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::next()) + .batch_length(1) + .build(); + consumer.init().await.unwrap(); + + let mut expected_segments = EXPECTED_SEGMENT_OFFSETS.to_vec(); + + for offset in 0..TOTAL_MESSAGES as u64 { + use futures::StreamExt; + let message = consumer + .next() + .await + .expect("stream ended prematurely") + .unwrap(); + + assert_eq!( + message.message.header.offset, offset, + "Expected message at offset {offset}" + ); + + while expected_segments.len() >= 2 { + let seg_end = expected_segments[1] - 1; + if segment_deletable(seg_end, offset) { + expected_segments.remove(0); + } else { + break; + } + } + + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + 
expected_segments, + "After consuming offset {offset}" + ); + assert_segment_file_sizes(&partition_path, &expected_segments); + } + + assert_eq!( + expected_segments, + [EXPECTED_SEGMENT_OFFSETS[3]], + "Only active segment remains after consuming all messages" + ); + assert_no_orphaned_segment_files(&partition_path, 1); + + // Cleanup: consumer group auto-managed, just delete stream resources + drop(consumer); + client + .delete_topic(&stream_ident, &topic_ident) + .await + .unwrap(); + client.delete_stream(&stream_ident).await.unwrap(); +} + +/// Multiple consumers: the slowest consumer gates deletion for all. +/// +/// A consumer group ("fast") has consumed everything. A standalone consumer ("slow") lags behind. +/// The barrier is `min(fast, slow)`, so deletion is entirely gated by the slow consumer. +/// Advances the slow consumer through each segment boundary, verifying that segments are +/// released only when `segment_deletable(seg_end, barrier)` becomes true. +pub async fn run_multi_consumer_barrier(harness: &mut TestHarness, restart_server: bool) { + let client = build_root_client(harness); + client.connect().await.unwrap(); + let data_path = harness.server().data_path().to_path_buf(); + + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; + + let topic = client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + let topic_id = topic.id; + + let stream_ident = Identifier::named(STREAM_NAME).unwrap(); + let topic_ident = Identifier::named(TOPIC_NAME).unwrap(); + + send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await; + + let partition_path = partition_path(&data_path, stream_id, topic_id); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS + ); + + // --- Fast consumer group: poll all messages with 
auto_commit via high-level API --- + let mut fast_consumer = client + .consumer_group("fast_group", STREAM_NAME, TOPIC_NAME) + .unwrap() + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::offset(0)) + .batch_length(TOTAL_MESSAGES) + .build(); + fast_consumer.init().await.unwrap(); + + { + use futures::StreamExt; + let mut consumed = 0u32; + while let Some(msg) = fast_consumer.next().await { + msg.unwrap(); + consumed += 1; + if consumed >= TOTAL_MESSAGES { + break; + } + } + assert_eq!(consumed, TOTAL_MESSAGES); + } + + // --- Set up slow standalone consumer: store offset at 0 --- + let slow_consumer = Consumer { + kind: ConsumerKind::Consumer, + id: Identifier::numeric(1).unwrap(), + }; + client + .store_consumer_offset( + &slow_consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 0, + ) + .await + .unwrap(); + + // Phase 1: barrier=min(24,0)=0 → segment_deletable(6, 0)=false → nothing deleted + assert!(!segment_deletable(6, 0)); + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS, + "Phase 1: segment_deletable(6, 0)=false, all segments protected" + ); + + // Phase 2: slow→6, barrier=6 → segment_deletable(6, 6)=true → seg0 released + assert!(segment_deletable(6, 6)); + client + .store_consumer_offset( + &slow_consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 6, + ) + .await + .unwrap(); + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [7, 14, 21], + "Phase 2: segment_deletable(6, 6)=true, seg0 released" + ); + + // Phase 3: slow→10, barrier=10 → segment_deletable(13, 
10)=false → seg1 protected + assert!(!segment_deletable(13, 10)); + client + .store_consumer_offset( + &slow_consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 10, + ) + .await + .unwrap(); + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [7, 14, 21], + "Phase 3: segment_deletable(13, 10)=false, seg1 protected" + ); + + // Phase 4: slow→13, barrier=13 → segment_deletable(13, 13)=true → seg1 released + assert!(segment_deletable(13, 13)); + client + .store_consumer_offset( + &slow_consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 13, + ) + .await + .unwrap(); + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [14, 21], + "Phase 4: segment_deletable(13, 13)=true, seg1 released" + ); + + // Phase 5: slow→20, barrier=20 → segment_deletable(20, 20)=true → seg2 released + assert!(segment_deletable(20, 20)); + client + .store_consumer_offset( + &slow_consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 20, + ) + .await + .unwrap(); + client + .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, u32::MAX) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + assert_eq!( + get_sorted_segment_offsets(&partition_path), + [21], + "Phase 5: segment_deletable(20, 20)=true, only active segment remains" + ); + assert_no_orphaned_segment_files(&partition_path, 1); + + // Cleanup: drop high-level consumer, delete stream resources + drop(fast_consumer); + client + .delete_topic(&stream_ident, &topic_ident) + .await + .unwrap(); + client.delete_stream(&stream_ident).await.unwrap(); +} + +/// purge_topic is a full reset: consumer offsets wiped (memory + disk), segments deleted, +/// partition 
restarted at offset 0. +/// +/// Sets up both a standalone consumer offset and a consumer group offset, purges, then +/// verifies: in-memory offsets return None, offset files deleted from disk, single empty +/// segment at offset 0, new messages start at offset 0. +pub async fn run_purge_topic(harness: &mut TestHarness, restart_server: bool) { + let client = build_root_client(harness); + client.connect().await.unwrap(); + let data_path = harness.server().data_path().to_path_buf(); + + let stream = client.create_stream(STREAM_NAME).await.unwrap(); + let stream_id = stream.id; + + let topic = client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + let topic_id = topic.id; + + let stream_ident = Identifier::named(STREAM_NAME).unwrap(); + let topic_ident = Identifier::named(TOPIC_NAME).unwrap(); + + send_messages(&client, &stream_ident, &topic_ident, TOTAL_MESSAGES).await; + + let partition_path = partition_path(&data_path, stream_id, topic_id); + + assert_eq!( + get_sorted_segment_offsets(&partition_path), + EXPECTED_SEGMENT_OFFSETS + ); + + // --- Store individual consumer offset at 13 --- + let consumer = Consumer { + kind: ConsumerKind::Consumer, + id: Identifier::numeric(1).unwrap(), + }; + client + .store_consumer_offset( + &consumer, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + 13, + ) + .await + .unwrap(); + + // --- Consumer group: poll 10 messages → group offset at 9 via high-level API --- + let mut group_consumer = client + .consumer_group("purge_group", STREAM_NAME, TOPIC_NAME) + .unwrap() + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::offset(0)) + .batch_length(10) + .build(); + group_consumer.init().await.unwrap(); + + { + use futures::StreamExt; + let mut consumed 
= 0u32; + while let Some(msg) = group_consumer.next().await { + msg.unwrap(); + consumed += 1; + if consumed >= 10 { + break; + } + } + assert_eq!(consumed, 10); + } + + // Need the group ID for get_consumer_offset verification + let group_details = client + .get_consumer_group( + &stream_ident, + &topic_ident, + &Identifier::named("purge_group").unwrap(), + ) + .await + .unwrap() + .expect("purge_group must exist"); + let group_ident = Identifier::numeric(group_details.id).unwrap(); + let group_consumer_ref = Consumer { + kind: ConsumerKind::ConsumerGroup, + id: group_ident.clone(), + }; + + // Verify both offsets are stored + let consumer_offset = client + .get_consumer_offset(&consumer, &stream_ident, &topic_ident, Some(PARTITION_ID)) + .await + .unwrap(); + assert!(consumer_offset.is_some(), "Consumer offset must be stored"); + assert_eq!(consumer_offset.unwrap().stored_offset, 13); + + let group_offset = client + .get_consumer_offset( + &group_consumer_ref, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + ) + .await + .unwrap(); + assert!( + group_offset.is_some(), + "Consumer group offset must be stored" + ); + assert_eq!(group_offset.unwrap().stored_offset, 9); + + // Verify offset files exist on disk + let consumers_dir = format!("{partition_path}/offsets/consumers"); + let groups_dir = format!("{partition_path}/offsets/groups"); + assert!( + !is_dir_empty(&consumers_dir), + "Consumer offset file must exist before purge" + ); + assert!( + !is_dir_empty(&groups_dir), + "Consumer group offset file must exist before purge" + ); + + // --- Purge topic --- + // Drop high-level consumer before purge to release group membership + drop(group_consumer); + client + .purge_topic(&stream_ident, &topic_ident) + .await + .unwrap(); + maybe_restart(harness, restart_server).await; + + // --- Verify consumer offsets cleared --- + let consumer_offset = client + .get_consumer_offset(&consumer, &stream_ident, &topic_ident, Some(PARTITION_ID)) + .await + .unwrap(); + 
assert!( + consumer_offset.is_none(), + "Consumer offset must be cleared after purge" + ); + + let group_offset = client + .get_consumer_offset( + &group_consumer_ref, + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + ) + .await + .unwrap(); + assert!( + group_offset.is_none(), + "Consumer group offset must be cleared after purge" + ); + + // --- Verify offset files deleted from disk --- + let consumer_files: Vec<_> = read_dir(&consumers_dir) + .map(|e| e.filter_map(|e| e.ok().map(|e| e.file_name())).collect()) + .unwrap_or_default(); + let group_files: Vec<_> = read_dir(&groups_dir) + .map(|e| e.filter_map(|e| e.ok().map(|e| e.file_name())).collect()) + .unwrap_or_default(); + assert!( + consumer_files.is_empty(), + "Consumer offset files must be deleted after purge, found: {consumer_files:?}" + ); + assert!( + group_files.is_empty(), + "Consumer group offset files must be deleted after purge, found: {group_files:?}" + ); + + // --- Verify partition reset: single empty segment at offset 0 --- + assert_fresh_empty_partition(&partition_path); + + // --- Verify new messages start at offset 0 --- + let new_msg_count = 3u32; + send_messages(&client, &stream_ident, &topic_ident, new_msg_count).await; + + let probe_consumer = Consumer { + kind: ConsumerKind::Consumer, + id: Identifier::numeric(99).unwrap(), + }; + let polled = client + .poll_messages( + &stream_ident, + &topic_ident, + Some(PARTITION_ID), + &probe_consumer, + &PollingStrategy::offset(0), + 100, + false, + ) + .await + .unwrap(); + let offsets: Vec<u64> = polled.messages.iter().map(|m| m.header.offset).collect(); + assert_eq!( + offsets, + (0..new_msg_count as u64).collect::<Vec<_>>(), + "New messages must start at offset 0 after purge" + ); + + // Cleanup: the high-level consumer was dropped before purge; delete the group, topic and stream + client + .delete_consumer_group(&stream_ident, &topic_ident, &group_ident) + .await + .unwrap(); + client + .delete_topic(&stream_ident, &topic_ident) + .await + .unwrap(); 
+ client.delete_stream(&stream_ident).await.unwrap(); +} + +async fn maybe_restart(harness: &mut TestHarness, restart_server: bool) { + if restart_server { + harness.restart_server().await.unwrap(); + } +} + +/// Build a root client with SDK-level auto-reconnect and auto-sign-in. +/// +/// Unlike `harness.tcp_root_client()` which does a one-shot `login_user()`, this embeds +/// credentials in the transport config so the SDK re-authenticates on reconnect. +fn build_root_client(harness: &TestHarness) -> IggyClient { + let addr = harness.server().tcp_addr().unwrap(); + IggyClient::builder() + .with_tcp() + .with_server_address(addr.to_string()) + .with_auto_sign_in(AutoLogin::Enabled(Credentials::UsernamePassword( + DEFAULT_ROOT_USERNAME.to_string(), + DEFAULT_ROOT_PASSWORD.to_string(), + ))) + .build() + .unwrap() +} + +fn partition_path(data_path: &Path, stream_id: u32, topic_id: u32) -> String { + data_path + .join(format!( + "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}" + )) + .display() + .to_string() +} + +async fn send_messages( + client: &IggyClient, + stream_ident: &Identifier, + topic_ident: &Identifier, + count: u32, +) { + for i in 0..count { + let payload = Bytes::from(format!("{i:04}").repeat(PAYLOAD_SIZE / 4)); + let message = IggyMessage::builder() + .id(i as u128) + .payload(payload) + .build() + .expect("Failed to create message"); + + let mut messages = vec![message]; + client + .send_messages( + stream_ident, + topic_ident, + &Partitioning::partition_id(PARTITION_ID), + &mut messages, + ) + .await + .unwrap(); + } +} + +/// Polls all messages from offset 0 and returns their offsets in order. 
+async fn poll_all_offsets( + client: &IggyClient, + stream_ident: &Identifier, + topic_ident: &Identifier, +) -> Vec<u64> { + let consumer = Consumer { + kind: ConsumerKind::Consumer, + id: Identifier::numeric(99).unwrap(), + }; + let polled = client + .poll_messages( + stream_ident, + topic_ident, + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + TOTAL_MESSAGES * 2, + false, + ) + .await + .unwrap(); + polled.messages.iter().map(|m| m.header.offset).collect() +} + +/// Asserts that each segment's `.log` and `.index` files have the exact expected size. +/// Derives message count per segment from adjacent offsets and TOTAL_MESSAGES. +fn assert_segment_file_sizes(partition_path: &str, offsets: &[u64]) { + for (i, &offset) in offsets.iter().enumerate() { + let msg_count = if i + 1 < offsets.len() { + offsets[i + 1] - offset + } else { + TOTAL_MESSAGES as u64 - offset + }; + + let log_path = format!("{partition_path}/{offset:0>20}.{LOG_EXTENSION}"); + let index_path = format!("{partition_path}/{offset:0>20}.{INDEX_EXTENSION}"); + + let log_size = metadata(&log_path) + .unwrap_or_else(|e| panic!("{log_path}: {e}")) + .len(); + let index_size = metadata(&index_path) + .unwrap_or_else(|e| panic!("{index_path}: {e}")) + .len(); + + let expected_log = msg_count * MESSAGE_ON_DISK_SIZE; + let expected_index = msg_count * INDEX_SIZE_PER_MSG; + assert_eq!( + log_size, expected_log, + "Segment {offset}: log {log_size}B != expected {expected_log}B ({msg_count} msgs)" + ); + assert_eq!( + index_size, expected_index, + "Segment {offset}: index {index_size}B != expected {expected_index}B ({msg_count} msgs)" + ); + } +} + +fn get_sorted_segment_offsets(partition_path: &str) -> Vec<u64> { + let mut offsets: Vec<u64> = read_dir(partition_path) + .map(|entries| { + entries + .filter_map(|entry| { + let entry = entry.ok()?; + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == LOG_EXTENSION) { + path.file_stem() + .and_then(|s| s.to_str()) + 
.and_then(|s| s.parse::<u64>().ok()) + } else { + None + } + }) + .collect() + }) + .unwrap_or_default(); + offsets.sort(); + offsets +} + +fn is_dir_empty(dir: &str) -> bool { + read_dir(dir) + .map(|mut entries| entries.next().is_none()) + .unwrap_or(true) +} + +/// Asserts the partition directory contains exactly one .log and one .index file at offset 0, +/// both with size 0 — the expected state after a full purge or segment reset. +fn assert_fresh_empty_partition(partition_path: &str) { + assert_eq!( + get_sorted_segment_offsets(partition_path), + [0], + "Partition must contain a single segment at offset 0" + ); + assert_eq!( + count_files_with_ext(partition_path, INDEX_EXTENSION), + 1, + "Exactly one .index file must remain" + ); + + let log_path = format!("{partition_path}/{:0>20}.{LOG_EXTENSION}", 0); + let index_path = format!("{partition_path}/{:0>20}.{INDEX_EXTENSION}", 0); + assert_eq!( + metadata(&log_path).unwrap().len(), + 0, + "Fresh .log must be empty" + ); + assert_eq!( + metadata(&index_path).unwrap().len(), + 0, + "Fresh .index must be empty" + ); +} + +/// Asserts no orphaned segment files remain after deletion. +/// `get_sorted_segment_offsets` only checks .log files — this additionally verifies +/// that the .index file count matches, catching stale .index files left behind. 
+fn assert_no_orphaned_segment_files(partition_path: &str, expected_count: usize) { + let log_count = count_files_with_ext(partition_path, LOG_EXTENSION); + let index_count = count_files_with_ext(partition_path, INDEX_EXTENSION); + assert_eq!( + log_count, expected_count, + "Expected {expected_count} .log files, found {log_count}" + ); + assert_eq!( + index_count, expected_count, + "Expected {expected_count} .index files, found {index_count}" + ); +} + +fn count_files_with_ext(dir: &str, ext: &str) -> usize { + read_dir(dir) + .map(|entries| { + entries + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|e| e == ext)) + .count() + }) + .unwrap_or(0) +} + +/// Mirrors the server's deletion rule: a sealed segment is deletable when +/// `seg.end_offset <= min_committed_offset` (see `delete_oldest_segments` in segments.rs). +/// +/// `seg_end_offset` is the last offset stored in the segment (inclusive). +/// `committed` is the minimum committed offset across all consumers/groups. 
+fn segment_deletable(seg_end_offset: u64, committed: u64) -> bool { + seg_end_offset <= committed +} diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index 4adaaf9bf..ad569b1bd 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -18,19 +18,11 @@ */ use crate::server::scenarios::{ - delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario, - single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario, + message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario, + tcp_tls_scenario, websocket_tls_scenario, }; use integration::iggy_harness; -#[iggy_harness(server(segment.size = "1MiB"))] -async fn should_delete_segments_and_validate_filesystem(harness: &TestHarness) { - let client = harness.tcp_root_client().await.unwrap(); - let data_path = harness.server().data_path(); - - delete_segments_scenario::run(&client, &data_path).await; -} - #[iggy_harness( test_client_transport = TcpTlsGenerated, server(tls = generated) diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs index 4a59a2d28..c8aa5ddc1 100644 --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@ -65,7 +65,12 @@ impl ServerCommandHandler for DeleteSegments { let request = ShardRequest::data_plane(namespace, payload); match shard.send_to_data_plane(request).await? 
{ - ShardResponse::DeleteSegments => { + ShardResponse::DeleteSegments { + deleted_segments, + deleted_messages, + } => { + shard.metrics.decrement_segments(deleted_segments as u32); + shard.metrics.decrement_messages(deleted_messages); sender.send_empty_ok_response().await?; } ShardResponse::ErrorResponse(err) => return Err(err), diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index a34a98fbd..7164aab70 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -52,11 +52,11 @@ impl ServerCommandHandler for DeleteUser { }); match shard.send_to_control_plane(request).await? { - ShardResponse::DeletedUser(_) => { + ShardResponse::DeleteUserResponse(_) => { sender.send_empty_ok_response().await?; } ShardResponse::ErrorResponse(err) => return Err(err), - _ => unreachable!("Expected DeletedUser"), + _ => unreachable!("Expected DeleteUserResponse"), } Ok(HandlerResult::Finished) diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs index 2ce8636ff..ea0db9b9f 100644 --- a/core/server/src/http/users.rs +++ b/core/server/src/http/users.rs @@ -213,9 +213,9 @@ async fn delete_user( }); match state.shard.send_to_control_plane(request).await? 
{ - ShardResponse::DeletedUser(_) => Ok(StatusCode::NO_CONTENT), + ShardResponse::DeleteUserResponse(_) => Ok(StatusCode::NO_CONTENT), ShardResponse::ErrorResponse(err) => Err(err.into()), - _ => unreachable!("Expected DeletedUser"), + _ => unreachable!("Expected DeleteUserResponse"), } } diff --git a/core/server/src/shard/execution.rs b/core/server/src/shard/execution.rs index a71244dac..df76b80ee 100644 --- a/core/server/src/shard/execution.rs +++ b/core/server/src/shard/execution.rs @@ -46,8 +46,6 @@ use iggy_common::{ purge_stream::PurgeStream, purge_topic::PurgeTopic, update_permissions::UpdatePermissions, update_stream::UpdateStream, update_topic::UpdateTopic, update_user::UpdateUser, }; -use std::rc::Rc; - pub struct DeleteStreamResult { pub stream_id: usize, } @@ -65,7 +63,7 @@ pub struct DeletePartitionsResult { } pub async fn execute_create_stream( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: CreateStream, ) -> Result<StreamResponseData, IggyError> { @@ -98,7 +96,7 @@ pub async fn execute_create_stream( } pub async fn execute_update_stream( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: UpdateStream, ) -> Result<(), IggyError> { @@ -116,7 +114,7 @@ pub async fn execute_update_stream( } pub async fn execute_delete_stream( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeleteStream, ) -> Result<DeleteStreamResult, IggyError> { @@ -165,7 +163,7 @@ pub async fn execute_delete_stream( } pub async fn execute_purge_stream( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: PurgeStream, ) -> Result<(), IggyError> { @@ -173,6 +171,7 @@ pub async fn execute_purge_stream( shard.metadata.perm_purge_stream(user_id, stream.id())?; shard.purge_stream(stream).await?; + shard.purge_stream_local(stream).await?; shard .state @@ -191,7 +190,7 @@ pub async fn execute_purge_stream( } pub async fn execute_create_topic( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: 
CreateTopic, ) -> Result<TopicResponseData, IggyError> { @@ -260,7 +259,7 @@ pub async fn execute_create_topic( } pub async fn execute_update_topic( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: UpdateTopic, ) -> Result<(), IggyError> { @@ -287,7 +286,7 @@ pub async fn execute_update_topic( } pub async fn execute_delete_topic( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeleteTopic, ) -> Result<DeleteTopicResult, IggyError> { @@ -327,7 +326,7 @@ pub async fn execute_delete_topic( } pub async fn execute_purge_topic( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: PurgeTopic, ) -> Result<(), IggyError> { @@ -337,6 +336,7 @@ pub async fn execute_purge_topic( .perm_purge_topic(user_id, topic.stream_id, topic.topic_id)?; shard.purge_topic(topic).await?; + shard.purge_topic_local(topic).await?; shard .state @@ -357,7 +357,7 @@ pub async fn execute_purge_topic( } pub async fn execute_create_partitions( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: CreatePartitions, ) -> Result<CreatePartitionsResult, IggyError> { @@ -400,7 +400,7 @@ pub async fn execute_create_partitions( } pub async fn execute_delete_partitions( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeletePartitions, ) -> Result<DeletePartitionsResult, IggyError> { @@ -446,7 +446,7 @@ pub async fn execute_delete_partitions( } pub async fn execute_create_consumer_group( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: CreateConsumerGroup, ) -> Result<ConsumerGroupResponseData, IggyError> { @@ -482,7 +482,7 @@ pub async fn execute_create_consumer_group( } pub async fn execute_delete_consumer_group( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeleteConsumerGroup, ) -> Result<(), IggyError> { @@ -513,7 +513,7 @@ pub async fn execute_delete_consumer_group( } pub fn execute_join_consumer_group( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, 
client_id: u32, command: JoinConsumerGroup, @@ -530,7 +530,7 @@ pub fn execute_join_consumer_group( } pub fn execute_leave_consumer_group( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, client_id: u32, command: LeaveConsumerGroup, @@ -547,7 +547,7 @@ pub fn execute_leave_consumer_group( } pub async fn execute_create_user( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: iggy_common::create_user::CreateUser, ) -> Result<User, IggyError> { @@ -578,7 +578,7 @@ pub async fn execute_create_user( } pub async fn execute_delete_user( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeleteUser, ) -> Result<User, IggyError> { @@ -595,7 +595,7 @@ pub async fn execute_delete_user( } pub async fn execute_update_user( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: UpdateUser, ) -> Result<User, IggyError> { @@ -612,7 +612,7 @@ pub async fn execute_update_user( } pub async fn execute_change_password( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: ChangePassword, ) -> Result<(), IggyError> { @@ -643,7 +643,7 @@ pub async fn execute_change_password( } pub async fn execute_update_permissions( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: UpdatePermissions, ) -> Result<(), IggyError> { @@ -665,7 +665,7 @@ pub async fn execute_update_permissions( } pub async fn execute_create_personal_access_token( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: CreatePersonalAccessToken, ) -> Result<(PersonalAccessToken, String), IggyError> { @@ -687,7 +687,7 @@ pub async fn execute_create_personal_access_token( } pub async fn execute_delete_personal_access_token( - shard: &Rc<IggyShard>, + shard: &IggyShard, user_id: u32, command: DeletePersonalAccessToken, ) -> Result<(), IggyError> { diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 89783476e..a2e14800c 100644 --- a/core/server/src/shard/handlers.rs +++ 
b/core/server/src/shard/handlers.rs @@ -120,15 +120,18 @@ async fn handle_request( } ShardRequestPayload::DeleteSegments { segments_count } => { let ns = namespace.expect("DeleteSegments requires routing namespace"); - shard - .delete_segments( + let (deleted_segments, deleted_messages) = shard + .delete_oldest_segments( ns.stream_id(), ns.topic_id(), ns.partition_id(), segments_count, ) .await?; - Ok(ShardResponse::DeleteSegments) + Ok(ShardResponse::DeleteSegments { + deleted_segments, + deleted_messages, + }) } ShardRequestPayload::CleanTopicMessages { stream_id, @@ -220,7 +223,7 @@ async fn handle_request( "DeleteUserRequest should only be handled by shard0" ); let user = execution::execute_delete_user(shard, user_id, command).await?; - Ok(ShardResponse::DeletedUser(user)) + Ok(ShardResponse::DeleteUserResponse(user)) } ShardRequestPayload::UpdateStreamRequest { user_id, command } => { assert_eq!( @@ -486,7 +489,7 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() } ShardEvent::PurgedStream { stream_id } => { let stream = shard.resolve_stream(&stream_id)?; - shard.purge_stream(stream).await?; + shard.purge_stream_local(stream).await?; Ok(()) } ShardEvent::PurgedTopic { @@ -494,7 +497,7 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() topic_id, } => { let topic = shard.resolve_topic(&stream_id, &topic_id)?; - shard.purge_topic(topic).await?; + shard.purge_topic_local(topic).await?; Ok(()) } ShardEvent::AddressBound { protocol, address } => { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 6d89b1523..7e9f9ae6d 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -65,7 +65,7 @@ pub use communication::calculate_shard_assignment; pub const COMPONENT: &str = "SHARD"; pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); -pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(500); +pub const BROADCAST_TIMEOUT: Duration = 
Duration::from_secs(20); pub struct IggyShard { pub id: u16, diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 85c032d7f..e8f76cde5 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -391,4 +391,48 @@ impl IggyShard { pub async fn delete_consumer_offset_from_disk(&self, path: &str) -> Result<(), IggyError> { crate::streaming::partitions::storage::delete_persisted_offset(path).await } + + /// Enumerates and deletes all consumer/group offset files for a partition from disk. + /// Uses filesystem paths from config rather than in-memory state (which may already be cleared). + pub async fn delete_all_consumer_offset_files( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> Result<(), IggyError> { + let consumers_path = + self.config + .system + .get_consumer_offsets_path(stream_id, topic_id, partition_id); + let groups_path = + self.config + .system + .get_consumer_group_offsets_path(stream_id, topic_id, partition_id); + + Self::delete_all_files_in_dir(&consumers_path).await?; + Self::delete_all_files_in_dir(&groups_path).await?; + Ok(()) + } + + async fn delete_all_files_in_dir(dir: &str) -> Result<(), IggyError> { + let entries = match std::fs::read_dir(dir) { + Ok(entries) => entries, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(e) => { + return Err(IggyError::IoError(format!( + "Failed to read directory {dir}: {e}" + ))); + } + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_file() { + crate::streaming::partitions::storage::delete_persisted_offset( + &path.to_string_lossy(), + ) + .await?; + } + } + Ok(()) + } } diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index 8940350ff..e71be0d7a 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ 
-270,101 +270,149 @@ impl IggyShard { Ok((1, messages_in_segment)) } - pub(crate) async fn delete_segments( + /// Deletes the N oldest **sealed** segments from a partition, preserving the active segment + /// and partition offset. Reuses `remove_segment_by_offset` — same logic as the message cleaner. + /// + /// Segments containing unconsumed messages are protected: a segment is only eligible for + /// deletion if `end_offset <= min_committed_offset` across all consumers and consumer groups. + /// If no consumers exist, there is no barrier. + pub(crate) async fn delete_oldest_segments( &self, - stream: usize, - topic: usize, + stream_id: usize, + topic_id: usize, partition_id: usize, segments_count: u32, + ) -> Result<(u64, u64), IggyError> { + let ns = IggyNamespace::new(stream_id, topic_id, partition_id); + + let sealed_offsets: Vec<u64> = { + let partitions = self.local_partitions.borrow(); + let Some(partition) = partitions.get(&ns) else { + return Ok((0, 0)); + }; + + let min_committed = Self::min_committed_offset( + &partition.consumer_offsets, + &partition.consumer_group_offsets, + ); + + let segments = partition.log.segments(); + let last_idx = segments.len().saturating_sub(1); + + segments + .iter() + .enumerate() + .filter(|(idx, seg)| { + *idx != last_idx + && seg.sealed + && min_committed.is_none_or(|barrier| seg.end_offset <= barrier) + }) + .map(|(_, seg)| seg.start_offset) + .take(segments_count as usize) + .collect() + }; + + let mut total_segments = 0u64; + let mut total_messages = 0u64; + for offset in sealed_offsets { + let (s, m) = self + .remove_segment_by_offset(stream_id, topic_id, partition_id, offset) + .await?; + total_segments += s; + total_messages += m; + } + Ok((total_segments, total_messages)) + } + + /// Returns the minimum committed offset across all consumers and consumer groups, + /// or `None` if no consumers exist (no barrier). 
+ fn min_committed_offset( + consumer_offsets: &crate::streaming::partitions::consumer_offsets::ConsumerOffsets, + consumer_group_offsets: &crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets, + ) -> Option<u64> { + let co_guard = consumer_offsets.pin(); + let cg_guard = consumer_group_offsets.pin(); + let consumers = co_guard + .iter() + .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed)); + let groups = cg_guard + .iter() + .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed)); + consumers.chain(groups).min() + } + + /// Drains all segments, deletes their files, and re-initializes the partition log at offset 0. + /// Used exclusively by `purge_topic_local` — a destructive full reset. + pub(crate) async fn purge_all_segments( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, ) -> Result<(), IggyError> { - let namespace = IggyNamespace::new(stream, topic, partition_id); + let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); - // Drain segments from local_partitions let (segments, storages, stats) = { let mut partitions = self.local_partitions.borrow_mut(); let partition = partitions .get_mut(&namespace) - .expect("delete_segments_base: partition must exist in local_partitions"); + .expect("purge_all_segments: partition must exist in local_partitions"); let upperbound = partition.log.segments().len(); - let begin = upperbound.saturating_sub(segments_count as usize); let segments = partition .log .segments_mut() - .drain(begin..upperbound) + .drain(..upperbound) .collect::<Vec<_>>(); let storages = partition .log .storages_mut() - .drain(begin..upperbound) + .drain(..upperbound) .collect::<Vec<_>>(); let _ = partition .log .indexes_mut() - .drain(begin..upperbound) + .drain(..upperbound) .collect::<Vec<_>>(); (segments, storages, partition.stats.clone()) }; for (mut storage, segment) in storages.into_iter().zip(segments.into_iter()) { let (msg_writer, index_writer) = 
storage.shutdown(); - if let Some(msg_writer) = msg_writer - && let Some(index_writer) = index_writer - { - // We need to fsync before closing to ensure all data is written to disk. - msg_writer.fsync().await?; - index_writer.fsync().await?; + let start_offset = segment.start_offset; + + let log_path = if let Some(msg_writer) = msg_writer { let path = msg_writer.path(); drop(msg_writer); - drop(index_writer); - // File might not exist if never actually written to disk (lazy creation) - match compio::fs::remove_file(&path).await { - Ok(()) => {} - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - tracing::debug!( - "Segment file already gone or never created at path: {}", - path - ); - } - Err(e) => { - tracing::error!( - "Failed to delete segment file at path: {}, err: {}", - path, - e - ); - return Err(IggyError::CannotDeleteFile); - } - } + path } else { - let start_offset = segment.start_offset; - let path = self.config.system.get_messages_file_path( - stream, - topic, + self.config.system.get_messages_file_path( + stream_id, + topic_id, partition_id, start_offset, - ); - // File might not exist if segment was never written to (lazy creation) - match compio::fs::remove_file(&path).await { + ) + }; + drop(index_writer); + + let index_path = + self.config + .system + .get_index_path(stream_id, topic_id, partition_id, start_offset); + + for path in [&log_path, &index_path] { + match compio::fs::remove_file(path).await { Ok(()) => {} Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - tracing::debug!( - "Segment file already gone or never created at path: {}", - path - ); + tracing::debug!("File already gone at path: {path}"); } Err(e) => { - tracing::error!( - "Failed to delete segment file at path: {}, err: {}", - path, - e - ); + tracing::error!("Failed to delete file at path: {path}, err: {e}"); return Err(IggyError::CannotDeleteFile); } } } } - // Add segment directly to local_partitions self.init_log_in_local_partitions(&namespace).await?; 
stats.increment_segments_count(1); Ok(()) @@ -374,9 +422,7 @@ impl IggyShard { /// /// The log is momentarily empty between `delete_segments`' drain and this call, which is /// safe because the message pump serializes all handlers — no concurrent operation can - /// observe the empty state. The new segment reuses the offset-0 file path; writers - /// open with `truncate(true)` when `!file_exists` to clear any stale data from a - /// previous incarnation at the same path. + /// observe the empty state. async fn init_log_in_local_partitions( &self, namespace: &IggyNamespace, diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index afccda732..408e8d2d4 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -146,6 +146,7 @@ impl IggyShard { Ok(stream_info) } + /// Clears in-memory state for all topics in a stream. pub async fn purge_stream(&self, stream: ResolvedStream) -> Result<(), IggyError> { let stream_id = stream.id(); let topic_ids = self.metadata.get_topic_ids(stream_id); @@ -160,4 +161,20 @@ impl IggyShard { Ok(()) } + + /// Disk cleanup for local partitions across all topics in a stream. + pub(crate) async fn purge_stream_local(&self, stream: ResolvedStream) -> Result<(), IggyError> { + let stream_id = stream.id(); + let topic_ids = self.metadata.get_topic_ids(stream_id); + + for topic_id in topic_ids { + let topic = ResolvedTopic { + stream_id, + topic_id, + }; + self.purge_topic_local(topic).await?; + } + + Ok(()) + } } diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index dc87cda80..24d53d220 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -193,69 +193,59 @@ impl IggyShard { Ok(topic_info) } + /// Clears in-memory state for a topic: consumer offsets and stats. + /// Called on the control plane before broadcasting to other shards. 
pub async fn purge_topic(&self, topic: ResolvedTopic) -> Result<(), IggyError> { let stream = topic.stream_id; let topic_id = topic.topic_id; - let partition_ids = self.metadata.get_partition_ids(stream, topic_id); - let mut all_consumer_paths = Vec::new(); - let mut all_group_paths = Vec::new(); - - for partition_id in &partition_ids { - let ns = IggyNamespace::new(stream, topic_id, *partition_id); - if let Some(partition) = self.local_partitions.borrow().get(&ns) { - all_consumer_paths.extend( - partition - .consumer_offsets - .pin() - .iter() - .map(|item| item.1.path.clone()), - ); - all_group_paths.extend( - partition - .consumer_group_offsets - .pin() - .iter() - .map(|item| item.1.path.clone()), - ); + for &partition_id in &partition_ids { + if let Some(offsets) = + self.metadata + .get_partition_consumer_offsets(stream, topic_id, partition_id) + { + offsets.pin().clear(); + } + if let Some(offsets) = + self.metadata + .get_partition_consumer_group_offsets(stream, topic_id, partition_id) + { + offsets.pin().clear(); } } - for path in all_consumer_paths { - self.delete_consumer_offset_from_disk(&path).await?; + if let Some(topic_stats) = self.metadata.get_topic_stats(stream, topic_id) { + topic_stats.zero_out_all(); } - for path in all_group_paths { - self.delete_consumer_offset_from_disk(&path).await?; + + for &partition_id in &partition_ids { + let ns = IggyNamespace::new(stream, topic_id, partition_id); + if let Some(partition_stats) = self.metadata.get_partition_stats(&ns) { + partition_stats.zero_out_all(); + } } - self.purge_topic_inner(topic).await + Ok(()) } - pub(crate) async fn purge_topic_inner(&self, topic: ResolvedTopic) -> Result<(), IggyError> { + /// Disk cleanup for local partitions: deletes consumer offset files and purges segments. + /// Called on each shard (including shard 0) after in-memory state is cleared. 
+ pub(crate) async fn purge_topic_local(&self, topic: ResolvedTopic) -> Result<(), IggyError> { let stream = topic.stream_id; let topic_id = topic.topic_id; let partition_ids = self.metadata.get_partition_ids(stream, topic_id); for &partition_id in &partition_ids { let ns = IggyNamespace::new(stream, topic_id, partition_id); - - let has_partition = self.local_partitions.borrow().contains(&ns); - if has_partition { - self.delete_segments(stream, topic_id, partition_id, u32::MAX) - .await?; + if !self.local_partitions.borrow().contains(&ns) { + continue; } - } - - if let Some(topic_stats) = self.metadata.get_topic_stats(stream, topic_id) { - topic_stats.zero_out_all(); - } - for &partition_id in &partition_ids { - let ns = IggyNamespace::new(stream, topic_id, partition_id); - if let Some(partition_stats) = self.metadata.get_partition_stats(&ns) { - partition_stats.zero_out_all(); - } + self.delete_all_consumer_offset_files(stream, topic_id, partition_id) + .await?; + self.purge_all_segments(stream, topic_id, partition_id) + .await?; } Ok(()) diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 64007a4ea..b243e26dd 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -42,13 +42,13 @@ pub enum ShardEvent { partition_id: usize, fsync: bool, }, - /// Purge all messages from a stream (requires per-shard log truncation) - PurgedStream { stream_id: Identifier }, - /// Purge all messages from a topic (requires per-shard log truncation) + /// Purge all messages, consumer groups and consumer group offsets from a topic PurgedTopic { stream_id: Identifier, topic_id: Identifier, }, + /// Purges all topics in a stream + PurgedStream { stream_id: Identifier }, /// New partitions created (requires per-shard log initialization) CreatedPartitions { stream_id: Identifier, diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs 
index c8cc585a9..e6d630837 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -63,7 +63,10 @@ pub enum ShardResponse { FlushUnsavedBuffer { flushed_count: u32, }, - DeleteSegments, + DeleteSegments { + deleted_segments: u64, + deleted_messages: u64, + }, CleanTopicMessages { deleted_segments: u64, deleted_messages: u64, @@ -75,7 +78,7 @@ pub enum ShardResponse { UpdateTopicResponse, DeleteTopicResponse(usize), CreateUserResponse(User), - DeletedUser(User), + DeleteUserResponse(User), GetStatsResponse(Stats), CreatePartitionsResponse(Vec<usize>), DeletePartitionsResponse(Vec<usize>), diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index fb5a0a533..bab61fe9a 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -50,11 +50,6 @@ impl IndexWriter { ) -> Result<Self, IggyError> { let mut opts = OpenOptions::new(); opts.create(true).write(true); - if !file_exists { - // When creating a fresh segment at a reused path (e.g. offset 0 after all segments - // were deleted), truncate to clear any stale data from a previous incarnation. - opts.truncate(true); - } let file = opts .open(file_path) .await diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index 525f4292a..9b9f08c82 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -52,11 +52,6 @@ impl MessagesWriter { ) -> Result<Self, IggyError> { let mut opts = OpenOptions::new(); opts.create(true).write(true); - if !file_exists { - // When creating a fresh segment at a reused path (e.g. offset 0 after all segments - // were deleted), truncate to clear any stale data from a previous incarnation. 
- opts.truncate(true); - } let file = opts .open(file_path) .await
