This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f99b6d9be41498783f666f8b889e8502852269e9
Author: Croway <[email protected]>
AuthorDate: Tue Mar 31 14:12:48 2026 +0200

    Fix KafkaConnectRunner for Kafka 3.9.1 API compatibility
    
    The embedded Kafka Connect runner was failing because its initialization
    did not match the Kafka 3.9.1 ConnectStandalone startup sequence:
    
    - FileOffsetBackingStore was created with StringConverter and never
      configured, causing an NPE during worker.start() when the backing store
      tried to access its uninitialized file path
    - Missing plugins.compareAndSwapWithDelegatingLoader() call
    - Hardcoded ConnectRestServer rebalance timeout (10) instead of
      config.rebalanceTimeout()
    - Worker ID used bootstrap server instead of REST server advertised URL
    
    Align init() with Kafka's AbstractConnectCli.startConnect() and
    ConnectStandalone.createHerder() to properly initialize all components.
---
 .../services/kafkaconnect/KafkaConnectRunner.java  | 32 ++++++++++------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index 4e76fec8e4..bb2d50dfe4 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -31,6 +31,8 @@ import 
org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -44,7 +46,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,36 +118,32 @@ class KafkaConnectRunner {
 
         Time time = Time.SYSTEM;
 
-        // Initializes the system runtime information and logs some of the 
information
         WorkerInfo initInfo = new WorkerInfo();
         initInfo.logAll();
 
         Properties props = kafkaConnectPropertyFactory.getProperties();
-
         Map<String, String> standAloneProperties = 
Utils.propsToStringMap(props);
 
-        // Not needed, but we need this one to initialize the worker
         Plugins plugins = new Plugins(standAloneProperties);
+        plugins.compareAndSwapWithDelegatingLoader();
 
         StandaloneConfig config = new StandaloneConfig(standAloneProperties);
-        String kafkaClusterId = config.kafkaClusterId();
         AllConnectorClientConfigOverridePolicy 
allConnectorClientConfigOverridePolicy = new 
AllConnectorClientConfigOverridePolicy();
 
         RestClient restClient = new RestClient(config);
-        ConnectRestServer rest = new ConnectRestServer(10, restClient, 
standAloneProperties);
+        ConnectRestServer rest = new 
ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
         rest.initializeServer();
 
-        /*
-         According to the Kafka source code "... Worker runs a (dynamic) set 
of tasks
-         in a set of threads, doing the work of actually moving data to/from 
Kafka ..."
-         */
-        Worker worker = new Worker(bootstrapServer, time, plugins, config, new 
FileOffsetBackingStore(new StringConverter()), 
allConnectorClientConfigOverridePolicy);
-
-        /*
-        From Kafka source code: " ... The herder interface tracks and manages 
workers
-        and connectors ..."
-         */
-        herder = new StandaloneHerder(worker, kafkaClusterId, 
allConnectorClientConfigOverridePolicy);
+        String workerId = rest.advertisedUrl().getHost() + ":" + 
rest.advertisedUrl().getPort();
+
+        OffsetBackingStore offsetBackingStore = new 
FileOffsetBackingStore(plugins.newInternalConverter(
+                true, JsonConverter.class.getName(),
+                
java.util.Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, 
"false")));
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, allConnectorClientConfigOverridePolicy);
+
+        herder = new StandaloneHerder(worker, config.kafkaClusterId(), 
allConnectorClientConfigOverridePolicy);
         connect = new Connect(herder, rest);
         LOG.info("Finished initializing the worker");
     }

Reply via email to