This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7a804061ec1c8624b5a4d1882dc60145deb84b51
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:24:57 2026 +0100

    Fix tests
---
 .../tests/connectors/elasticsearch/elasticsearch_sink.rs |  6 +++---
 .../connectors/elasticsearch/elasticsearch_source.rs     | 13 ++++++-------
 .../integration/tests/connectors/iceberg/iceberg_sink.rs |  4 ++--
 .../tests/connectors/postgres/postgres_source.rs         |  7 +++----
 .../integration/tests/connectors/random/random_source.rs |  7 +++----
 core/integration/tests/connectors/stdout/stdout_sink.rs  |  6 +++---
 ...consumer_group_new_messages_after_restart_scenario.rs | 16 +++++++++++-----
 7 files changed, 31 insertions(+), 28 deletions(-)

diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index 5e2113e76..7478a6e6b 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -35,7 +35,7 @@ async fn elasticsearch_sink_stores_json_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
@@ -93,7 +93,7 @@ async fn elasticsearch_sink_handles_bulk_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let bulk_count = 50;
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
@@ -147,7 +147,7 @@ async fn elasticsearch_sink_preserves_json_structure(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index f421eeb35..eb9e3fa1d 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -34,7 +34,7 @@ async fn elasticsearch_source_produces_messages_to_iggy(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     fixture
         .insert_documents(TEST_MESSAGE_COUNT)
@@ -111,7 +111,7 @@ async fn elasticsearch_source_handles_empty_index(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let doc_count = fixture
         .get_document_count()
@@ -151,7 +151,7 @@ async fn elasticsearch_source_produces_bulk_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let bulk_count = 10;
 
     fixture
@@ -213,11 +213,11 @@ async fn state_persists_across_connector_restart(
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
     let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();
 
+    let client = harness.root_client().await.unwrap();
     let received_before = {
         let mut received: Vec<serde_json::Value> = Vec::new();
         for _ in 0..POLL_ATTEMPTS {
-            if let Ok(polled) = harness
-                .client()
+            if let Ok(polled) = client
                 .poll_messages(
                     &stream_id,
                     &topic_id,
@@ -274,8 +274,7 @@ async fn state_persists_across_connector_restart(
 
     let mut received_after: Vec<serde_json::Value> = Vec::new();
     for _ in 0..POLL_ATTEMPTS {
-        if let Ok(polled) = harness
-            .client()
+        if let Ok(polled) = client
             .poll_messages(
                 &stream_id,
                 &topic_id,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs 
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 9dfd7a1d1..f4f553bc7 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -75,7 +75,7 @@ async fn iceberg_sink_consumes_json_messages(
     harness: &TestHarness,
     fixture: IcebergPreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -146,7 +146,7 @@ async fn iceberg_sink_handles_bulk_messages(
     harness: &TestHarness,
     fixture: IcebergPreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs 
b/core/integration/tests/connectors/postgres/postgres_source.rs
index 82cb578f9..ffde4d61e 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -426,11 +426,11 @@ async fn state_persists_across_connector_restart(
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
     let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();
 
+    let client = harness.root_client().await.unwrap();
     let received_before = {
         let mut received: Vec<DatabaseRecord> = Vec::new();
         for _ in 0..POLL_ATTEMPTS {
-            if let Ok(polled) = harness
-                .client()
+            if let Ok(polled) = client
                 .poll_messages(
                     &stream_id,
                     &topic_id,
@@ -486,8 +486,7 @@ async fn state_persists_across_connector_restart(
 
     let mut received_after: Vec<DatabaseRecord> = Vec::new();
     for _ in 0..POLL_ATTEMPTS {
-        if let Ok(polled) = harness
-            .client()
+        if let Ok(polled) = client
             .poll_messages(
                 &stream_id,
                 &topic_id,
diff --git a/core/integration/tests/connectors/random/random_source.rs 
b/core/integration/tests/connectors/random/random_source.rs
index ae2d3f75d..772f4c9df 100644
--- a/core/integration/tests/connectors/random/random_source.rs
+++ b/core/integration/tests/connectors/random/random_source.rs
@@ -70,9 +70,9 @@ async fn state_persists_across_connector_restart(harness: 
&mut TestHarness) {
 
     sleep(Duration::from_secs(1)).await;
 
+    let client = harness.root_client().await.unwrap();
     let offset_before = {
-        let messages = harness
-            .client()
+        let messages = client
             .poll_messages(
                 &stream_id,
                 &topic_id,
@@ -102,8 +102,7 @@ async fn state_persists_across_connector_restart(harness: 
&mut TestHarness) {
         .expect("Failed to restart connectors");
     sleep(Duration::from_secs(1)).await;
 
-    let offset_after = harness
-        .client()
+    let offset_after = client
         .poll_messages(
             &stream_id,
             &topic_id,
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs 
b/core/integration/tests/connectors/stdout/stdout_sink.rs
index 783c30de5..b5fa268da 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -36,7 +36,7 @@ const API_KEY: &str = "test-api-key";
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_consumes_messages(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -94,7 +94,7 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) 
{
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_reports_metrics(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -154,7 +154,7 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) 
{
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
diff --git 
a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
index fcb3b6aaa..b45283ae5 100644
--- 
a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
+++ 
b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
@@ -16,9 +16,7 @@
  * under the License.
  */
 
-use crate::server::scenarios::{
-    CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
-};
+use crate::server::scenarios::{CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, 
TOPIC_NAME};
 use futures::StreamExt;
 use iggy::prelude::*;
 use integration::harness::TestHarness;
@@ -68,7 +66,11 @@ async fn execute_scenario(harness: &TestHarness, client: 
&IggyClient) {
     produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await;
 
     // 2. Create a separate client to simulate the runtime
-    let runtime_client = create_client(harness).await;
+    let runtime_client = harness.new_client().await.unwrap();
+    runtime_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
 
     // 3. Create consumer and consume all initial messages
     let mut consumer = create_consumer(&runtime_client).await;
@@ -106,7 +108,11 @@ async fn execute_scenario(harness: &TestHarness, client: 
&IggyClient) {
     .await;
 
     // 7. Create a new client (simulating runtime restart)
-    let new_runtime_client = create_client(harness).await;
+    let new_runtime_client = harness.new_client().await.unwrap();
+    new_runtime_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
 
     // 8. Reconnect consumer and consume new messages
     let mut consumer = create_consumer(&new_runtime_client).await;

Reply via email to