This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 3d0ab78cdc58ef8880b691bab8ee86f1da95fe5f Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Tue Feb 6 09:57:58 2024 +0100 Updated to kafka 3.5.1 --- parent/pom.xml | 2 +- .../kafkaconnector/common/services/kafka/EmbeddedKafkaService.java | 3 ++- .../services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java | 3 ++- .../common/services/kafkaconnect/KafkaConnectRunner.java | 7 ++++--- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/parent/pom.xml b/parent/pom.xml index ebe783e404..4ed1d3cce4 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -27,7 +27,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <kafka.version>3.4.1</kafka.version> + <kafka.version>3.5.1</kafka.version> <camel.version>4.0.0</camel.version> <camel.kamelet.catalog.version>4.0.0</camel.kamelet.catalog.version> <apicurio.registry.version>1.3.2.Final</apicurio.registry.version> diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java index e10b7be17e..63b6892fed 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java @@ -26,6 +26,7 @@ import org.apache.camel.kafkaconnector.common.PluginPathHelper; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.test.infra.kafka.services.KafkaService; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.junit.jupiter.api.extension.ExtensionContext; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public class EmbeddedKafkaService implements KafkaService { String address = "http://localhost:" + NetworkUtils.getFreePort(); LOG.info("Using the following address for the listener configuration: {}", address); - workerProps.put(WorkerConfig.LISTENERS_CONFIG, address); + workerProps.put(RestServerConfig.LISTENERS_CONFIG, address); String pluginPaths = PluginPathHelper.getInstance().pluginPaths(); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java index 98a69cdfa3..764b3ccfb7 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java @@ -21,6 +21,7 @@ import java.util.Properties; import org.apache.camel.kafkaconnector.common.PluginPathHelper; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ class DefaultKafkaConnectPropertyFactory implements KafkaConnectPropertyFactory String address = NetworkUtils.getAddress("http"); LOG.info("Using the following address for the listener configuration: {}", address); - props.put(StandaloneConfig.LISTENERS_CONFIG, address); + props.put(RestServerConfig.LISTENERS_CONFIG, address); String pluginPaths = PluginPathHelper.getInstance().pluginPaths(); props.put(StandaloneConfig.PLUGIN_PATH_CONFIG, pluginPaths); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java index ff1c267859..4e76fec8e4 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java @@ -37,13 +37,14 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerInfo; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.ConnectRestServer; import org.apache.kafka.connect.runtime.rest.RestClient; -import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,14 +132,14 @@ class KafkaConnectRunner { AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy(); RestClient restClient = new RestClient(config); - RestServer rest = new RestServer(config, restClient); + ConnectRestServer rest = new ConnectRestServer(10, restClient, standAloneProperties); rest.initializeServer(); /* According to the Kafka source code "... Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving data to/from Kafka ..." */ - Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy); + Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(new StringConverter()), allConnectorClientConfigOverridePolicy); /* From Kafka source code: " ... The herder interface tracks and manages workers