This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_final
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fdf2ae8db5c40cd6e84d105b73f844b3ac42ab16
Author: numminex <[email protected]>
AuthorDate: Wed Nov 5 14:01:03 2025 +0100

    fix broken cg loader
---
 .../data_integrity/verify_after_server_restart.rs  | 62 +++++++++++++++++++++-
 core/integration/tests/state/mod.rs                |  9 ++++
 core/server/src/main.rs                            | 15 +++++-
 core/server/src/state/file.rs                      | 23 +++++---
 4 files changed, 99 insertions(+), 10 deletions(-)

diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 32e7c1c54..46b3a757b 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -17,7 +17,7 @@
  */
 
 use iggy::clients::client::IggyClient;
-use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient};
+use iggy::prelude::{ConsumerGroupClient, Identifier, IggyByteSize, MessageClient, SystemClient};
 use iggy_common::TransportProtocol;
 use integration::bench_utils::run_bench_and_wait_for_finish;
 use integration::{
@@ -101,6 +101,16 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
             .unwrap();
     }
 
+    // 4b. Create consumer groups to test persistence
+    let consumer_group_names = vec!["test-cg-1", "test-cg-2", "test-cg-3"];
+    for (idx, cg_name) in consumer_group_names.iter().enumerate() {
+        let stream_id = Identifier::numeric(idx as u32).unwrap();
+        client
+            .create_consumer_group(&stream_id, &topic_id, cg_name)
+            .await
+            .unwrap();
+    }
+
     // 5. Save stats from the first server
     let stats = client.get_stats().await.unwrap();
     let expected_messages_size_bytes = stats.messages_size_bytes;
@@ -163,6 +173,33 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         expected_segments_count, stats_after_restart.segments_count,
         "Segments count should be preserved after restart"
     );
+    assert_eq!(
+        expected_consumer_groups_count, stats_after_restart.consumer_groups_count,
+        "Consumer groups count should be preserved after restart"
+    );
+
+    // 8b. Verify consumer groups exist after restart
+    for (idx, cg_name) in consumer_group_names.iter().enumerate() {
+        let stream_id = Identifier::numeric(idx as u32).unwrap();
+        let consumer_group = client_after_restart
+            .get_consumer_group(
+                &stream_id,
+                &topic_id,
+                &Identifier::from_str(cg_name).unwrap(),
+            )
+            .await
+            .unwrap();
+        assert!(
+            consumer_group.is_some(),
+            "Consumer group {} should exist after restart",
+            cg_name
+        );
+        assert_eq!(
+            consumer_group.unwrap().name,
+            *cg_name,
+            "Consumer group name should match"
+        );
+    }
 
     // 9. Run send bench again to add more data
     run_bench_and_wait_for_finish(
@@ -248,6 +285,29 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         "Consumer groups count"
     );
 
+    // 14b. Verify consumer groups still exist after second benchmark run
+    for (idx, cg_name) in consumer_group_names.iter().enumerate() {
+        let stream_id = Identifier::numeric(idx as u32).unwrap();
+        let consumer_group = client
+            .get_consumer_group(
+                &stream_id,
+                &topic_id,
+                &Identifier::from_str(cg_name).unwrap(),
+            )
+            .await
+            .unwrap();
+        assert!(
+            consumer_group.is_some(),
+            "Consumer group {} should still exist after second benchmark",
+            cg_name
+        );
+        assert_eq!(
+            consumer_group.unwrap().name,
+            *cg_name,
+            "Consumer group name should match"
+        );
+    }
+
     // 15. Run poll bench to check if all data (10MB total) is still there
     run_bench_and_wait_for_finish(
         &server_addr,
diff --git a/core/integration/tests/state/mod.rs b/core/integration/tests/state/mod.rs
index 40ed97818..2ff235c45 100644
--- a/core/integration/tests/state/mod.rs
+++ b/core/integration/tests/state/mod.rs
@@ -24,6 +24,7 @@ use server::streaming::utils::file::overwrite;
 use server::versioning::SemanticVersion;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64};
 use uuid::Uuid;
 
 mod file;
@@ -54,11 +55,19 @@ impl StateSetup {
         let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
         let encryptor = encryption_key
             .map(|key| EncryptorKind::Aes256Gcm(Aes256GcmEncryptor::new(key).unwrap()));
+        let state_current_index = Arc::new(AtomicU64::new(0));
+        let state_entries_count = Arc::new(AtomicU64::new(0));
+        let state_current_leader = Arc::new(AtomicU32::new(0));
+        let state_term = Arc::new(AtomicU64::new(0));
         let state = FileState::new(
             &messages_file_path,
             &version,
             Arc::new(persister),
             encryptor,
+            state_current_index,
+            state_entries_count,
+            state_current_leader,
+            state_term,
         );
 
         Self {
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index eff4ebb4d..1f326237c 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -59,7 +59,8 @@ use server::versioning::SemanticVersion;
 use std::collections::HashSet;
 use std::rc::Rc;
 use std::str::FromStr;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 use tracing::{error, info, instrument, warn};
 
 const COMPONENT: &str = "MAIN";
@@ -206,11 +207,19 @@ async fn main() -> Result<(), ServerError> {
 
     // TENTH DISCRETE LOADING STEP.
     let state_persister = resolve_persister(config.system.state.enforce_fsync);
+    let state_current_index = Arc::new(AtomicU64::new(0));
+    let state_entries_count = Arc::new(AtomicU64::new(0));
+    let state_current_leader = Arc::new(AtomicU32::new(0));
+    let state_term = Arc::new(AtomicU64::new(0));
     let state = FileState::new(
         &config.system.get_state_messages_file_path(),
         &current_version,
         state_persister,
         encryptor.clone(),
+        state_current_index.clone(),
+        state_entries_count.clone(),
+        state_current_leader.clone(),
+        state_term.clone(),
     );
     let state = SystemState::load(state).await?;
     let (streams_state, users_state) = state.decompose();
@@ -306,6 +315,10 @@ async fn main() -> Result<(), ServerError> {
             &current_version,
             state_persister,
             encryptor.clone(),
+            state_current_index.clone(),
+            state_entries_count.clone(),
+            state_current_leader.clone(),
+            state_term.clone(),
         );
         let client_manager = client_manager.clone();
 
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 5ea72192d..4d978e00d 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -40,10 +40,10 @@ const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
 
 #[derive(Debug)]
 pub struct FileState {
-    current_index: AtomicU64,
-    entries_count: AtomicU64,
-    current_leader: AtomicU32,
-    term: AtomicU64,
+    current_index: Arc<AtomicU64>,
+    entries_count: Arc<AtomicU64>,
+    current_leader: Arc<AtomicU32>,
+    term: Arc<AtomicU64>,
     version: u32,
     path: String,
     persister: Arc<PersisterKind>,
@@ -51,17 +51,22 @@ pub struct FileState {
 }
 
 impl FileState {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         path: &str,
         version: &SemanticVersion,
         persister: Arc<PersisterKind>,
         encryptor: Option<EncryptorKind>,
+        current_index: Arc<AtomicU64>,
+        entries_count: Arc<AtomicU64>,
+        current_leader: Arc<AtomicU32>,
+        term: Arc<AtomicU64>,
     ) -> Self {
         Self {
-            current_index: AtomicU64::new(0),
-            entries_count: AtomicU64::new(0),
-            current_leader: AtomicU32::new(0),
-            term: AtomicU64::new(0),
+            current_index,
+            entries_count,
+            current_leader,
+            term,
             path: path.into(),
             persister,
             encryptor,
@@ -146,6 +151,7 @@ impl FileState {
                 .await
                 .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
+            tracing::warn!("index: {}", index);
             total_size += 8;
             // Greater than one, because one of the entries after a fresh reboot is the default root user.
             if entries_count > 1 && index != current_index + 1 {
@@ -258,6 +264,7 @@ impl FileState {
             EntryCommand::from_bytes(command.clone()).with_error(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to parse entry command from bytes")
             })?;
+            tracing::warn!("command: {:?}", command);
             let calculated_checksum = StateEntry::calculate_checksum(
                 index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
             );

Reply via email to