This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_final in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fdf2ae8db5c40cd6e84d105b73f844b3ac42ab16 Author: numminex <[email protected]> AuthorDate: Wed Nov 5 14:01:03 2025 +0100 fix broken cg loader --- .../data_integrity/verify_after_server_restart.rs | 62 +++++++++++++++++++++- core/integration/tests/state/mod.rs | 9 ++++ core/server/src/main.rs | 15 +++++- core/server/src/state/file.rs | 23 +++++--- 4 files changed, 99 insertions(+), 10 deletions(-) diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs index 32e7c1c54..46b3a757b 100644 --- a/core/integration/tests/data_integrity/verify_after_server_restart.rs +++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs @@ -17,7 +17,7 @@ */ use iggy::clients::client::IggyClient; -use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient}; +use iggy::prelude::{ConsumerGroupClient, Identifier, IggyByteSize, MessageClient, SystemClient}; use iggy_common::TransportProtocol; use integration::bench_utils::run_bench_and_wait_for_finish; use integration::{ @@ -101,6 +101,16 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) .unwrap(); } + // 4b. Create consumer groups to test persistence + let consumer_group_names = vec!["test-cg-1", "test-cg-2", "test-cg-3"]; + for (idx, cg_name) in consumer_group_names.iter().enumerate() { + let stream_id = Identifier::numeric(idx as u32).unwrap(); + client + .create_consumer_group(&stream_id, &topic_id, cg_name) + .await + .unwrap(); + } + // 5. 
Save stats from the first server let stats = client.get_stats().await.unwrap(); let expected_messages_size_bytes = stats.messages_size_bytes; @@ -163,6 +173,33 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) expected_segments_count, stats_after_restart.segments_count, "Segments count should be preserved after restart" ); + assert_eq!( + expected_consumer_groups_count, stats_after_restart.consumer_groups_count, + "Consumer groups count should be preserved after restart" + ); + + // 8b. Verify consumer groups exist after restart + for (idx, cg_name) in consumer_group_names.iter().enumerate() { + let stream_id = Identifier::numeric(idx as u32).unwrap(); + let consumer_group = client_after_restart + .get_consumer_group( + &stream_id, + &topic_id, + &Identifier::from_str(cg_name).unwrap(), + ) + .await + .unwrap(); + assert!( + consumer_group.is_some(), + "Consumer group {} should exist after restart", + cg_name + ); + assert_eq!( + consumer_group.unwrap().name, + *cg_name, + "Consumer group name should match" + ); + } // 9. Run send bench again to add more data run_bench_and_wait_for_finish( @@ -248,6 +285,29 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) "Consumer groups count" ); + // 14b. Verify consumer groups still exist after second benchmark run + for (idx, cg_name) in consumer_group_names.iter().enumerate() { + let stream_id = Identifier::numeric(idx as u32).unwrap(); + let consumer_group = client + .get_consumer_group( + &stream_id, + &topic_id, + &Identifier::from_str(cg_name).unwrap(), + ) + .await + .unwrap(); + assert!( + consumer_group.is_some(), + "Consumer group {} should still exist after second benchmark", + cg_name + ); + assert_eq!( + consumer_group.unwrap().name, + *cg_name, + "Consumer group name should match" + ); + } + // 15. 
Run poll bench to check if all data (10MB total) is still there run_bench_and_wait_for_finish( &server_addr, diff --git a/core/integration/tests/state/mod.rs b/core/integration/tests/state/mod.rs index 40ed97818..2ff235c45 100644 --- a/core/integration/tests/state/mod.rs +++ b/core/integration/tests/state/mod.rs @@ -24,6 +24,7 @@ use server::streaming::utils::file::overwrite; use server::versioning::SemanticVersion; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicU64}; use uuid::Uuid; mod file; @@ -54,11 +55,19 @@ impl StateSetup { let persister = PersisterKind::FileWithSync(FileWithSyncPersister {}); let encryptor = encryption_key .map(|key| EncryptorKind::Aes256Gcm(Aes256GcmEncryptor::new(key).unwrap())); + let state_current_index = Arc::new(AtomicU64::new(0)); + let state_entries_count = Arc::new(AtomicU64::new(0)); + let state_current_leader = Arc::new(AtomicU32::new(0)); + let state_term = Arc::new(AtomicU64::new(0)); let state = FileState::new( &messages_file_path, &version, Arc::new(persister), encryptor, + state_current_index, + state_entries_count, + state_current_leader, + state_term, ); Self { diff --git a/core/server/src/main.rs b/core/server/src/main.rs index eff4ebb4d..1f326237c 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -59,7 +59,8 @@ use server::versioning::SemanticVersion; use std::collections::HashSet; use std::rc::Rc; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use tracing::{error, info, instrument, warn}; const COMPONENT: &str = "MAIN"; @@ -206,11 +207,19 @@ async fn main() -> Result<(), ServerError> { // TENTH DISCRETE LOADING STEP. 
let state_persister = resolve_persister(config.system.state.enforce_fsync); + let state_current_index = Arc::new(AtomicU64::new(0)); + let state_entries_count = Arc::new(AtomicU64::new(0)); + let state_current_leader = Arc::new(AtomicU32::new(0)); + let state_term = Arc::new(AtomicU64::new(0)); let state = FileState::new( &config.system.get_state_messages_file_path(), &current_version, state_persister, encryptor.clone(), + state_current_index.clone(), + state_entries_count.clone(), + state_current_leader.clone(), + state_term.clone(), ); let state = SystemState::load(state).await?; let (streams_state, users_state) = state.decompose(); @@ -306,6 +315,10 @@ async fn main() -> Result<(), ServerError> { &current_version, state_persister, encryptor.clone(), + state_current_index.clone(), + state_entries_count.clone(), + state_current_leader.clone(), + state_term.clone(), ); let client_manager = client_manager.clone(); diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index 5ea72192d..4d978e00d 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -40,10 +40,10 @@ const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state"; #[derive(Debug)] pub struct FileState { - current_index: AtomicU64, - entries_count: AtomicU64, - current_leader: AtomicU32, - term: AtomicU64, + current_index: Arc<AtomicU64>, + entries_count: Arc<AtomicU64>, + current_leader: Arc<AtomicU32>, + term: Arc<AtomicU64>, version: u32, path: String, persister: Arc<PersisterKind>, @@ -51,17 +51,22 @@ pub struct FileState { } impl FileState { + #[allow(clippy::too_many_arguments)] pub fn new( path: &str, version: &SemanticVersion, persister: Arc<PersisterKind>, encryptor: Option<EncryptorKind>, + current_index: Arc<AtomicU64>, + entries_count: Arc<AtomicU64>, + current_leader: Arc<AtomicU32>, + term: Arc<AtomicU64>, ) -> Self { Self { - current_index: AtomicU64::new(0), - entries_count: AtomicU64::new(0), - current_leader: AtomicU32::new(0), - term: 
AtomicU64::new(0), + current_index, + entries_count, + current_leader, + term, path: path.into(), persister, encryptor, @@ -146,6 +151,7 @@ impl FileState { .await .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; + tracing::warn!("index: {}", index); total_size += 8; // Greater than one, because one of the entries after a fresh reboot is the default root user. if entries_count > 1 && index != current_index + 1 { @@ -258,6 +264,7 @@ impl FileState { EntryCommand::from_bytes(command.clone()).with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to parse entry command from bytes") })?; + tracing::warn!("command: {:?}", command); let calculated_checksum = StateEntry::calculate_checksum( index, term, leader_id, version, flags, timestamp, user_id, &context, &command, );
