This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d1b8fb52 DRILL-8534: Update Kafka to Version 3.9  (#3020)
78d1b8fb52 is described below

commit 78d1b8fb526212d107499386e568f82a2c13462e
Author: Charles S. Givre <[email protected]>
AuthorDate: Wed Sep 10 23:35:30 2025 -0400

    DRILL-8534: Update Kafka to Version 3.9  (#3020)
---
 contrib/storage-kafka/pom.xml                      |  5 ++-
 .../drill/exec/store/kafka/TestKafkaSuite.java     |  5 +--
 .../store/kafka/cluster/EmbeddedKafkaCluster.java  | 47 +++++++++++++---------
 3 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 28c6595ff7..e9c7dcb799 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,8 @@
   <name>Drill : Contrib : Storage : Kafka</name>
 
   <properties>
-    <kafka.version>2.8.2</kafka.version>
+    <kafka.version>3.9.0</kafka.version>
+    <kafka_scala.version>3.9.0</kafka_scala.version>
     <kafka.TestSuite>**/TestKafkaSuite.class</kafka.TestSuite>
   </properties>
 
@@ -80,7 +81,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.13</artifactId>
-      <version>${kafka.version}</version>
+      <version>${kafka_scala.version}</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
index a8a75f1bcb..ae86ed9df8 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
@@ -42,7 +42,6 @@ import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -83,8 +82,8 @@ public class TestKafkaSuite extends BaseTest {
         embeddedKafkaCluster = new EmbeddedKafkaCluster();
         zkClient = 
KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
             false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
-            "kafka.server", "SessionExpireListener",
-            Option.<String>empty(), Option.<ZKClientConfig>empty());
+            "kafka.server", new ZKClientConfig(),
+            "kafka.server", "SessionExpireListener", false, false);
         createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
         createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
         KafkaMessageGenerator generator = new 
KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), 
StringSerializer.class);
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index 71a425d226..564cac47ee 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -70,7 +70,7 @@ public class EmbeddedKafkaCluster implements 
TestQueryConstants {
     }
 
     this.props.put("metadata.broker.list", sb.toString());
-    this.props.put(KafkaConfig.ZkConnectProp(), 
this.zkHelper.getConnectionString());
+    this.props.put("zookeeper.connect", this.zkHelper.getConnectionString());
     logger.info("Initialized Kafka Server");
     this.closer = new KafkaAsyncCloser();
   }
@@ -78,21 +78,20 @@ public class EmbeddedKafkaCluster implements 
TestQueryConstants {
   private void addBroker(Properties props, int brokerID, int 
ephemeralBrokerPort) {
     Properties properties = new Properties();
     properties.putAll(props);
-    properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), 
String.valueOf(1));
-    properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), 
String.valueOf(1));
-    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), 
String.valueOf(1));
-    properties.put(KafkaConfig.DefaultReplicationFactorProp(), 
String.valueOf(1));
-    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), 
String.valueOf(100));
-    properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
-    properties.put(KafkaConfig.ZkConnectProp(), 
zkHelper.getConnectionString());
-    properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
-    properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
-    properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
-    properties.put(KafkaConfig.PortProp(), 
String.valueOf(ephemeralBrokerPort));
-    properties.put(KafkaConfig.AdvertisedPortProp(), 
String.valueOf(ephemeralBrokerPort));
-    properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
-    properties.put(KafkaConfig.LogDirsProp(), 
getTemporaryDir().getAbsolutePath());
-    properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), 
String.valueOf(1));
+    properties.put("leader.imbalance.check.interval.seconds", 
String.valueOf(1));
+    properties.put("offsets.topic.num.partitions", String.valueOf(1));
+    properties.put("offsets.topic.replication.factor", String.valueOf(1));
+    properties.put("default.replication.factor", String.valueOf(1));
+    properties.put("group.min.session.timeout.ms", String.valueOf(100));
+    properties.put("auto.create.topics.enable", Boolean.FALSE);
+    properties.put("zookeeper.connect", zkHelper.getConnectionString());
+    properties.put("broker.id", String.valueOf(brokerID + 1));
+    properties.put("listeners", "PLAINTEXT://" + LOCAL_HOST + ":" + 
ephemeralBrokerPort);
+    properties.put("advertised.listeners", "PLAINTEXT://" + LOCAL_HOST + ":" + 
ephemeralBrokerPort);
+    properties.put("port", String.valueOf(ephemeralBrokerPort));
+    properties.put("delete.topic.enable", Boolean.TRUE);
+    properties.put("log.dirs", getTemporaryDir().getAbsolutePath());
+    properties.put("log.flush.interval.messages", String.valueOf(1));
     brokers.add(getBroker(properties));
   }
 
@@ -119,7 +118,7 @@ public class EmbeddedKafkaCluster implements 
TestQueryConstants {
 
   public void shutDownBroker(int brokerId) {
     brokers.stream()
-        .filter(broker -> 
Integer.parseInt(broker.config().getString(KafkaConfig.BrokerIdProp())) == 
brokerId)
+        .filter(broker -> 
Integer.parseInt(broker.config().getString("broker.id")) == brokerId)
         .findAny()
         .ifPresent(KafkaServer::shutdown);
   }
@@ -145,7 +144,19 @@ public class EmbeddedKafkaCluster implements 
TestQueryConstants {
   public String getKafkaBrokerList() {
     return brokers.stream()
         .map(KafkaServer::config)
-        .map(serverConfig -> serverConfig.hostName() + ":" + 
serverConfig.port())
+        .map(serverConfig -> {
+          // Try modern listeners first, fall back to legacy host.name/port
+          try {
+            String listeners = serverConfig.getString("listeners");
+            // Extract host:port from listeners (format: PLAINTEXT://host:port)
+            return listeners.replaceAll("^[A-Z]+://", "");
+          } catch (Exception e) {
+            // Fall back to legacy approach using advertised properties or 
default host/port
+            String host = LOCAL_HOST;
+            int port = serverConfig.getInt("port");
+            return host + ":" + port;
+          }
+        })
         .collect(Collectors.joining(","));
   }
 

Reply via email to