This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3d0ab78cdc58ef8880b691bab8ee86f1da95fe5f
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Tue Feb 6 09:57:58 2024 +0100

    Updated to kafka 3.5.1
---
 parent/pom.xml                                                     | 2 +-
 .../kafkaconnector/common/services/kafka/EmbeddedKafkaService.java | 3 ++-
 .../services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java  | 3 ++-
 .../common/services/kafkaconnect/KafkaConnectRunner.java           | 7 ++++---
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index ebe783e404..4ed1d3cce4 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -27,7 +27,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
-        <kafka.version>3.4.1</kafka.version>
+        <kafka.version>3.5.1</kafka.version>
         <camel.version>4.0.0</camel.version>
         <camel.kamelet.catalog.version>4.0.0</camel.kamelet.catalog.version>
         <apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
index e10b7be17e..63b6892fed 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
@@ -26,6 +26,7 @@ import 
org.apache.camel.kafkaconnector.common.PluginPathHelper;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class EmbeddedKafkaService implements KafkaService {
 
         String address = "http://localhost:"; + NetworkUtils.getFreePort();
         LOG.info("Using the following address for  the listener configuration: 
{}", address);
-        workerProps.put(WorkerConfig.LISTENERS_CONFIG, address);
+        workerProps.put(RestServerConfig.LISTENERS_CONFIG, address);
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
 
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
index 98a69cdfa3..764b3ccfb7 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
@@ -21,6 +21,7 @@ import java.util.Properties;
 
 import org.apache.camel.kafkaconnector.common.PluginPathHelper;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ class DefaultKafkaConnectPropertyFactory implements 
KafkaConnectPropertyFactory
 
         String address = NetworkUtils.getAddress("http");
         LOG.info("Using the following address for  the listener configuration: 
{}", address);
-        props.put(StandaloneConfig.LISTENERS_CONFIG, address);
+        props.put(RestServerConfig.LISTENERS_CONFIG, address);
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
         props.put(StandaloneConfig.PLUGIN_PATH_CONFIG, pluginPaths);
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index ff1c267859..4e76fec8e4 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -37,13 +37,14 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,14 +132,14 @@ class KafkaConnectRunner {
         AllConnectorClientConfigOverridePolicy 
allConnectorClientConfigOverridePolicy = new 
AllConnectorClientConfigOverridePolicy();
 
         RestClient restClient = new RestClient(config);
-        RestServer rest = new RestServer(config, restClient);
+        ConnectRestServer rest = new ConnectRestServer(10, restClient, 
standAloneProperties);
         rest.initializeServer();
 
         /*
          According to the Kafka source code "... Worker runs a (dynamic) set 
of tasks
          in a set of threads, doing the work of actually moving data to/from 
Kafka ..."
          */
-        Worker worker = new Worker(bootstrapServer, time, plugins, config, new 
FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
+        Worker worker = new Worker(bootstrapServer, time, plugins, config, new 
FileOffsetBackingStore(new StringConverter()), 
allConnectorClientConfigOverridePolicy);
 
         /*
         From Kafka source code: " ... The herder interface tracks and manages 
workers

Reply via email to