This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_fix in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 02b40f13722a19daa45c936395a8c87c6fe10b77 Merge: d13f09333 4052619bf Author: spetz <[email protected]> AuthorDate: Thu Feb 5 18:19:48 2026 +0100 Merge Cargo.lock | 3 +- bdd/rust/Cargo.toml | 1 - bdd/rust/tests/helpers/cluster.rs | 19 +- bdd/rust/tests/steps/auth.rs | 26 +- bdd/rust/tests/steps/leader_redirection.rs | 6 +- core/harness_derive/src/codegen.rs | 2 +- core/integration/Cargo.toml | 2 +- core/integration/src/bench_utils.rs | 20 +- core/integration/src/file.rs | 32 - core/integration/src/harness/handle/server.rs | 99 ++- core/integration/src/harness/helpers.rs | 2 - .../src/harness/orchestrator/builder.rs | 10 +- .../src/harness/orchestrator/harness.rs | 199 +----- core/integration/src/http_client.rs | 52 -- core/integration/src/lib.rs | 17 - core/integration/src/quic_client.rs | 55 -- core/integration/src/tcp_client.rs | 82 --- core/integration/src/test_connectors_runtime.rs | 266 ------- core/integration/src/test_mcp_server.rs | 274 -------- core/integration/src/test_server.rs | 769 --------------------- core/integration/src/test_tls_utils.rs | 52 -- core/integration/src/websocket_client.rs | 76 -- core/integration/tests/cli/common/mod.rs | 72 +- .../tests/cli/context/test_context_applied.rs | 8 +- .../tests/cli/system/test_me_command.rs | 10 +- core/integration/tests/config_provider/mod.rs | 7 +- .../connectors/fixtures/quickwit/container.rs | 33 + .../tests/connectors/postgres/postgres_sink.rs | 6 +- .../tests/connectors/postgres/postgres_source.rs | 10 +- .../tests/connectors/quickwit/quickwit_sink.rs | 36 +- .../tests/connectors/random/random_source.rs | 2 +- .../data_integrity/verify_after_server_restart.rs | 171 ++--- core/integration/tests/mod.rs | 72 +- core/integration/tests/sdk/producer/background.rs | 195 +----- core/integration/tests/server/cg.rs | 77 ++- .../tests/server/concurrent_addition.rs | 55 +- core/integration/tests/server/general.rs | 157 ++++- core/integration/tests/server/message_retrieval.rs | 112 ++- core/integration/tests/server/mod.rs | 140 ---- .../server/scenarios/authentication_scenario.rs | 10 +- .../tests/server/scenarios/bench_scenario.rs | 25 +- .../tests/server/scenarios/concurrent_scenario.rs | 101 +-- ...umer_group_auto_commit_reconnection_scenario.rs | 14 +- .../scenarios/consumer_group_join_scenario.rs | 19 +- ...er_group_new_messages_after_restart_scenario.rs | 54 +- .../consumer_group_offset_cleanup_scenario.rs | 12 +- ...h_multiple_clients_polling_messages_scenario.rs | 18 +- ...with_single_client_polling_messages_scenario.rs | 12 +- .../consumer_timestamp_polling_scenario.rs | 12 +- .../server/scenarios/create_message_payload.rs | 12 +- .../scenarios/cross_protocol_pat_scenario.rs | 133 +--- .../tests/server/scenarios/encryption_scenario.rs | 91 +-- .../server/scenarios/log_rotation_scenario.rs | 71 +- .../server/scenarios/message_headers_scenario.rs | 12 +- .../server/scenarios/message_size_scenario.rs | 12 +- core/integration/tests/server/scenarios/mod.rs | 10 +- .../tests/server/scenarios/offset_scenario.rs | 11 +- .../tests/server/scenarios/permissions_scenario.rs | 159 ++--- .../scenarios/read_during_persistence_scenario.rs | 65 +- .../scenarios/segment_rotation_race_scenario.rs | 60 +- .../scenarios/single_message_per_batch_scenario.rs | 11 +- .../tests/server/scenarios/snapshot_scenario.rs | 13 +- .../stale_client_consumer_group_scenario.rs | 50 +- .../scenarios/stream_size_validation_scenario.rs | 13 +- .../tests/server/scenarios/system_scenario.rs | 16 +- .../tests/server/scenarios/tcp_tls_scenario.rs | 4 +- .../tests/server/scenarios/timestamp_scenario.rs | 11 +- .../tests/server/scenarios/user_scenario.rs | 10 +- .../server/scenarios/websocket_tls_scenario.rs | 4 +- core/integration/tests/server/specific.rs | 297 ++------ .../server/src/websocket/websocket_tls_listener.rs | 2 +- 71 files changed, 1129 insertions(+), 3442 deletions(-) diff --cc core/integration/tests/connectors/quickwit/quickwit_sink.rs index 0f2aa64b5,a888f7bba..30f4efedd --- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs +++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs @@@ -148,14 -138,11 +138,14 @@@ async fn given_nonexistent_quickwit_ind } #[iggy_harness( - server(connectors_runtime(config_path = "tests/connectors/quickwit/config.toml")), + server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")), seed = seeds::connector_stream )] -async fn given_bulk_message_send_should_store(harness: &TestHarness, fixture: QuickwitFixture) { +async fn given_bulk_message_send_should_store( + harness: &TestHarness, + fixture: QuickwitPreCreatedFixture, +) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); diff --cc core/integration/tests/server/cg.rs index 0a2b343c0,dbb85f98c..8095e1260 --- a/core/integration/tests/server/cg.rs +++ b/core/integration/tests/server/cg.rs @@@ -15,38 -15,53 +15,61 @@@ // specific language governing permissions and limitations // under the License. - use crate::server::{ - ScenarioFn, auto_commit_reconnection_scenario, join_scenario, multiple_clients_scenario, - new_messages_after_restart_scenario, offset_cleanup_scenario, run_scenario, - single_client_scenario, + use crate::server::scenarios::{ + consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario, - consumer_group_offset_cleanup_scenario, ++ consumer_group_new_messages_after_restart_scenario, consumer_group_offset_cleanup_scenario, + consumer_group_with_multiple_clients_polling_messages_scenario, + consumer_group_with_single_client_polling_messages_scenario, }; - use iggy_common::TransportProtocol; - use serial_test::parallel; - use test_case::test_matrix; + use integration::iggy_harness; - fn tcp() -> TransportProtocol { - TransportProtocol::Tcp + // Consumer group scenarios do not support HTTP (stateful operations). + // TODO: Add QUIC support. + + #[iggy_harness( + test_client_transport = [Tcp, WebSocket], + server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) + )] + async fn join(harness: &TestHarness) { + consumer_group_join_scenario::run(harness).await; + } + + #[iggy_harness( + test_client_transport = [Tcp, WebSocket], + server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) + )] + async fn single_client(harness: &TestHarness) { + consumer_group_with_single_client_polling_messages_scenario::run(harness).await; + } + + #[iggy_harness( + test_client_transport = [Tcp, WebSocket], + server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) + )] + async fn multiple_clients(harness: &TestHarness) { + consumer_group_with_multiple_clients_polling_messages_scenario::run(harness).await; + } + + #[iggy_harness( + test_client_transport = [Tcp, WebSocket], + server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) + )] + async fn auto_commit_reconnection(harness: &TestHarness) { + consumer_group_auto_commit_reconnection_scenario::run(harness).await; } - fn websocket() -> TransportProtocol { - TransportProtocol::WebSocket ++#[iggy_harness( ++ test_client_transport = [Tcp, WebSocket], ++ server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) ++)] ++async fn new_messages_after_restart(harness: &TestHarness) { ++ consumer_group_new_messages_after_restart_scenario::run(harness).await; +} + - // TODO: Add `QUIC`. - // Consumer group scenarios do not support HTTP - #[test_matrix( - [tcp(), websocket()], - [ - join_scenario(), - single_client_scenario(), - multiple_clients_scenario(), - auto_commit_reconnection_scenario(), - new_messages_after_restart_scenario(), - offset_cleanup_scenario(), - ] + #[iggy_harness( + test_client_transport = [Tcp, WebSocket], + server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true) )] - #[tokio::test] - #[parallel] - async fn matrix(transport: TransportProtocol, scenario: ScenarioFn) { - run_scenario(transport, scenario).await; + async fn offset_cleanup(harness: &TestHarness) { + consumer_group_offset_cleanup_scenario::run(harness).await; } diff --cc core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs index 3b219a3bc,000000000..fcb3b6aaa mode 100644,000000..100644 --- a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs @@@ -1,193 -1,0 +1,189 @@@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::server::scenarios::{ + CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client, +}; +use futures::StreamExt; +use iggy::prelude::*; - use integration::test_server::{ClientFactory, login_root}; ++use integration::harness::TestHarness; +use std::str::FromStr; +use tokio::time::{Duration, sleep, timeout}; + +const INITIAL_MESSAGES_COUNT: u32 = 10; +const NEW_MESSAGES_COUNT: u32 = 5; + - pub async fn run(client_factory: &dyn ClientFactory) { - let client = create_client(client_factory).await; - login_root(&client).await; ++pub async fn run(harness: &TestHarness) { ++ let client = harness ++ .root_client() ++ .await ++ .expect("Failed to get root client"); + init_system(&client).await; - execute_scenario(client_factory, &client).await; ++ execute_scenario(harness, &client).await; +} + +async fn init_system(client: &IggyClient) { + client.create_stream(STREAM_NAME).await.unwrap(); + + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); +} + - async fn execute_scenario(client_factory: &dyn ClientFactory, client: &IggyClient) { ++async fn execute_scenario(harness: &TestHarness, client: &IggyClient) { + // 1. Produce initial messages + produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await; + + // 2. Create a separate client to simulate the runtime - let runtime_client = create_client(client_factory).await; - login_root(&runtime_client).await; ++ let runtime_client = create_client(harness).await; + + // 3. Create consumer and consume all initial messages + let mut consumer = create_consumer(&runtime_client).await; + let consumed_messages = consume_messages(&mut consumer, INITIAL_MESSAGES_COUNT).await; + assert_eq!( + consumed_messages.len(), + INITIAL_MESSAGES_COUNT as usize, + "Should consume all initial messages" + ); + + for (index, message) in consumed_messages.iter().enumerate() { + let expected_payload = format!("test_message_{}", index + 1); + let actual_payload = String::from_utf8_lossy(&message.payload); + assert_eq!( + actual_payload, expected_payload, + "Message content mismatch at index {index}" + ); + } + + // 4. Wait for auto-commit to process + sleep(Duration::from_secs(2)).await; + + // 5. Disconnect the consumer and client (simulating runtime restart) + drop(consumer); + runtime_client.disconnect().await.unwrap(); + drop(runtime_client); + sleep(Duration::from_millis(500)).await; + + // 6. Send new messages after consumer disconnected + produce_messages( + client, + INITIAL_MESSAGES_COUNT + 1, + INITIAL_MESSAGES_COUNT + NEW_MESSAGES_COUNT, + ) + .await; + + // 7. Create a new client (simulating runtime restart) - let new_runtime_client = create_client(client_factory).await; - login_root(&new_runtime_client).await; ++ let new_runtime_client = create_client(harness).await; + + // 8. Reconnect consumer and consume new messages + let mut consumer = create_consumer(&new_runtime_client).await; + let new_messages = consume_messages(&mut consumer, NEW_MESSAGES_COUNT).await; + assert_eq!( + new_messages.len(), + NEW_MESSAGES_COUNT as usize, + "Should receive all new messages sent after restart" + ); + + for (index, message) in new_messages.iter().enumerate() { + let expected_payload = + format!("test_message_{}", INITIAL_MESSAGES_COUNT + 1 + index as u32); + let actual_payload = String::from_utf8_lossy(&message.payload); + assert_eq!( + actual_payload, expected_payload, + "New message content mismatch at index {index}" + ); + } + + drop(consumer); + drop(new_runtime_client); +} + +async fn produce_messages(client: &IggyClient, start_id: u32, end_id: u32) { + let mut messages = Vec::new(); + for message_id in start_id..=end_id { + let payload = format!("test_message_{message_id}"); + let message = IggyMessage::from_str(&payload).unwrap(); + messages.push(message); + } + + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut messages, + ) + .await + .unwrap(); +} + +async fn create_consumer(client: &IggyClient) -> IggyConsumer { + let mut consumer = client + .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME) + .unwrap() + .batch_length(10) + .poll_interval(IggyDuration::from_str("100ms").unwrap()) + .polling_strategy(PollingStrategy::next()) + .auto_join_consumer_group() + .create_consumer_group_if_not_exists() + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .build(); + + consumer.init().await.unwrap(); + consumer +} + +async fn consume_messages(consumer: &mut IggyConsumer, expected_count: u32) -> Vec<IggyMessage> { - let mut messages = Vec::new(); - let mut count = 0; - - let result = timeout(Duration::from_secs(30), async { - while count < expected_count { - if let Some(message_result) = consumer.next().await { - match message_result { - Ok(polled_message) => { - messages.push(polled_message.message); - count += 1; - } - Err(error) => panic!("Error while consuming messages: {error}"), - } - } ++ let mut messages = Vec::with_capacity(expected_count as usize); ++ ++ timeout(Duration::from_secs(30), async { ++ while messages.len() < expected_count as usize { ++ let Some(Ok(polled_message)) = consumer.next().await else { ++ continue; ++ }; ++ messages.push(polled_message.message); + } + }) - .await; - - if result.is_err() { - panic!("Timeout waiting for messages. Expected {expected_count}, received {count}"); - } ++ .await ++ .unwrap_or_else(|_| { ++ panic!( ++ "Timeout waiting for messages. Expected {expected_count}, received {}", ++ messages.len() ++ ) ++ }); + + messages +}
