This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 02b40f13722a19daa45c936395a8c87c6fe10b77
Merge: d13f09333 4052619bf
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:19:48 2026 +0100

    Merge

 Cargo.lock                                         |   3 +-
 bdd/rust/Cargo.toml                                |   1 -
 bdd/rust/tests/helpers/cluster.rs                  |  19 +-
 bdd/rust/tests/steps/auth.rs                       |  26 +-
 bdd/rust/tests/steps/leader_redirection.rs         |   6 +-
 core/harness_derive/src/codegen.rs                 |   2 +-
 core/integration/Cargo.toml                        |   2 +-
 core/integration/src/bench_utils.rs                |  20 +-
 core/integration/src/file.rs                       |  32 -
 core/integration/src/harness/handle/server.rs      |  99 ++-
 core/integration/src/harness/helpers.rs            |   2 -
 .../src/harness/orchestrator/builder.rs            |  10 +-
 .../src/harness/orchestrator/harness.rs            | 199 +-----
 core/integration/src/http_client.rs                |  52 --
 core/integration/src/lib.rs                        |  17 -
 core/integration/src/quic_client.rs                |  55 --
 core/integration/src/tcp_client.rs                 |  82 ---
 core/integration/src/test_connectors_runtime.rs    | 266 -------
 core/integration/src/test_mcp_server.rs            | 274 --------
 core/integration/src/test_server.rs                | 769 ---------------------
 core/integration/src/test_tls_utils.rs             |  52 --
 core/integration/src/websocket_client.rs           |  76 --
 core/integration/tests/cli/common/mod.rs           |  72 +-
 .../tests/cli/context/test_context_applied.rs      |   8 +-
 .../tests/cli/system/test_me_command.rs            |  10 +-
 core/integration/tests/config_provider/mod.rs      |   7 +-
 .../connectors/fixtures/quickwit/container.rs      |  33 +
 .../tests/connectors/postgres/postgres_sink.rs     |   6 +-
 .../tests/connectors/postgres/postgres_source.rs   |  10 +-
 .../tests/connectors/quickwit/quickwit_sink.rs     |  36 +-
 .../tests/connectors/random/random_source.rs       |   2 +-
 .../data_integrity/verify_after_server_restart.rs  | 171 ++---
 core/integration/tests/mod.rs                      |  72 +-
 core/integration/tests/sdk/producer/background.rs  | 195 +-----
 core/integration/tests/server/cg.rs                |  77 ++-
 .../tests/server/concurrent_addition.rs            |  55 +-
 core/integration/tests/server/general.rs           | 157 ++++-
 core/integration/tests/server/message_retrieval.rs | 112 ++-
 core/integration/tests/server/mod.rs               | 140 ----
 .../server/scenarios/authentication_scenario.rs    |  10 +-
 .../tests/server/scenarios/bench_scenario.rs       |  25 +-
 .../tests/server/scenarios/concurrent_scenario.rs  | 101 +--
 ...umer_group_auto_commit_reconnection_scenario.rs |  14 +-
 .../scenarios/consumer_group_join_scenario.rs      |  19 +-
 ...er_group_new_messages_after_restart_scenario.rs |  54 +-
 .../consumer_group_offset_cleanup_scenario.rs      |  12 +-
 ...h_multiple_clients_polling_messages_scenario.rs |  18 +-
 ...with_single_client_polling_messages_scenario.rs |  12 +-
 .../consumer_timestamp_polling_scenario.rs         |  12 +-
 .../server/scenarios/create_message_payload.rs     |  12 +-
 .../scenarios/cross_protocol_pat_scenario.rs       | 133 +---
 .../tests/server/scenarios/encryption_scenario.rs  |  91 +--
 .../server/scenarios/log_rotation_scenario.rs      |  71 +-
 .../server/scenarios/message_headers_scenario.rs   |  12 +-
 .../server/scenarios/message_size_scenario.rs      |  12 +-
 core/integration/tests/server/scenarios/mod.rs     |  10 +-
 .../tests/server/scenarios/offset_scenario.rs      |  11 +-
 .../tests/server/scenarios/permissions_scenario.rs | 159 ++---
 .../scenarios/read_during_persistence_scenario.rs  |  65 +-
 .../scenarios/segment_rotation_race_scenario.rs    |  60 +-
 .../scenarios/single_message_per_batch_scenario.rs |  11 +-
 .../tests/server/scenarios/snapshot_scenario.rs    |  13 +-
 .../stale_client_consumer_group_scenario.rs        |  50 +-
 .../scenarios/stream_size_validation_scenario.rs   |  13 +-
 .../tests/server/scenarios/system_scenario.rs      |  16 +-
 .../tests/server/scenarios/tcp_tls_scenario.rs     |   4 +-
 .../tests/server/scenarios/timestamp_scenario.rs   |  11 +-
 .../tests/server/scenarios/user_scenario.rs        |  10 +-
 .../server/scenarios/websocket_tls_scenario.rs     |   4 +-
 core/integration/tests/server/specific.rs          | 297 ++------
 .../server/src/websocket/websocket_tls_listener.rs |   2 +-
 71 files changed, 1129 insertions(+), 3442 deletions(-)

diff --cc core/integration/tests/connectors/quickwit/quickwit_sink.rs
index 0f2aa64b5,a888f7bba..30f4efedd
--- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@@ -148,14 -138,11 +138,14 @@@ async fn given_nonexistent_quickwit_ind
  }
  
  #[iggy_harness(
 -    server(connectors_runtime(config_path = "tests/connectors/quickwit/config.toml")),
 +    server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")),
      seed = seeds::connector_stream
  )]
 -async fn given_bulk_message_send_should_store(harness: &TestHarness, fixture: QuickwitFixture) {
 +async fn given_bulk_message_send_should_store(
 +    harness: &TestHarness,
 +    fixture: QuickwitPreCreatedFixture,
 +) {
-     let client = harness.client();
+     let client = harness.root_client().await.unwrap();
      let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
      let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
  
diff --cc core/integration/tests/server/cg.rs
index 0a2b343c0,dbb85f98c..8095e1260
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@@ -15,38 -15,53 +15,61 @@@
  // specific language governing permissions and limitations
  // under the License.
  
- use crate::server::{
-     ScenarioFn, auto_commit_reconnection_scenario, join_scenario, multiple_clients_scenario,
-     new_messages_after_restart_scenario, offset_cleanup_scenario, run_scenario,
-     single_client_scenario,
+ use crate::server::scenarios::{
+     consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario,
 -    consumer_group_offset_cleanup_scenario,
++    consumer_group_new_messages_after_restart_scenario, consumer_group_offset_cleanup_scenario,
+     consumer_group_with_multiple_clients_polling_messages_scenario,
+     consumer_group_with_single_client_polling_messages_scenario,
  };
- use iggy_common::TransportProtocol;
- use serial_test::parallel;
- use test_case::test_matrix;
+ use integration::iggy_harness;
  
- fn tcp() -> TransportProtocol {
-     TransportProtocol::Tcp
+ // Consumer group scenarios do not support HTTP (stateful operations).
+ // TODO: Add QUIC support.
+ 
+ #[iggy_harness(
+     test_client_transport = [Tcp, WebSocket],
+     server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
+ )]
+ async fn join(harness: &TestHarness) {
+     consumer_group_join_scenario::run(harness).await;
+ }
+ 
+ #[iggy_harness(
+     test_client_transport = [Tcp, WebSocket],
+     server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
+ )]
+ async fn single_client(harness: &TestHarness) {
+     consumer_group_with_single_client_polling_messages_scenario::run(harness).await;
+ }
+ 
+ #[iggy_harness(
+     test_client_transport = [Tcp, WebSocket],
+     server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
+ )]
+ async fn multiple_clients(harness: &TestHarness) {
+     consumer_group_with_multiple_clients_polling_messages_scenario::run(harness).await;
+ }
+ 
+ #[iggy_harness(
+     test_client_transport = [Tcp, WebSocket],
+     server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
+ )]
+ async fn auto_commit_reconnection(harness: &TestHarness) {
+     consumer_group_auto_commit_reconnection_scenario::run(harness).await;
  }
  
- fn websocket() -> TransportProtocol {
-     TransportProtocol::WebSocket
++#[iggy_harness(
++    test_client_transport = [Tcp, WebSocket],
++    server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
++)]
++async fn new_messages_after_restart(harness: &TestHarness) {
++    consumer_group_new_messages_after_restart_scenario::run(harness).await;
 +}
 +
- // TODO: Add `QUIC`.
- // Consumer group scenarios do not support HTTP
- #[test_matrix(
-     [tcp(), websocket()],
-     [
-         join_scenario(),
-         single_client_scenario(),
-         multiple_clients_scenario(),
-         auto_commit_reconnection_scenario(),
-         new_messages_after_restart_scenario(),
-         offset_cleanup_scenario(),
-     ]
+ #[iggy_harness(
+     test_client_transport = [Tcp, WebSocket],
+     server(tcp.socket.override_defaults = true, tcp.socket.nodelay = true)
  )]
- #[tokio::test]
- #[parallel]
- async fn matrix(transport: TransportProtocol, scenario: ScenarioFn) {
-     run_scenario(transport, scenario).await;
+ async fn offset_cleanup(harness: &TestHarness) {
+     consumer_group_offset_cleanup_scenario::run(harness).await;
  }
diff --cc core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
index 3b219a3bc,000000000..fcb3b6aaa
mode 100644,000000..100644
--- a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
@@@ -1,193 -1,0 +1,189 @@@
 +/* Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +use crate::server::scenarios::{
 +    CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
 +};
 +use futures::StreamExt;
 +use iggy::prelude::*;
- use integration::test_server::{ClientFactory, login_root};
++use integration::harness::TestHarness;
 +use std::str::FromStr;
 +use tokio::time::{Duration, sleep, timeout};
 +
 +const INITIAL_MESSAGES_COUNT: u32 = 10;
 +const NEW_MESSAGES_COUNT: u32 = 5;
 +
- pub async fn run(client_factory: &dyn ClientFactory) {
-     let client = create_client(client_factory).await;
-     login_root(&client).await;
++pub async fn run(harness: &TestHarness) {
++    let client = harness
++        .root_client()
++        .await
++        .expect("Failed to get root client");
 +    init_system(&client).await;
-     execute_scenario(client_factory, &client).await;
++    execute_scenario(harness, &client).await;
 +}
 +
 +async fn init_system(client: &IggyClient) {
 +    client.create_stream(STREAM_NAME).await.unwrap();
 +
 +    client
 +        .create_topic(
 +            &Identifier::named(STREAM_NAME).unwrap(),
 +            TOPIC_NAME,
 +            1,
 +            CompressionAlgorithm::default(),
 +            None,
 +            IggyExpiry::NeverExpire,
 +            MaxTopicSize::ServerDefault,
 +        )
 +        .await
 +        .unwrap();
 +
 +    client
 +        .create_consumer_group(
 +            &Identifier::named(STREAM_NAME).unwrap(),
 +            &Identifier::named(TOPIC_NAME).unwrap(),
 +            CONSUMER_GROUP_NAME,
 +        )
 +        .await
 +        .unwrap();
 +}
 +
- async fn execute_scenario(client_factory: &dyn ClientFactory, client: &IggyClient) {
++async fn execute_scenario(harness: &TestHarness, client: &IggyClient) {
 +    // 1. Produce initial messages
 +    produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await;
 +
 +    // 2. Create a separate client to simulate the runtime
-     let runtime_client = create_client(client_factory).await;
-     login_root(&runtime_client).await;
++    let runtime_client = create_client(harness).await;
 +
 +    // 3. Create consumer and consume all initial messages
 +    let mut consumer = create_consumer(&runtime_client).await;
 +    let consumed_messages = consume_messages(&mut consumer, INITIAL_MESSAGES_COUNT).await;
 +    assert_eq!(
 +        consumed_messages.len(),
 +        INITIAL_MESSAGES_COUNT as usize,
 +        "Should consume all initial messages"
 +    );
 +
 +    for (index, message) in consumed_messages.iter().enumerate() {
 +        let expected_payload = format!("test_message_{}", index + 1);
 +        let actual_payload = String::from_utf8_lossy(&message.payload);
 +        assert_eq!(
 +            actual_payload, expected_payload,
 +            "Message content mismatch at index {index}"
 +        );
 +    }
 +
 +    // 4. Wait for auto-commit to process
 +    sleep(Duration::from_secs(2)).await;
 +
 +    // 5. Disconnect the consumer and client (simulating runtime restart)
 +    drop(consumer);
 +    runtime_client.disconnect().await.unwrap();
 +    drop(runtime_client);
 +    sleep(Duration::from_millis(500)).await;
 +
 +    // 6. Send new messages after consumer disconnected
 +    produce_messages(
 +        client,
 +        INITIAL_MESSAGES_COUNT + 1,
 +        INITIAL_MESSAGES_COUNT + NEW_MESSAGES_COUNT,
 +    )
 +    .await;
 +
 +    // 7. Create a new client (simulating runtime restart)
-     let new_runtime_client = create_client(client_factory).await;
-     login_root(&new_runtime_client).await;
++    let new_runtime_client = create_client(harness).await;
 +
 +    // 8. Reconnect consumer and consume new messages
 +    let mut consumer = create_consumer(&new_runtime_client).await;
 +    let new_messages = consume_messages(&mut consumer, NEW_MESSAGES_COUNT).await;
 +    assert_eq!(
 +        new_messages.len(),
 +        NEW_MESSAGES_COUNT as usize,
 +        "Should receive all new messages sent after restart"
 +    );
 +
 +    for (index, message) in new_messages.iter().enumerate() {
 +        let expected_payload =
 +            format!("test_message_{}", INITIAL_MESSAGES_COUNT + 1 + index as u32);
 +        let actual_payload = String::from_utf8_lossy(&message.payload);
 +        assert_eq!(
 +            actual_payload, expected_payload,
 +            "New message content mismatch at index {index}"
 +        );
 +    }
 +
 +    drop(consumer);
 +    drop(new_runtime_client);
 +}
 +
 +async fn produce_messages(client: &IggyClient, start_id: u32, end_id: u32) {
 +    let mut messages = Vec::new();
 +    for message_id in start_id..=end_id {
 +        let payload = format!("test_message_{message_id}");
 +        let message = IggyMessage::from_str(&payload).unwrap();
 +        messages.push(message);
 +    }
 +
 +    client
 +        .send_messages(
 +            &Identifier::named(STREAM_NAME).unwrap(),
 +            &Identifier::named(TOPIC_NAME).unwrap(),
 +            &Partitioning::partition_id(PARTITION_ID),
 +            &mut messages,
 +        )
 +        .await
 +        .unwrap();
 +}
 +
 +async fn create_consumer(client: &IggyClient) -> IggyConsumer {
 +    let mut consumer = client
 +        .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
 +        .unwrap()
 +        .batch_length(10)
 +        .poll_interval(IggyDuration::from_str("100ms").unwrap())
 +        .polling_strategy(PollingStrategy::next())
 +        .auto_join_consumer_group()
 +        .create_consumer_group_if_not_exists()
 +        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
 +        .build();
 +
 +    consumer.init().await.unwrap();
 +    consumer
 +}
 +
 +async fn consume_messages(consumer: &mut IggyConsumer, expected_count: u32) -> Vec<IggyMessage> {
-     let mut messages = Vec::new();
-     let mut count = 0;
- 
-     let result = timeout(Duration::from_secs(30), async {
-         while count < expected_count {
-             if let Some(message_result) = consumer.next().await {
-                 match message_result {
-                     Ok(polled_message) => {
-                         messages.push(polled_message.message);
-                         count += 1;
-                     }
-                     Err(error) => panic!("Error while consuming messages: {error}"),
-                 }
-             }
++    let mut messages = Vec::with_capacity(expected_count as usize);
++
++    timeout(Duration::from_secs(30), async {
++        while messages.len() < expected_count as usize {
++            let Some(Ok(polled_message)) = consumer.next().await else {
++                continue;
++            };
++            messages.push(polled_message.message);
 +        }
 +    })
-     .await;
- 
-     if result.is_err() {
-         panic!("Timeout waiting for messages. Expected {expected_count}, received {count}");
-     }
++    .await
++    .unwrap_or_else(|_| {
++        panic!(
++            "Timeout waiting for messages. Expected {expected_count}, received {}",
++            messages.len()
++        )
++    });
 +
 +    messages
 +}

Reply via email to