This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 88a598ad9 fix(server): fix stream stats corruption on topic purge (#2736)
88a598ad9 is described below

commit 88a598ad91bac00188d75731dcca866deb65def7
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 10:33:39 2026 +0100

    fix(server): fix stream stats corruption on topic purge (#2736)
    
    Purging one topic while a sibling retained messages reported
    zero messages for the entire stream via get_stats(). The
    zero_out chain used store(0) at each tier and then called
    zero_out on the parent, blanking the entire ancestor chain
    rather than subtracting the purged child's contribution.
    
    Replace store(0) with swap(0) + parent.decrement(prev) so
    each tier only subtracts its own value. Remove the redundant
    topic-level zero_out from purge_topic — partition zeroing
    already propagates correctly through the hierarchy.
---
 .../scenarios/stream_size_validation_scenario.rs   | 34 +++++++++++++
 core/server/src/shard/system/topics.rs             |  6 +--
 core/server/src/streaming/stats/mod.rs             | 57 +++++-----------------
 3 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
index 97d3d137d..fef4c339a 100644
--- a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
+++ b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
@@ -84,6 +84,23 @@ pub async fn run(harness: &TestHarness) {
     validate_topic(&client, S2_NAME, T1_NAME, MSGS_SIZE * 2, MSGS_COUNT * 2).await;
     validate_topic(&client, S2_NAME, T2_NAME, MSGS_SIZE * 2, MSGS_COUNT * 2).await;
 
+    // 13a. System stats must aggregate all streams: 2 streams × 2 topics × MSGS_COUNT*2.
+    validate_system_stats(&client, MSGS_SIZE * 8, MSGS_COUNT * 8).await;
+
+    // 13b. Purge T1 from S1 while T2 still has messages — S1's system-level stats
+    // must reflect only T2, not be blindly zeroed.
+    //
+    // Regression: zero_out_all() propagated store(0) up to StreamStats, zeroing the
+    // entire stream counter instead of subtracting only the purged topic's contribution.
+    // get_stats() reads StreamStats atomics directly, so it reports 0 for the stream
+    // while the sibling topic still has MSGS_COUNT*2 messages.
+    purge_topic(&client, S1_NAME, T1_NAME).await;
+    validate_topic(&client, S1_NAME, T1_NAME, 0, 0).await;
+    validate_topic(&client, S1_NAME, T2_NAME, MSGS_SIZE * 2, MSGS_COUNT * 2).await;
+    validate_stream(&client, S1_NAME, MSGS_SIZE * 2, MSGS_COUNT * 2).await;
+    // S1 lost MSGS_COUNT*2 from the purge; S2 is unchanged.
+    validate_system_stats(&client, MSGS_SIZE * 6, MSGS_COUNT * 6).await;
+
     // 14. Delete first topic on the first stream
     delete_topic(&client, S1_NAME, T1_NAME).await;
 
@@ -202,6 +219,23 @@ async fn validate_operations_on_topic_twice(
     .await;
 }
 
+async fn validate_system_stats(
+    client: &IggyClient,
+    expected_size: u64,
+    expected_messages_count: u64,
+) {
+    let stats = client.get_stats().await.unwrap();
+    assert_eq!(
+        stats.messages_count, expected_messages_count,
+        "system stats messages_count mismatch"
+    );
+    assert_eq!(
+        stats.messages_size_bytes.as_bytes_u64(),
+        expected_size,
+        "system stats messages_size_bytes mismatch"
+    );
+}
+
 async fn validate_stream(
     client: &IggyClient,
     stream_name: &str,
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index 24d53d220..c31da7f39 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -215,10 +215,8 @@ impl IggyShard {
             }
         }
 
-        if let Some(topic_stats) = self.metadata.get_topic_stats(stream, topic_id) {
-            topic_stats.zero_out_all();
-        }
-
+        // Zero partition stats — propagation handles topic and stream counters.
+        // Topic stats must NOT be zeroed separately to avoid double-decrementing the stream.
         for &partition_id in &partition_ids {
             let ns = IggyNamespace::new(stream, topic_id, partition_id);
             if let Some(partition_stats) = self.metadata.get_partition_stats(&ns) {
diff --git a/core/server/src/streaming/stats/mod.rs b/core/server/src/streaming/stats/mod.rs
index 3493aec57..e1dbbc061 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -179,35 +179,19 @@ impl TopicStats {
         self.segments_count.load(Ordering::Relaxed)
     }
 
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
     pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
+        let prev = self.size_bytes.swap(0, Ordering::AcqRel);
+        self.parent.decrement_size_bytes(prev);
     }
 
     pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
+        let prev = self.messages_count.swap(0, Ordering::AcqRel);
+        self.parent.decrement_messages_count(prev);
     }
 
     pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
+        let prev = self.segments_count.swap(0, Ordering::AcqRel);
+        self.parent.decrement_segments_count(prev);
     }
 
     pub fn zero_out_all(&self) {
@@ -319,35 +303,19 @@ impl PartitionStats {
         self.current_offset.store(offset, Ordering::Relaxed);
     }
 
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
     pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
+        let prev = self.size_bytes.swap(0, Ordering::AcqRel);
+        self.parent.decrement_size_bytes(prev);
     }
 
     pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
+        let prev = self.messages_count.swap(0, Ordering::AcqRel);
+        self.parent.decrement_messages_count(prev);
     }
 
     pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
+        let prev = self.segments_count.swap(0, Ordering::AcqRel);
+        self.parent.decrement_segments_count(prev);
     }
 
     pub fn zero_out_current_offset(&self) {
@@ -359,6 +327,5 @@ impl PartitionStats {
         self.zero_out_messages_count();
         self.zero_out_segments_count();
         self.zero_out_current_offset();
-        self.zero_out_parent_all();
     }
 }

Reply via email to