This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch fix_segment_leak
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2f67133a5143a4d3026f6516c3d030875cec2e33
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 14:32:21 2026 +0100

    fix(server): memory leak in segment rotation, clear indexes and close writers
---
 Cargo.lock                                         |  1 +
 core/integration/Cargo.toml                        |  1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 25 ++++++++++++++++++++++
 core/server/src/shard/system/segments.rs           | 20 +++++++++++++++--
 4 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a8a4e57bd..70ad39614 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5011,6 +5011,7 @@ dependencies = [
  "socket2 0.6.2",
  "sqlx",
  "strum",
+ "sysinfo 0.38.0",
  "tempfile",
  "test-case",
  "testcontainers-modules",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index fa57fd1be..e3c05ba82 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -67,6 +67,7 @@ server = { workspace = true }
 socket2 = { workspace = true }
 sqlx = { workspace = true }
 strum = { workspace = true }
+sysinfo = { workspace = true }
 tempfile = { workspace = true }
 test-case = { workspace = true }
 testcontainers-modules = { workspace = true }
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
index de6274402..2630262ee 100644
--- a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -33,6 +33,7 @@ use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::time::Duration;
+use sysinfo::{Pid, ProcessesToUpdate, System};
 use tokio::task::JoinSet;
 
 const STREAM_NAME: &str = "race-test-stream";
@@ -41,6 +42,7 @@ const PRODUCERS_PER_PROTOCOL: usize = 2;
 const PARTITION_ID: u32 = 0;
 const TEST_DURATION_SECS: u64 = 10;
 const MESSAGES_PER_BATCH: usize = 5;
+const MAX_ALLOWED_MEMORY_BYTES: u64 = 200 * 1024 * 1024;
 
 /// Runs the segment rotation race condition test with multiple protocols.
 /// Each client factory represents a different protocol (TCP, HTTP, QUIC, WebSocket).
@@ -54,6 +56,9 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
     let admin_client = create_client(client_factories[0]).await;
     login_root(&admin_client).await;
 
+    let stats = admin_client.get_stats().await.unwrap();
+    let server_pid = stats.process_id;
+
     let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL;
     init_system(&admin_client, total_producers).await;
 
@@ -111,6 +116,18 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
     let sent = total_messages.load(Ordering::SeqCst);
     println!("Test completed successfully. Total messages sent: {}", sent);
 
+    let final_memory = get_process_memory(server_pid);
+    println!(
+        "Final server memory: {:.2} MB",
+        final_memory as f64 / 1024.0 / 1024.0
+    );
+    assert!(
+        final_memory < MAX_ALLOWED_MEMORY_BYTES,
+        "Memory leak detected! Server using {:.2} MB, max allowed is {:.2} MB",
+        final_memory as f64 / 1024.0 / 1024.0,
+        MAX_ALLOWED_MEMORY_BYTES as f64 / 1024.0 / 1024.0
+    );
+
     cleanup(&admin_client).await;
 }
 
@@ -194,3 +211,14 @@ async fn cleanup(client: &IggyClient) {
         .await
         .unwrap();
 }
+
+/// Returns the memory usage in bytes of process `pid`, as reported by `sysinfo`.
+/// Panics if the process is not found, so a failed lookup cannot silently pass the leak check.
+fn get_process_memory(pid: u32) -> u64 {
+    let sysinfo_pid = Pid::from_u32(pid);
+    let mut sys = System::new();
+    sys.refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true);
+    sys.process(sysinfo_pid)
+        .map(|process| process.memory())
+        .unwrap_or_else(|| panic!("server process {pid} not found, cannot measure its memory"))
+}
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 38e91fd2d..b805f3da1 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::shard::IggyShard;
 use crate::streaming::segments::Segment;
 use iggy_common::IggyError;
@@ -162,14 +163,15 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         use crate::streaming::segments::storage::create_segment_storage;
 
-        let start_offset = {
+        let (start_offset, old_segment_index) = {
             let mut partitions = self.local_partitions.borrow_mut();
             let partition = partitions
                 .get_mut(namespace)
                 .expect("rotate_segment: partition must exist");
+            let old_segment_index = partition.log.segments().len() - 1;
             let active_segment = partition.log.active_segment_mut();
             active_segment.sealed = true;
-            active_segment.end_offset + 1
+            (active_segment.end_offset + 1, old_segment_index)
         };
 
         let segment = Segment::new(start_offset, 
self.config.system.segment.size);
@@ -187,6 +189,20 @@ impl IggyShard {
 
         let mut partitions = self.local_partitions.borrow_mut();
         if let Some(partition) = partitions.get_mut(namespace) {
+            // Clear old segment's indexes if cache_indexes is not set to All.
+            // This prevents memory accumulation from keeping index buffers for sealed segments.
+            if !matches!(
+                self.config.system.segment.cache_indexes,
+                CacheIndexesConfig::All
+            ) {
+                partition.log.indexes_mut()[old_segment_index] = None;
+            }
+
+            // Close writers for the sealed segment - they're never needed after sealing.
+            // This releases file handles and associated kernel/io_uring resources.
+            let old_storage = &mut 
partition.log.storages_mut()[old_segment_index];
+            let _ = old_storage.shutdown();
+
             partition.log.add_persisted_segment(segment, storage);
             partition.stats.increment_segments_count(1);
             tracing::info!(

Reply via email to