This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 6eaaac3dd5 kie-issues-1987: IndependentJobsIT fails randomly on timeouts (#3963)
6eaaac3dd5 is described below

commit 6eaaac3dd57f39e7aabe93e7dec6695ee8210e3a
Author: Walter Medvedeo <[email protected]>
AuthorDate: Fri Jun 20 10:26:35 2025 +0200

    kie-issues-1987: IndependentJobsIT fails randomly on timeouts (#3963)
    
    - Add the ability to pass configuration properties to the KafkaTypedTestClient
---
 .../kie/kogito/test/quarkus/kafka/KafkaTestClient.java |  7 +++++++
 .../test/quarkus/kafka/KafkaTypedTestClient.java       | 18 ++++++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTestClient.java
 
b/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTestClient.java
index b517ed88e6..04a2c789ef 100644
--- 
a/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTestClient.java
+++ 
b/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTestClient.java
@@ -18,6 +18,8 @@
  */
 package org.kie.kogito.test.quarkus.kafka;
 
+import java.util.Properties;
+
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -25,4 +27,9 @@ public class KafkaTestClient extends 
KafkaTypedTestClient<String, StringSerializ
     public KafkaTestClient(String hosts) {
         super(hosts, StringSerializer.class, StringDeserializer.class);
     }
+
+    public KafkaTestClient(String hosts, Properties additionalConfig) {
+        super(hosts, additionalConfig, StringSerializer.class, 
StringDeserializer.class);
+    }
+
 }
diff --git 
a/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTypedTestClient.java
 
b/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTypedTestClient.java
index 26353ab795..b2e8fae786 100644
--- 
a/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTypedTestClient.java
+++ 
b/quarkus/test/src/main/java/org/kie/kogito/test/quarkus/kafka/KafkaTypedTestClient.java
@@ -60,6 +60,7 @@ public class KafkaTypedTestClient<T, S extends Serializer<T>, 
D extends Deserial
     private Class<D> deserializer;
 
     private String hosts;
+    private Properties additionalConfig;
 
     public KafkaTypedTestClient(String hosts, Class<S> serializer, Class<D> 
deserializer) {
         this.hosts = hosts;
@@ -67,7 +68,12 @@ public class KafkaTypedTestClient<T, S extends 
Serializer<T>, D extends Deserial
         this.deserializer = deserializer;
     }
 
-    private KafkaConsumer<String, T> createDefaultConsumer(String hosts) {
+    public KafkaTypedTestClient(String hosts, Properties additionalConfig, 
Class<S> serializer, Class<D> deserializer) {
+        this(hosts, serializer, deserializer);
+        this.additionalConfig = additionalConfig;
+    }
+
+    private Properties createDefaultConfig(String hosts) {
         Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
@@ -75,7 +81,15 @@ public class KafkaTypedTestClient<T, S extends 
Serializer<T>, D extends Deserial
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserializer.getName());
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, 
KafkaTypedTestClient.class.getName() + "Consumer");
-        return new KafkaConsumer<>(consumerConfig);
+        return consumerConfig;
+    }
+
+    private KafkaConsumer<String, T> createDefaultConsumer(String hosts) {
+        Properties resultConfig = createDefaultConfig(hosts);
+        if (additionalConfig != null) {
+            resultConfig.putAll(additionalConfig);
+        }
+        return new KafkaConsumer<>(resultConfig);
     }
 
     private KafkaProducer<String, T> createDefaultProducer(String hosts) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to