This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 88a598ad9 fix(server): fix stream stats corruption on topic purge (#2736)
88a598ad9 is described below
commit 88a598ad91bac00188d75731dcca866deb65def7
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 10:33:39 2026 +0100
fix(server): fix stream stats corruption on topic purge (#2736)
Purging one topic while a sibling topic still held messages caused
get_stats() to report zero messages for the entire stream. The
zero_out chain used store(0) at each tier and then called
zero_out on the parent, blanking the entire ancestor chain
rather than subtracting the purged child's contribution.
Replace store(0) with swap(0) + parent.decrement(prev) so
each tier only subtracts its own value. Remove the redundant
topic-level zero_out from purge_topic — partition zeroing
already propagates correctly through the hierarchy.
---
.../scenarios/stream_size_validation_scenario.rs | 34 +++++++++++++
core/server/src/shard/system/topics.rs | 6 +--
core/server/src/streaming/stats/mod.rs | 57 +++++-----------------
3 files changed, 48 insertions(+), 49 deletions(-)
diff --git
a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
index 97d3d137d..fef4c339a 100644
--- a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
+++ b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
@@ -84,6 +84,23 @@ pub async fn run(harness: &TestHarness) {
validate_topic(&client, S2_NAME, T1_NAME, MSGS_SIZE * 2, MSGS_COUNT *
2).await;
validate_topic(&client, S2_NAME, T2_NAME, MSGS_SIZE * 2, MSGS_COUNT *
2).await;
+ // 13a. System stats must aggregate all streams: 2 streams × 2 topics × MSGS_COUNT*2.
+ validate_system_stats(&client, MSGS_SIZE * 8, MSGS_COUNT * 8).await;
+
+ // 13b. Purge T1 from S1 while T2 still has messages — S1's system-level stats
+ // must reflect only T2, not be blindly zeroed.
+ //
+ // Regression: zero_out_all() propagated store(0) up to StreamStats, zeroing the
+ // entire stream counter instead of subtracting only the purged topic's contribution.
+ // get_stats() reads StreamStats atomics directly, so it reports 0 for the stream
+ // while the sibling topic still has MSGS_COUNT*2 messages.
+ purge_topic(&client, S1_NAME, T1_NAME).await;
+ validate_topic(&client, S1_NAME, T1_NAME, 0, 0).await;
+ validate_topic(&client, S1_NAME, T2_NAME, MSGS_SIZE * 2, MSGS_COUNT *
2).await;
+ validate_stream(&client, S1_NAME, MSGS_SIZE * 2, MSGS_COUNT * 2).await;
+ // S1 lost MSGS_COUNT*2 from the purge; S2 is unchanged.
+ validate_system_stats(&client, MSGS_SIZE * 6, MSGS_COUNT * 6).await;
+
// 14. Delete first topic on the first stream
delete_topic(&client, S1_NAME, T1_NAME).await;
@@ -202,6 +219,23 @@ async fn validate_operations_on_topic_twice(
.await;
}
+async fn validate_system_stats(
+ client: &IggyClient,
+ expected_size: u64,
+ expected_messages_count: u64,
+) {
+ let stats = client.get_stats().await.unwrap();
+ assert_eq!(
+ stats.messages_count, expected_messages_count,
+ "system stats messages_count mismatch"
+ );
+ assert_eq!(
+ stats.messages_size_bytes.as_bytes_u64(),
+ expected_size,
+ "system stats messages_size_bytes mismatch"
+ );
+}
+
async fn validate_stream(
client: &IggyClient,
stream_name: &str,
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 24d53d220..c31da7f39 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -215,10 +215,8 @@ impl IggyShard {
}
}
- if let Some(topic_stats) = self.metadata.get_topic_stats(stream,
topic_id) {
- topic_stats.zero_out_all();
- }
-
+ // Zero partition stats — propagation handles topic and stream counters.
+ // Topic stats must NOT be zeroed separately to avoid double-decrementing the stream.
for &partition_id in &partition_ids {
let ns = IggyNamespace::new(stream, topic_id, partition_id);
if let Some(partition_stats) =
self.metadata.get_partition_stats(&ns) {
diff --git a/core/server/src/streaming/stats/mod.rs
b/core/server/src/streaming/stats/mod.rs
index 3493aec57..e1dbbc061 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -179,35 +179,19 @@ impl TopicStats {
self.segments_count.load(Ordering::Relaxed)
}
- pub fn zero_out_parent_size_bytes(&self) {
- self.parent.zero_out_size_bytes();
- }
-
- pub fn zero_out_parent_messages_count(&self) {
- self.parent.zero_out_messages_count();
- }
-
- pub fn zero_out_parent_segments_count(&self) {
- self.parent.zero_out_segments_count();
- }
-
- pub fn zero_out_parent_all(&self) {
- self.parent.zero_out_all();
- }
-
pub fn zero_out_size_bytes(&self) {
- self.size_bytes.store(0, Ordering::Relaxed);
- self.zero_out_parent_size_bytes();
+ let prev = self.size_bytes.swap(0, Ordering::AcqRel);
+ self.parent.decrement_size_bytes(prev);
}
pub fn zero_out_messages_count(&self) {
- self.messages_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_messages_count();
+ let prev = self.messages_count.swap(0, Ordering::AcqRel);
+ self.parent.decrement_messages_count(prev);
}
pub fn zero_out_segments_count(&self) {
- self.segments_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_segments_count();
+ let prev = self.segments_count.swap(0, Ordering::AcqRel);
+ self.parent.decrement_segments_count(prev);
}
pub fn zero_out_all(&self) {
@@ -319,35 +303,19 @@ impl PartitionStats {
self.current_offset.store(offset, Ordering::Relaxed);
}
- pub fn zero_out_parent_size_bytes(&self) {
- self.parent.zero_out_size_bytes();
- }
-
- pub fn zero_out_parent_messages_count(&self) {
- self.parent.zero_out_messages_count();
- }
-
- pub fn zero_out_parent_segments_count(&self) {
- self.parent.zero_out_segments_count();
- }
-
- pub fn zero_out_parent_all(&self) {
- self.parent.zero_out_all();
- }
-
pub fn zero_out_size_bytes(&self) {
- self.size_bytes.store(0, Ordering::Relaxed);
- self.zero_out_parent_size_bytes();
+ let prev = self.size_bytes.swap(0, Ordering::AcqRel);
+ self.parent.decrement_size_bytes(prev);
}
pub fn zero_out_messages_count(&self) {
- self.messages_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_messages_count();
+ let prev = self.messages_count.swap(0, Ordering::AcqRel);
+ self.parent.decrement_messages_count(prev);
}
pub fn zero_out_segments_count(&self) {
- self.segments_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_segments_count();
+ let prev = self.segments_count.swap(0, Ordering::AcqRel);
+ self.parent.decrement_segments_count(prev);
}
pub fn zero_out_current_offset(&self) {
@@ -359,6 +327,5 @@ impl PartitionStats {
self.zero_out_messages_count();
self.zero_out_segments_count();
self.zero_out_current_offset();
- self.zero_out_parent_all();
}
}