This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f99b6d9be41498783f666f8b889e8502852269e9 Author: Croway <[email protected]> AuthorDate: Tue Mar 31 14:12:48 2026 +0200 Fix KafkaConnectRunner for Kafka 3.9.1 API compatibility The embedded Kafka Connect runner was failing because its initialization did not match the Kafka 3.9.1 ConnectStandalone startup sequence: - FileOffsetBackingStore was created with StringConverter and never configured, causing NPE during worker.start() when the backing store tried to access its uninitialized file path - Missing plugins.compareAndSwapWithDelegatingLoader() call - Hardcoded ConnectRestServer rebalance timeout (10) instead of config.rebalanceTimeout() - Worker ID used bootstrap server instead of REST server advertised URL Align init() with Kafka's AbstractConnectCli.startConnect() and ConnectStandalone.createHerder() to properly initialize all components. --- .../services/kafkaconnect/KafkaConnectRunner.java | 32 ++++++++++------------ 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java index 4e76fec8e4..bb2d50dfe4 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java @@ -31,6 +31,8 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.Connect; import 
org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -44,7 +46,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.FileOffsetBackingStore; -import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,36 +118,32 @@ class KafkaConnectRunner { Time time = Time.SYSTEM; - // Initializes the system runtime information and logs some of the information WorkerInfo initInfo = new WorkerInfo(); initInfo.logAll(); Properties props = kafkaConnectPropertyFactory.getProperties(); - Map<String, String> standAloneProperties = Utils.propsToStringMap(props); - // Not needed, but we need this one to initialize the worker Plugins plugins = new Plugins(standAloneProperties); + plugins.compareAndSwapWithDelegatingLoader(); StandaloneConfig config = new StandaloneConfig(standAloneProperties); - String kafkaClusterId = config.kafkaClusterId(); AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy(); RestClient restClient = new RestClient(config); - ConnectRestServer rest = new ConnectRestServer(10, restClient, standAloneProperties); + ConnectRestServer rest = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals()); rest.initializeServer(); - /* - According to the Kafka source code "... Worker runs a (dynamic) set of tasks - in a set of threads, doing the work of actually moving data to/from Kafka ..." 
- */ - Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(new StringConverter()), allConnectorClientConfigOverridePolicy); - - /* - From Kafka source code: " ... The herder interface tracks and manages workers - and connectors ..." - */ - herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy); + String workerId = rest.advertisedUrl().getHost() + ":" + rest.advertisedUrl().getPort(); + + OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore(plugins.newInternalConverter( + true, JsonConverter.class.getName(), + java.util.Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"))); + offsetBackingStore.configure(config); + + Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, allConnectorClientConfigOverridePolicy); + + herder = new StandaloneHerder(worker, config.kafkaClusterId(), allConnectorClientConfigOverridePolicy); connect = new Connect(herder, rest); LOG.info("Finished initializing the worker"); }
