This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch command-handler-impr in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d521d529b13aa405bcc2006bd0340f91a639ae3f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Feb 12 00:43:11 2026 +0100

    refactor(server): serialize all partition mutations through message pump

    Per-partition write locks (TokioMutex) masked a deeper issue: periodic tasks (message_saver, message_cleaner) could interleave with append handlers across async suspension points, creating subtle TOCTOU races on segment indices and storage vectors.

    The fix routes every partition mutation — appends, flushes, segment deletion — exclusively through the message pump's single-threaded loop. Periodic tasks now enqueue requests via the data-plane channel and await responses, replacing direct local_partitions access. This eliminates write locks entirely and guarantees strict ordering.

    Also: pump drains in-flight frames and performs flush+fsync on shutdown (replacing message_saver's on_shutdown hook), writers truncate when reusing segment paths after full deletion, and partition loading creates an initial segment when none exist on disk.
--- Cargo.lock | 1 - core/server/Cargo.toml | 1 - core/server/src/shard/handlers.rs | 39 +- core/server/src/shard/mod.rs | 26 +- core/server/src/shard/system/messages.rs | 104 +----- core/server/src/shard/system/segments.rs | 267 +++++++++++++- .../src/shard/tasks/continuous/message_pump.rs | 81 +++- .../src/shard/tasks/periodic/message_cleaner.rs | 408 ++------------------- .../src/shard/tasks/periodic/message_saver.rs | 74 +--- core/server/src/shard/transmission/frame.rs | 8 +- core/server/src/shard/transmission/message.rs | 5 + .../src/streaming/partitions/local_partition.rs | 4 - .../src/streaming/segments/indexes/index_writer.rs | 11 +- .../streaming/segments/messages/messages_writer.rs | 11 +- 14 files changed, 488 insertions(+), 552 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d40a0c839..7f33c0c32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8321,7 +8321,6 @@ dependencies = [ "sysinfo 0.38.1", "tempfile", "thiserror 2.0.18", - "tokio", "toml 0.9.11+spec-1.1.0", "tower-http", "tracing", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 433a1d7a8..15f7978f8 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -98,7 +98,6 @@ strum = { workspace = true } sysinfo = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["sync"] } toml = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index a125f6441..89783476e 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -75,10 +75,26 @@ async fn handle_request( Ok(ShardResponse::SendMessages) } ShardRequestPayload::PollMessages { args, consumer } => { - let auto_commit = args.auto_commit; - let namespace = namespace.expect("PollMessages requires routing namespace"); + if args.count == 0 { + let current_offset = shard + .local_partitions + .borrow() + 
.get(&namespace) + .map(|p| p.offset.load(std::sync::atomic::Ordering::Relaxed)) + .unwrap_or(0); + return Ok(ShardResponse::PollMessages(( + iggy_common::IggyPollMetadata::new( + namespace.partition_id() as u32, + current_offset, + ), + crate::streaming::segments::IggyMessagesBatchSet::empty(), + ))); + } + + let auto_commit = args.auto_commit; + shard.ensure_partition(&namespace).await?; let (poll_metadata, batches) = shard @@ -97,10 +113,10 @@ async fn handle_request( } ShardRequestPayload::FlushUnsavedBuffer { fsync } => { let ns = namespace.expect("FlushUnsavedBuffer requires routing namespace"); - shard - .flush_unsaved_buffer_base(ns.stream_id(), ns.topic_id(), ns.partition_id(), fsync) + let flushed_count = shard + .flush_unsaved_buffer_from_local_partitions(&ns, fsync) .await?; - Ok(ShardResponse::FlushUnsavedBuffer) + Ok(ShardResponse::FlushUnsavedBuffer { flushed_count }) } ShardRequestPayload::DeleteSegments { segments_count } => { let ns = namespace.expect("DeleteSegments requires routing namespace"); @@ -114,6 +130,19 @@ async fn handle_request( .await?; Ok(ShardResponse::DeleteSegments) } + ShardRequestPayload::CleanTopicMessages { + stream_id, + topic_id, + partition_ids, + } => { + let (deleted_segments, deleted_messages) = shard + .clean_topic_messages(stream_id, topic_id, &partition_ids) + .await?; + Ok(ShardResponse::CleanTopicMessages { + deleted_segments, + deleted_messages, + }) + } ShardRequestPayload::CreatePartitionsRequest { user_id, command } => { assert_eq!( shard.id, 0, diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index ea4e14cfa..6d89b1523 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -269,7 +269,31 @@ impl IggyShard { ) .await { - Ok(loaded_log) => { + Ok(mut loaded_log) => { + if !loaded_log.has_segments() { + info!( + "No segments found on disk for partition ID: {} for topic ID: {} for stream ID: {}, creating initial segment", + partition_id, topic_id, stream_id + ); + 
let segment = crate::streaming::segments::Segment::new( + 0, + self.config.system.segment.size, + ); + let storage = + crate::streaming::segments::storage::create_segment_storage( + &self.config.system, + stream_id, + topic_id, + partition_id, + 0, + 0, + 0, + ) + .await?; + loaded_log.add_persisted_segment(segment, storage); + stats.increment_segments_count(1); + } + let current_offset = loaded_log.active_segment().end_offset; stats.set_current_offset(current_offset); diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 5307b5c04..bc07ea457 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -54,30 +54,14 @@ impl IggyShard { partition.partition_id, ); - // Fast path: if partition exists locally, append directly without channel routing - let is_local = self.local_partitions.borrow().contains(&namespace); - if is_local { - let batch = self.maybe_encrypt_messages(batch)?; - let messages_count = batch.count(); - - self.append_messages_to_local_partition(&namespace, batch, &self.config.system) - .await?; - - self.metrics.increment_messages(messages_count as u64); - return Ok(()); - } - - // Slow path: route through data-plane channel to target shard let payload = ShardRequestPayload::SendMessages { batch }; let request = ShardRequest::data_plane(namespace, payload); match self.send_to_data_plane(request).await? { - ShardResponse::SendMessages => {} - ShardResponse::ErrorResponse(err) => return Err(err), + ShardResponse::SendMessages => Ok(()), + ShardResponse::ErrorResponse(err) => Err(err), _ => unreachable!("Expected SendMessages response"), } - - Ok(()) } /// Polls messages from partition. 
Permission must be checked by caller via @@ -103,45 +87,6 @@ impl IggyShard { let namespace = IggyNamespace::new(topic.stream_id, topic.topic_id, partition_id); - // Fast path: if partition exists locally, poll directly without channel routing - if self.local_partitions.borrow().contains(&namespace) { - if args.count == 0 { - let current_offset = self - .local_partitions - .borrow() - .get(&namespace) - .map(|data| data.offset.load(Ordering::Relaxed)) - .unwrap_or(0); - return Ok(( - IggyPollMetadata::new(partition_id as u32, current_offset), - IggyMessagesBatchSet::empty(), - )); - } - - let auto_commit = args.auto_commit; - - let (poll_metadata, batches) = self - .poll_messages_from_local_partition(&namespace, consumer, args) - .await?; - - if auto_commit && !batches.is_empty() { - let offset = batches - .last_offset() - .expect("Batch set should have at least one batch"); - self.auto_commit_consumer_offset_from_local_partition(&namespace, consumer, offset) - .await?; - } - - let batches = if let Some(encryptor) = &self.encryptor { - self.decrypt_messages(batches, encryptor).await? - } else { - batches - }; - - return Ok((poll_metadata, batches)); - } - - // Slow path: route through data-plane channel to target shard let payload = ShardRequestPayload::PollMessages { consumer, args }; let request = ShardRequest::data_plane(namespace, payload); @@ -181,24 +126,12 @@ impl IggyShard { let request = ShardRequest::data_plane(namespace, payload); match self.send_to_data_plane(request).await? { - ShardResponse::FlushUnsavedBuffer => Ok(()), + ShardResponse::FlushUnsavedBuffer { .. 
} => Ok(()), ShardResponse::ErrorResponse(err) => Err(err), _ => unreachable!("Expected FlushUnsavedBuffer response"), } } - pub(crate) async fn flush_unsaved_buffer_base( - &self, - stream: usize, - topic: usize, - partition_id: usize, - fsync: bool, - ) -> Result<u32, IggyError> { - let namespace = IggyNamespace::new(stream, topic, partition_id); - self.flush_unsaved_buffer_from_local_partitions(&namespace, fsync) - .await - } - /// Flushes unsaved messages from the partition store to disk. /// Returns the number of messages saved. pub(crate) async fn flush_unsaved_buffer_from_local_partitions( @@ -206,16 +139,6 @@ impl IggyShard { namespace: &IggyNamespace, fsync: bool, ) -> Result<u32, IggyError> { - let write_lock = { - let partitions = self.local_partitions.borrow(); - let Some(partition) = partitions.get(namespace) else { - return Ok(0); - }; - partition.write_lock.clone() - }; - - let _write_guard = write_lock.lock().await; - let frozen_batches = { let mut partitions = self.local_partitions.borrow_mut(); let Some(partition) = partitions.get_mut(namespace) else { @@ -361,22 +284,17 @@ impl IggyShard { Ok(()) } - pub async fn append_messages_to_local_partition( + /// Appends a batch to the active segment, flushing to disk and rotating if needed. + /// + /// Safety: called exclusively from the message pump — segment indices captured before + /// internal `.await` points (prepare_for_persistence, persist, rotate) remain valid + /// because no other handler can modify the segment vec while this frame is in progress. 
+ pub(crate) async fn append_messages_to_local_partition( &self, namespace: &IggyNamespace, mut batch: IggyMessagesBatchMut, config: &crate::configs::system::SystemConfig, ) -> Result<(), IggyError> { - let write_lock = { - let partitions = self.local_partitions.borrow(); - let partition = partitions - .get(namespace) - .expect("local_partitions: partition must exist"); - partition.write_lock.clone() - }; - - let _write_guard = write_lock.lock().await; - let ( current_offset, current_position, @@ -576,9 +494,7 @@ impl IggyShard { .get_mut(namespace) .expect("local_partitions: partition must exist"); - // Recalculate index: segment deletion during async I/O shifts indices let segment_index = partition.log.segments().len() - 1; - let indexes = partition.log.indexes_mut()[segment_index] .as_mut() .expect("indexes must exist for segment being persisted"); @@ -594,7 +510,7 @@ impl IggyShard { Ok(batch_count) } - pub async fn poll_messages_from_local_partition( + pub(crate) async fn poll_messages_from_local_partition( &self, namespace: &IggyNamespace, consumer: crate::streaming::polling_consumer::PollingConsumer, diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index bf15b7b39..8940350ff 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -18,10 +18,258 @@ use crate::configs::cache_indexes::CacheIndexesConfig; use crate::shard::IggyShard; use crate::streaming::segments::Segment; -use iggy_common::IggyError; use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize}; impl IggyShard { + /// Performs all cleanup for a topic's partitions: time-based expiry then size-based trimming. + /// + /// Runs entirely inside the message pump's serialized loop — reads partition state and + /// deletes segments atomically with no TOCTOU window. 
+ pub(crate) async fn clean_topic_messages( + &self, + stream_id: usize, + topic_id: usize, + partition_ids: &[usize], + ) -> Result<(u64, u64), IggyError> { + let (expiry, max_topic_size) = self + .metadata + .get_topic_config(stream_id, topic_id) + .unwrap_or(( + self.config.system.topic.message_expiry, + MaxTopicSize::Unlimited, + )); + + let mut total_segments = 0u64; + let mut total_messages = 0u64; + + // Phase 1: time-based expiry + if !matches!(expiry, IggyExpiry::NeverExpire) { + let now = IggyTimestamp::now(); + for &partition_id in partition_ids { + let (s, m) = self + .delete_expired_segments_for_partition( + stream_id, + topic_id, + partition_id, + now, + expiry, + ) + .await?; + total_segments += s; + total_messages += m; + } + } + + // Phase 2: size-based trimming + if !matches!(max_topic_size, MaxTopicSize::Unlimited) { + let max_bytes = max_topic_size.as_bytes_u64(); + let threshold = max_bytes * 9 / 10; + + loop { + let current_size = self + .metadata + .with_metadata(|m| { + m.streams + .get(stream_id) + .and_then(|s| s.topics.get(topic_id)) + .map(|t| t.stats.size_bytes_inconsistent()) + }) + .unwrap_or(0); + + if current_size < threshold { + break; + } + + let Some((target_partition_id, target_offset)) = + self.find_oldest_sealed_segment(stream_id, topic_id, partition_ids) + else { + break; + }; + + let (s, m) = self + .remove_segment_by_offset( + stream_id, + topic_id, + target_partition_id, + target_offset, + ) + .await?; + if s == 0 { + break; + } + total_segments += s; + total_messages += m; + } + } + + Ok((total_segments, total_messages)) + } + + /// Deletes all expired sealed segments from a single partition. 
+ async fn delete_expired_segments_for_partition( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + now: IggyTimestamp, + expiry: IggyExpiry, + ) -> Result<(u64, u64), IggyError> { + let ns = IggyNamespace::new(stream_id, topic_id, partition_id); + + let expired_offsets: Vec<u64> = { + let partitions = self.local_partitions.borrow(); + let Some(partition) = partitions.get(&ns) else { + return Ok((0, 0)); + }; + let segments = partition.log.segments(); + let last_idx = segments.len().saturating_sub(1); + + segments + .iter() + .enumerate() + .filter(|(idx, seg)| *idx != last_idx && seg.is_expired(now, expiry)) + .map(|(_, seg)| seg.start_offset) + .collect() + }; + + let mut total_segments = 0u64; + let mut total_messages = 0u64; + for offset in expired_offsets { + let (s, m) = self + .remove_segment_by_offset(stream_id, topic_id, partition_id, offset) + .await?; + total_segments += s; + total_messages += m; + } + Ok((total_segments, total_messages)) + } + + /// Finds the oldest sealed segment across the given partitions, comparing by timestamp. + /// Returns `(partition_id, start_offset)` or `None` if no deletable segments exist. 
+ fn find_oldest_sealed_segment( + &self, + stream_id: usize, + topic_id: usize, + partition_ids: &[usize], + ) -> Option<(usize, u64)> { + let partitions = self.local_partitions.borrow(); + let mut oldest: Option<(usize, u64, u64)> = None; + + for &partition_id in partition_ids { + let ns = IggyNamespace::new(stream_id, topic_id, partition_id); + let Some(partition) = partitions.get(&ns) else { + continue; + }; + + let segments = partition.log.segments(); + if segments.len() <= 1 { + continue; + } + + let first = &segments[0]; + if !first.sealed { + continue; + } + + match &oldest { + None => oldest = Some((partition_id, first.start_offset, first.start_timestamp)), + Some((_, _, ts)) if first.start_timestamp < *ts => { + oldest = Some((partition_id, first.start_offset, first.start_timestamp)); + } + _ => {} + } + } + + oldest.map(|(pid, offset, _)| (pid, offset)) + } + + /// Removes a single segment identified by its start_offset from the given partition. + /// Skips if the segment no longer exists or is the active (last) segment. 
+ async fn remove_segment_by_offset( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + start_offset: u64, + ) -> Result<(u64, u64), IggyError> { + let ns = IggyNamespace::new(stream_id, topic_id, partition_id); + + let removed = { + let mut partitions = self.local_partitions.borrow_mut(); + let Some(partition) = partitions.get_mut(&ns) else { + return Ok((0, 0)); + }; + + let log = &mut partition.log; + let last_idx = log.segments().len().saturating_sub(1); + + let Some(idx) = log + .segments() + .iter() + .position(|s| s.start_offset == start_offset) + else { + return Ok((0, 0)); + }; + + if idx == last_idx { + tracing::warn!( + "Refusing to delete active segment (start_offset: {start_offset}) \ + for partition ID: {partition_id}" + ); + return Ok((0, 0)); + } + + let segment = log.segments_mut().remove(idx); + let storage = log.storages_mut().remove(idx); + log.indexes_mut().remove(idx); + + Some((segment, storage, partition.stats.clone())) + }; + + let Some((segment, mut storage, stats)) = removed else { + return Ok((0, 0)); + }; + + let segment_size = segment.size.as_bytes_u64(); + let end_offset = segment.end_offset; + let messages_in_segment = if start_offset == end_offset { + 0 + } else { + (end_offset - start_offset) + 1 + }; + + let _ = storage.shutdown(); + let (messages_path, index_path) = storage.segment_and_index_paths(); + + if let Some(path) = messages_path + && let Err(e) = compio::fs::remove_file(&path).await + { + tracing::error!("Failed to delete messages file {}: {}", path, e); + } + + if let Some(path) = index_path + && let Err(e) = compio::fs::remove_file(&path).await + { + tracing::error!("Failed to delete index file {}: {}", path, e); + } + + stats.decrement_size_bytes(segment_size); + stats.decrement_segments_count(1); + stats.decrement_messages_count(messages_in_segment); + + tracing::info!( + "Deleted segment (start: {}, end: {}, size: {}, messages: {}) from partition {}", + start_offset, + end_offset, + 
segment_size, + messages_in_segment, + partition_id + ); + + Ok((1, messages_in_segment)) + } + pub(crate) async fn delete_segments( &self, stream: usize, @@ -122,6 +370,13 @@ impl IggyShard { Ok(()) } + /// Creates a fresh segment at offset 0 after all segments have been drained. + /// + /// The log is momentarily empty between `delete_segments`' drain and this call, which is + /// safe because the message pump serializes all handlers — no concurrent operation can + /// observe the empty state. The new segment reuses the offset-0 file path; writers + /// open with `truncate(true)` when `!file_exists` to clear any stale data from a + /// previous incarnation at the same path. async fn init_log_in_local_partitions( &self, namespace: &IggyNamespace, @@ -157,6 +412,10 @@ impl IggyShard { /// Rotate to a new segment when the current segment is full. /// The new segment starts at the next offset after the current segment's end. /// Seals the old segment so it becomes eligible for expiry-based cleanup. + /// + /// Safety: called exclusively from the message pump (via append handler) — the captured + /// `old_segment_index` remains valid across the `create_segment_storage` await because + /// no other handler can modify the segment vec while this frame is in progress. 
pub(crate) async fn rotate_segment_in_local_partitions( &self, namespace: &IggyNamespace, @@ -206,9 +465,11 @@ impl IggyShard { partition.log.add_persisted_segment(segment, storage); partition.stats.increment_segments_count(1); tracing::info!( - "Rotated to new segment at offset {} for partition {}", + "Rotated to new segment at offset {} for partition {} (stream {}, topic {})", start_offset, - namespace.partition_id() + namespace.partition_id(), + namespace.stream_id(), + namespace.topic_id() ); } Ok(()) diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs b/core/server/src/shard/tasks/continuous/message_pump.rs index 97e1f341d..85ed29c25 100644 --- a/core/server/src/shard/tasks/continuous/message_pump.rs +++ b/core/server/src/shard/tasks/continuous/message_pump.rs @@ -21,7 +21,7 @@ use crate::shard::transmission::frame::ShardFrame; use crate::shard::{IggyShard, handlers::handle_shard_message}; use futures::FutureExt; use std::rc::Rc; -use tracing::{debug, info}; +use tracing::{debug, error, info}; pub fn spawn_message_pump(shard: Rc<IggyShard>) { let shard_clone = shard.clone(); @@ -33,6 +33,23 @@ pub fn spawn_message_pump(shard: Rc<IggyShard>) { .spawn(); } +/// Single serialization point for all partition mutations on this shard. +/// +/// Every operation that mutates `local_partitions` — appends, segment rotation, flush, +/// segment deletion — is dispatched exclusively through this pump. The loop awaits each +/// `process_frame` to completion before dequeuing the next message, so handlers never +/// interleave even across internal `.await` points (disk I/O, fsync). +/// +/// Periodic tasks (message_saver, message_cleaner) run as separate futures on the same +/// compio thread but **cannot** mutate partitions directly. They read partition metadata +/// via `borrow()` and enqueue mutation requests back into this pump's channel. 
Those +/// requests block on a response that is only sent after the current frame completes, +/// guaranteeing strict ordering. +/// +/// This invariant replaces per-partition write locks and eliminates TOCTOU races between +/// concurrent handlers. All `pub(crate)` mutation methods on `IggyShard` (e.g. +/// `append_messages_to_local_partition`, `delete_expired_segments`, +/// `rotate_segment_in_local_partitions`) assume they are called from within this pump. async fn message_pump( shard: Rc<IggyShard>, shutdown: ShutdownToken, @@ -44,24 +61,17 @@ async fn message_pump( info!("Starting message passing task"); - // Get the inner flume receiver directly let receiver = messages_receiver.inner; loop { futures::select! { _ = shutdown.wait().fuse() => { - debug!("Message receiver shutting down"); + debug!("Message pump shutting down"); break; } frame = receiver.recv_async().fuse() => { match frame { - Ok(ShardFrame { message, response_sender }) => { - if let (Some(response), Some(tx)) = - (handle_shard_message(&shard, message).await, response_sender) - { - let _ = tx.send(response).await; - } - } + Ok(frame) => process_frame(&shard, frame).await, Err(_) => { debug!("Message receiver closed; exiting pump"); break; @@ -71,5 +81,56 @@ async fn message_pump( } } + // Drain remaining frames before flushing — any in-flight appends must + // complete so their data lands in the journal before we flush to disk. + while let Ok(frame) = receiver.try_recv() { + process_frame(&shard, frame).await; + } + + flush_and_fsync_all_partitions(&shard).await; + Ok(()) } + +async fn process_frame(shard: &Rc<IggyShard>, frame: ShardFrame) { + let ShardFrame { + message, + response_sender, + } = frame; + if let (Some(response), Some(tx)) = + (handle_shard_message(shard, message).await, response_sender) + { + let _ = tx.send(response).await; + } +} + +/// Final flush + fsync of all local partitions. Runs inside the pump after +/// the main loop exits, so no other pump frame can interleave. 
+async fn flush_and_fsync_all_partitions(shard: &Rc<IggyShard>) { + let namespaces = shard.get_current_shard_namespaces(); + if namespaces.is_empty() { + return; + } + + let mut flushed = 0u32; + for ns in &namespaces { + match shard + .flush_unsaved_buffer_from_local_partitions(ns, false) + .await + { + Ok(saved) if saved > 0 => flushed += 1, + Ok(_) => {} + Err(e) => error!("Shutdown flush failed for partition {:?}: {}", ns, e), + } + } + if flushed > 0 { + info!("Shutdown: flushed {flushed} partitions."); + } + + for ns in &namespaces { + if let Err(e) = shard.fsync_all_messages_from_local_partitions(ns).await { + error!("Shutdown fsync failed for partition {:?}: {}", ns, e); + } + } + info!("Shutdown: fsync complete for all partitions."); +} diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs b/core/server/src/shard/tasks/periodic/message_cleaner.rs index 5e33428c2..c0443da6e 100644 --- a/core/server/src/shard/tasks/periodic/message_cleaner.rs +++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs @@ -17,10 +17,12 @@ */ use crate::shard::IggyShard; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; +use iggy_common::IggyError; use iggy_common::sharding::IggyNamespace; -use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize}; use std::rc::Rc; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, info, trace}; pub fn spawn_message_cleaner(shard: Rc<IggyShard>) { if !shard.config.data_maintenance.messages.cleaner_enabled { @@ -43,24 +45,23 @@ pub fn spawn_message_cleaner(shard: Rc<IggyShard>) { .task_registry .periodic("clean_messages") .every(period) - .tick(move |_shutdown| clean_expired_messages(shard_clone.clone())) + .tick(move |_shutdown| clean_messages(shard_clone.clone())) .spawn(); } -async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { +/// Groups namespaces by topic and sends a single 
`CleanTopicMessages` per topic to the pump. +/// All segment inspection and deletion happens inside the pump handler — no TOCTOU. +async fn clean_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { trace!("Cleaning expired messages..."); let namespaces = shard.get_current_shard_namespaces(); - let now = IggyTimestamp::now(); let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> = std::collections::HashMap::new(); for ns in namespaces { - let stream_id = ns.stream_id(); - let topic_id = ns.topic_id(); topics - .entry((stream_id, topic_id)) + .entry((ns.stream_id(), ns.topic_id())) .or_default() .push(ns.partition_id()); } @@ -69,63 +70,44 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { let mut total_deleted_messages = 0u64; for ((stream_id, topic_id), partition_ids) in topics { - let mut topic_deleted_segments = 0u64; - let mut topic_deleted_messages = 0u64; - - // Phase 1: Time-based expiry cleanup per partition - for &partition_id in &partition_ids { - let expired_result = - handle_expired_segments(&shard, stream_id, topic_id, partition_id, now).await; - - match expired_result { - Ok(deleted) => { - topic_deleted_segments += deleted.segments_count; - topic_deleted_messages += deleted.messages_count; - } - Err(err) => { - error!( - "Failed to clean expired segments for stream ID: {}, topic ID: {}, partition ID: {}. 
Error: {}", - stream_id, topic_id, partition_id, err + let ns = IggyNamespace::new(stream_id, topic_id, partition_ids[0]); + let payload = ShardRequestPayload::CleanTopicMessages { + stream_id, + topic_id, + partition_ids, + }; + let request = ShardRequest::data_plane(ns, payload); + + match shard.send_to_data_plane(request).await { + Ok(ShardResponse::CleanTopicMessages { + deleted_segments, + deleted_messages, + }) => { + if deleted_segments > 0 { + info!( + "Deleted {} segments and {} messages for stream {}, topic {}", + deleted_segments, deleted_messages, stream_id, topic_id ); + shard.metrics.decrement_segments(deleted_segments as u32); + shard.metrics.decrement_messages(deleted_messages); + total_deleted_segments += deleted_segments; + total_deleted_messages += deleted_messages; } } - } - - // Phase 2: Size-based cleanup at topic level (fair across partitions) - let size_result = - handle_size_based_cleanup(&shard, stream_id, topic_id, &partition_ids).await; - - match size_result { - Ok(deleted) => { - topic_deleted_segments += deleted.segments_count; - topic_deleted_messages += deleted.messages_count; + Ok(ShardResponse::ErrorResponse(err)) => { + error!( + "Failed to clean messages for stream {}, topic {}: {}", + stream_id, topic_id, err + ); } + Ok(_) => unreachable!("Expected CleanTopicMessages response"), Err(err) => { error!( - "Failed to clean segments by size for stream ID: {}, topic ID: {}. 
Error: {}", + "Failed to send CleanTopicMessages for stream {}, topic {}: {}", stream_id, topic_id, err ); } } - - if topic_deleted_segments > 0 { - info!( - "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}", - topic_deleted_segments, topic_deleted_messages, stream_id, topic_id - ); - total_deleted_segments += topic_deleted_segments; - total_deleted_messages += topic_deleted_messages; - - shard - .metrics - .decrement_segments(topic_deleted_segments as u32); - shard.metrics.decrement_messages(topic_deleted_messages); - } else { - trace!( - "No segments were deleted for stream ID: {}, topic ID: {}", - stream_id, topic_id - ); - } } if total_deleted_segments > 0 { @@ -137,319 +119,3 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { Ok(()) } - -#[derive(Debug, Default)] -struct DeletedSegments { - pub segments_count: u64, - pub messages_count: u64, -} - -impl DeletedSegments { - fn add(&mut self, other: &DeletedSegments) { - self.segments_count += other.segments_count; - self.messages_count += other.messages_count; - } -} - -async fn handle_expired_segments( - shard: &Rc<IggyShard>, - stream_id: usize, - topic_id: usize, - partition_id: usize, - now: IggyTimestamp, -) -> Result<DeletedSegments, IggyError> { - let ns = IggyNamespace::new(stream_id, topic_id, partition_id); - - let expiry = shard - .metadata - .get_topic_config(stream_id, topic_id) - .map(|(exp, _)| exp) - .unwrap_or(shard.config.system.topic.message_expiry); - - if matches!(expiry, IggyExpiry::NeverExpire) { - return Ok(DeletedSegments::default()); - } - - let expired_segment_offsets: Vec<u64> = { - let partitions = shard.local_partitions.borrow(); - let Some(partition) = partitions.get(&ns) else { - return Ok(DeletedSegments::default()); - }; - let segments = partition.log.segments(); - let last_idx = segments.len().saturating_sub(1); - - segments - .iter() - .enumerate() - .filter(|(idx, segment)| *idx != last_idx && segment.is_expired(now, 
expiry)) - .map(|(_, segment)| segment.start_offset) - .collect() - }; - - if expired_segment_offsets.is_empty() { - return Ok(DeletedSegments::default()); - } - - debug!( - "Found {} expired segments for stream ID: {}, topic ID: {}, partition ID: {}", - expired_segment_offsets.len(), - stream_id, - topic_id, - partition_id - ); - - delete_segments( - shard, - stream_id, - topic_id, - partition_id, - &expired_segment_offsets, - ) - .await -} - -/// Handles size-based cleanup at the topic level. -/// Deletes the globally oldest sealed segment across all partitions until topic size is below 90% threshold. -async fn handle_size_based_cleanup( - shard: &Rc<IggyShard>, - stream_id: usize, - topic_id: usize, - partition_ids: &[usize], -) -> Result<DeletedSegments, IggyError> { - let Some((max_size, _)) = shard.metadata.with_metadata(|m| { - m.streams - .get(stream_id) - .and_then(|s| s.topics.get(topic_id)) - .map(|t| (t.max_topic_size, t.stats.size_bytes_inconsistent())) - }) else { - return Ok(DeletedSegments::default()); - }; - - if matches!(max_size, MaxTopicSize::Unlimited) { - return Ok(DeletedSegments::default()); - } - - let max_bytes = max_size.as_bytes_u64(); - let threshold = max_bytes * 9 / 10; - - let mut total_deleted = DeletedSegments::default(); - - loop { - let current_size = shard - .metadata - .with_metadata(|m| { - m.streams - .get(stream_id) - .and_then(|s| s.topics.get(topic_id)) - .map(|t| t.stats.size_bytes_inconsistent()) - }) - .unwrap_or(0); - - if current_size < threshold { - break; - } - - let Some((target_partition_id, target_offset, target_timestamp)) = - find_oldest_segment_in_shard(shard, stream_id, topic_id, partition_ids) - else { - debug!( - "No deletable segments found for stream ID: {}, topic ID: {} (all partitions have only active segment)", - stream_id, topic_id - ); - break; - }; - - info!( - "Deleting oldest segment (start_offset: {}, timestamp: {}) from partition {} for stream ID: {}, topic ID: {}", - target_offset, 
target_timestamp, target_partition_id, stream_id, topic_id - ); - - let deleted = delete_segments( - shard, - stream_id, - topic_id, - target_partition_id, - &[target_offset], - ) - .await?; - total_deleted.add(&deleted); - - if deleted.segments_count == 0 { - break; - } - } - - Ok(total_deleted) -} - -/// Finds the oldest sealed segment across partitions owned by this shard. -/// For each partition, the first segment in the vector is the oldest (segments are ordered). -/// Compares first segments across partitions by timestamp to ensure fair deletion. -/// Returns (partition_id, start_offset, start_timestamp) or None if no deletable segments exist. -fn find_oldest_segment_in_shard( - shard: &Rc<IggyShard>, - stream_id: usize, - topic_id: usize, - partition_ids: &[usize], -) -> Option<(usize, u64, u64)> { - let partitions = shard.local_partitions.borrow(); - - let mut oldest: Option<(usize, u64, u64)> = None; - - for &partition_id in partition_ids { - let ns = IggyNamespace::new(stream_id, topic_id, partition_id); - let Some(partition) = partitions.get(&ns) else { - continue; - }; - - let segments = partition.log.segments(); - if segments.len() <= 1 { - continue; - } - - // First segment is the oldest in this partition (segments are ordered chronologically) - let first_segment = &segments[0]; - if !first_segment.sealed { - continue; - } - - let candidate = ( - partition_id, - first_segment.start_offset, - first_segment.start_timestamp, - ); - match &oldest { - None => oldest = Some(candidate), - Some((_, _, oldest_ts)) if first_segment.start_timestamp < *oldest_ts => { - oldest = Some(candidate); - } - _ => {} - } - } - - oldest -} - -async fn delete_segments( - shard: &Rc<IggyShard>, - stream_id: usize, - topic_id: usize, - partition_id: usize, - segment_offsets: &[u64], -) -> Result<DeletedSegments, IggyError> { - if segment_offsets.is_empty() { - return Ok(DeletedSegments::default()); - } - - info!( - "Deleting {} segments for stream ID: {}, topic ID: {}, 
partition ID: {}...", - segment_offsets.len(), - stream_id, - topic_id, - partition_id - ); - - let mut segments_count = 0u64; - let mut messages_count = 0u64; - - let ns = IggyNamespace::new(stream_id, topic_id, partition_id); - - let (stats, segments_to_delete, mut storages_to_delete) = { - let mut partitions = shard.local_partitions.borrow_mut(); - let Some(partition) = partitions.get_mut(&ns) else { - return Ok(DeletedSegments::default()); - }; - - let log = &mut partition.log; - let mut segments_to_remove = Vec::new(); - let mut storages_to_remove = Vec::new(); - - let mut indices_to_remove: Vec<usize> = Vec::new(); - for &start_offset in segment_offsets { - if let Some(idx) = log - .segments() - .iter() - .position(|s| s.start_offset == start_offset) - { - indices_to_remove.push(idx); - } - } - - indices_to_remove.sort_by(|a, b| b.cmp(a)); - for idx in indices_to_remove { - let segment = log.segments_mut().remove(idx); - let storage = log.storages_mut().remove(idx); - log.indexes_mut().remove(idx); - - segments_to_remove.push(segment); - storages_to_remove.push(storage); - } - - ( - partition.stats.clone(), - segments_to_remove, - storages_to_remove, - ) - }; - - for (segment, storage) in segments_to_delete - .into_iter() - .zip(storages_to_delete.iter_mut()) - { - let segment_size = segment.size.as_bytes_u64(); - let start_offset = segment.start_offset; - let end_offset = segment.end_offset; - - let messages_in_segment = if start_offset == end_offset { - 0 - } else { - (end_offset - start_offset) + 1 - }; - - let _ = storage.shutdown(); - let (messages_path, index_path) = storage.segment_and_index_paths(); - - if let Some(path) = messages_path { - if let Err(e) = compio::fs::remove_file(&path).await { - error!("Failed to delete messages file {}: {}", path, e); - } else { - trace!("Deleted messages file: {}", path); - } - } else { - warn!( - "Messages writer path not found for segment starting at offset {}", - start_offset - ); - } - - if let Some(path) = 
index_path { - if let Err(e) = compio::fs::remove_file(&path).await { - error!("Failed to delete index file {}: {}", path, e); - } else { - trace!("Deleted index file: {}", path); - } - } else { - warn!( - "Index writer path not found for segment starting at offset {}", - start_offset - ); - } - - stats.decrement_size_bytes(segment_size); - stats.decrement_segments_count(1); - stats.decrement_messages_count(messages_in_segment); - - info!( - "Deleted segment with start offset {} (end: {}, size: {}, messages: {}) from partition ID: {}", - start_offset, end_offset, segment_size, messages_in_segment, partition_id - ); - - segments_count += 1; - messages_count += messages_in_segment; - } - - Ok(DeletedSegments { - segments_count, - messages_count, - }) -} diff --git a/core/server/src/shard/tasks/periodic/message_saver.rs b/core/server/src/shard/tasks/periodic/message_saver.rs index d959ca8c7..0ebb5cdfc 100644 --- a/core/server/src/shard/tasks/periodic/message_saver.rs +++ b/core/server/src/shard/tasks/periodic/message_saver.rs @@ -17,6 +17,8 @@ */ use crate::shard::IggyShard; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; use iggy_common::IggyError; use std::rc::Rc; use tracing::{error, info, trace}; @@ -29,16 +31,13 @@ pub fn spawn_message_saver(shard: Rc<IggyShard>) { period ); let shard_clone = shard.clone(); - let shard_for_shutdown = shard.clone(); shard .task_registry .periodic("save_messages") .every(period) - .last_tick_on_shutdown(true) + // No last_tick_on_shutdown — the pump handles final flush + fsync + // during its own shutdown (see message_pump.rs). 
.tick(move |_shutdown| save_messages(shard_clone.clone())) - .on_shutdown(move |result| { - fsync_all_segments_on_shutdown(shard_for_shutdown.clone(), result) - }) .spawn(); } @@ -46,63 +45,28 @@ async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { trace!("Saving buffered messages..."); let namespaces = shard.get_current_shard_namespaces(); - let mut total_saved_messages = 0u64; let mut partitions_flushed = 0u32; for ns in namespaces { - if shard.local_partitions.borrow().get(&ns).is_some() { - match shard - .flush_unsaved_buffer_from_local_partitions(&ns, false) - .await - { - Ok(saved) => { - if saved > 0 { - total_saved_messages += saved as u64; - partitions_flushed += 1; - } - } - Err(err) => { - error!("Failed to save messages for partition {:?}: {}", ns, err); - } + let payload = ShardRequestPayload::FlushUnsavedBuffer { fsync: false }; + let request = ShardRequest::data_plane(ns, payload); + match shard.send_to_data_plane(request).await { + Ok(ShardResponse::FlushUnsavedBuffer { flushed_count }) if flushed_count > 0 => { + partitions_flushed += 1; } + Ok(ShardResponse::FlushUnsavedBuffer { .. 
}) => {} + Ok(ShardResponse::ErrorResponse(err)) => { + error!("Failed to save messages for partition {:?}: {}", ns, err); + } + Err(err) => { + error!("Failed to save messages for partition {:?}: {}", ns, err); + } + _ => {} } } - if total_saved_messages > 0 { - info!("Saved {total_saved_messages} messages from {partitions_flushed} partitions."); + if partitions_flushed > 0 { + info!("Flushed {partitions_flushed} partitions."); } Ok(()) } - -async fn fsync_all_segments_on_shutdown(shard: Rc<IggyShard>, result: Result<(), IggyError>) { - if result.is_err() { - error!( - "Last save_messages tick failed, skipping fsync: {:?}", - result - ); - return; - } - - trace!("Performing fsync on all segments during shutdown..."); - - let namespaces = shard.get_current_shard_namespaces(); - - for ns in namespaces { - if shard.local_partitions.borrow().get(&ns).is_some() { - match shard.fsync_all_messages_from_local_partitions(&ns).await { - Ok(()) => { - trace!( - "Successfully fsynced segment for partition {:?} during shutdown", - ns - ); - } - Err(err) => { - error!( - "Failed to fsync segment for partition {:?} during shutdown: {}", - ns, err - ); - } - } - } - } -} diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index 4576221cf..c8cc585a9 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -60,8 +60,14 @@ pub struct ConsumerGroupResponseData { pub enum ShardResponse { PollMessages((IggyPollMetadata, IggyMessagesBatchSet)), SendMessages, - FlushUnsavedBuffer, + FlushUnsavedBuffer { + flushed_count: u32, + }, DeleteSegments, + CleanTopicMessages { + deleted_segments: u64, + deleted_messages: u64, + }, Event, CreateStreamResponse(StreamResponseData), DeleteStreamResponse(usize), diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index 4f04fc13e..6fe952b92 100644 --- 
a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -115,6 +115,11 @@ pub enum ShardRequestPayload { DeleteSegments { segments_count: u32, }, + CleanTopicMessages { + stream_id: usize, + topic_id: usize, + partition_ids: Vec<usize>, + }, SocketTransfer { fd: OwnedFd, from_shard: u16, diff --git a/core/server/src/streaming/partitions/local_partition.rs b/core/server/src/streaming/partitions/local_partition.rs index 4eeba5dbb..14f5e63d9 100644 --- a/core/server/src/streaming/partitions/local_partition.rs +++ b/core/server/src/streaming/partitions/local_partition.rs @@ -27,7 +27,6 @@ use super::{ use crate::streaming::{deduplication::MessageDeduplicator, stats::PartitionStats}; use iggy_common::IggyTimestamp; use std::sync::{Arc, atomic::AtomicU64}; -use tokio::sync::Mutex as TokioMutex; /// Per-shard partition data - mutable, single-threaded access. #[derive(Debug)] @@ -41,7 +40,6 @@ pub struct LocalPartition { pub created_at: IggyTimestamp, pub revision_id: u64, pub should_increment_offset: bool, - pub write_lock: Arc<TokioMutex<()>>, } impl LocalPartition { @@ -67,7 +65,6 @@ impl LocalPartition { created_at, revision_id, should_increment_offset, - write_lock: Arc::new(TokioMutex::new(())), } } @@ -94,7 +91,6 @@ impl LocalPartition { created_at, revision_id, should_increment_offset, - write_lock: Arc::new(TokioMutex::new(())), } } } diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 2022b411d..fb5a0a533 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -48,9 +48,14 @@ impl IndexWriter { fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) + let mut opts = OpenOptions::new(); + opts.create(true).write(true); + if !file_exists { + // When creating a fresh segment at a 
reused path (e.g. offset 0 after all segments + // were deleted), truncate to clear any stale data from a previous incarnation. + opts.truncate(true); + } + let file = opts .open(file_path) .await .error(|e: &std::io::Error| format!("Failed to open index file: {file_path}. {e}")) diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index 439e30a80..525f4292a 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -50,9 +50,14 @@ impl MessagesWriter { fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) + let mut opts = OpenOptions::new(); + opts.create(true).write(true); + if !file_exists { + // When creating a fresh segment at a reused path (e.g. offset 0 after all segments + // were deleted), truncate to clear any stale data from a previous incarnation. + opts.truncate(true); + } + let file = opts .open(file_path) .await .error(|err: &std::io::Error| {
