This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_fix in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7a804061ec1c8624b5a4d1882dc60145deb84b51 Author: spetz <[email protected]> AuthorDate: Thu Feb 5 18:24:57 2026 +0100 Fix tests --- .../tests/connectors/elasticsearch/elasticsearch_sink.rs | 6 +++--- .../connectors/elasticsearch/elasticsearch_source.rs | 13 ++++++------- .../integration/tests/connectors/iceberg/iceberg_sink.rs | 4 ++-- .../tests/connectors/postgres/postgres_source.rs | 7 +++---- .../integration/tests/connectors/random/random_source.rs | 7 +++---- core/integration/tests/connectors/stdout/stdout_sink.rs | 6 +++--- ...consumer_group_new_messages_after_restart_scenario.rs | 16 +++++++++++----- 7 files changed, 31 insertions(+), 28 deletions(-) diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs index 5e2113e76..7478a6e6b 100644 --- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs +++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs @@ -35,7 +35,7 @@ async fn elasticsearch_sink_stores_json_messages( harness: &TestHarness, fixture: ElasticsearchSinkFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); @@ -93,7 +93,7 @@ async fn elasticsearch_sink_handles_bulk_messages( harness: &TestHarness, fixture: ElasticsearchSinkFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let bulk_count = 50; let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); @@ -147,7 +147,7 @@ async fn elasticsearch_sink_preserves_json_structure( harness: &TestHarness, fixture: ElasticsearchSinkFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs index f421eeb35..eb9e3fa1d 100644 --- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs +++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs @@ -34,7 +34,7 @@ async fn elasticsearch_source_produces_messages_to_iggy( harness: &TestHarness, fixture: ElasticsearchSourcePreCreatedFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); fixture .insert_documents(TEST_MESSAGE_COUNT) @@ -111,7 +111,7 @@ async fn elasticsearch_source_handles_empty_index( harness: &TestHarness, fixture: ElasticsearchSourcePreCreatedFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let doc_count = fixture .get_document_count() @@ -151,7 +151,7 @@ async fn elasticsearch_source_produces_bulk_messages( harness: &TestHarness, fixture: ElasticsearchSourcePreCreatedFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let bulk_count = 10; fixture @@ -213,11 +213,11 @@ async fn state_persists_across_connector_restart( let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); + let client = harness.root_client().await.unwrap(); let received_before = { let mut received: Vec<serde_json::Value> = Vec::new(); for _ in 0..POLL_ATTEMPTS { - if let Ok(polled) = harness - .client() + if let Ok(polled) = client .poll_messages( &stream_id, &topic_id, @@ -274,8 +274,7 @@ async fn state_persists_across_connector_restart( let mut received_after: Vec<serde_json::Value> = Vec::new(); for _ in 0..POLL_ATTEMPTS { - if let Ok(polled) = harness - .client() + if let Ok(polled) = client .poll_messages( &stream_id, &topic_id, diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs index 9dfd7a1d1..f4f553bc7 100644 --- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs +++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs @@ -75,7 +75,7 @@ async fn iceberg_sink_consumes_json_messages( harness: &TestHarness, fixture: IcebergPreCreatedFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let api_address = harness .connectors_runtime() .expect("connector runtime should be available") @@ -146,7 +146,7 @@ async fn iceberg_sink_handles_bulk_messages( harness: &TestHarness, fixture: IcebergPreCreatedFixture, ) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let api_address = harness .connectors_runtime() .expect("connector runtime should be available") diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs index 82cb578f9..ffde4d61e 100644 --- a/core/integration/tests/connectors/postgres/postgres_source.rs +++ b/core/integration/tests/connectors/postgres/postgres_source.rs @@ -426,11 +426,11 @@ async fn state_persists_across_connector_restart( let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); + let client = harness.root_client().await.unwrap(); let received_before = { let mut received: Vec<DatabaseRecord> = Vec::new(); for _ in 0..POLL_ATTEMPTS { - if let Ok(polled) = harness - .client() + if let Ok(polled) = client .poll_messages( &stream_id, &topic_id, @@ -486,8 +486,7 @@ async fn state_persists_across_connector_restart( let mut received_after: Vec<DatabaseRecord> = Vec::new(); for _ in 0..POLL_ATTEMPTS { - if let Ok(polled) = harness - .client() + if let Ok(polled) = client .poll_messages( &stream_id, &topic_id, diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs index ae2d3f75d..772f4c9df 100644 --- a/core/integration/tests/connectors/random/random_source.rs +++ b/core/integration/tests/connectors/random/random_source.rs @@ -70,9 +70,9 @@ async fn state_persists_across_connector_restart(harness: &mut TestHarness) { sleep(Duration::from_secs(1)).await; + let client = harness.root_client().await.unwrap(); let offset_before = { - let messages = harness - .client() + let messages = client .poll_messages( &stream_id, &topic_id, @@ -102,8 +102,7 @@ async fn state_persists_across_connector_restart(harness: &mut TestHarness) { .expect("Failed to restart connectors"); sleep(Duration::from_secs(1)).await; - let offset_after = harness - .client() + let offset_after = client .poll_messages( &stream_id, &topic_id, diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs index 783c30de5..b5fa268da 100644 --- a/core/integration/tests/connectors/stdout/stdout_sink.rs +++ b/core/integration/tests/connectors/stdout/stdout_sink.rs @@ -36,7 +36,7 @@ const API_KEY: &str = "test-api-key"; seed = seeds::connector_stream )] async fn stdout_sink_consumes_messages(harness: &TestHarness) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let api_address = harness .connectors_runtime() .expect("connector runtime should be available") @@ -94,7 +94,7 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) { seed = seeds::connector_stream )] async fn stdout_sink_reports_metrics(harness: &TestHarness) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let api_address = harness .connectors_runtime() .expect("connector runtime should be available") @@ -154,7 +154,7 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) { seed = seeds::connector_stream )] async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) { - let client = harness.client(); + let client = harness.root_client().await.unwrap(); let api_address = harness .connectors_runtime() .expect("connector runtime should be available") diff --git a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs index fcb3b6aaa..b45283ae5 100644 --- a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs @@ -16,9 +16,7 @@ * under the License. */ -use crate::server::scenarios::{ - CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client, -}; +use crate::server::scenarios::{CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME}; use futures::StreamExt; use iggy::prelude::*; use integration::harness::TestHarness; @@ -68,7 +66,11 @@ async fn execute_scenario(harness: &TestHarness, client: &IggyClient) { produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await; // 2. Create a separate client to simulate the runtime - let runtime_client = create_client(harness).await; + let runtime_client = harness.new_client().await.unwrap(); + runtime_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); // 3. Create consumer and consume all initial messages let mut consumer = create_consumer(&runtime_client).await; @@ -106,7 +108,11 @@ async fn execute_scenario(harness: &TestHarness, client: &IggyClient) { .await; // 7. Create a new client (simulating runtime restart) - let new_runtime_client = create_client(harness).await; + let new_runtime_client = harness.new_client().await.unwrap(); + new_runtime_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); // 8. Reconnect consumer and consume new messages let mut consumer = create_consumer(&new_runtime_client).await;
