This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 78d1b8fb52 DRILL-8534: Update Kafka to Version 3.9 (#3020)
78d1b8fb52 is described below
commit 78d1b8fb526212d107499386e568f82a2c13462e
Author: Charles S. Givre <[email protected]>
AuthorDate: Wed Sep 10 23:35:30 2025 -0400
DRILL-8534: Update Kafka to Version 3.9 (#3020)
---
contrib/storage-kafka/pom.xml | 5 ++-
.../drill/exec/store/kafka/TestKafkaSuite.java | 5 +--
.../store/kafka/cluster/EmbeddedKafkaCluster.java | 47 +++++++++++++---------
3 files changed, 34 insertions(+), 23 deletions(-)
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 28c6595ff7..e9c7dcb799 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,8 @@
<name>Drill : Contrib : Storage : Kafka</name>
<properties>
- <kafka.version>2.8.2</kafka.version>
+ <kafka.version>3.9.0</kafka.version>
+ <kafka_scala.version>3.9.0</kafka_scala.version>
<kafka.TestSuite>**/TestKafkaSuite.class</kafka.TestSuite>
</properties>
@@ -80,7 +81,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
- <version>${kafka.version}</version>
+ <version>${kafka_scala.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
index a8a75f1bcb..ae86ed9df8 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java
@@ -42,7 +42,6 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.Collections;
import java.util.HashMap;
@@ -83,8 +82,8 @@ public class TestKafkaSuite extends BaseTest {
embeddedKafkaCluster = new EmbeddedKafkaCluster();
zkClient =
KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
- "kafka.server", "SessionExpireListener",
- Option.<String>empty(), Option.<ZKClientConfig>empty());
+ "kafka.server", new ZKClientConfig(),
+ "kafka.server", "SessionExpireListener", false, false);
createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
KafkaMessageGenerator generator = new
KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
StringSerializer.class);
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index 71a425d226..564cac47ee 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -70,7 +70,7 @@ public class EmbeddedKafkaCluster implements
TestQueryConstants {
}
this.props.put("metadata.broker.list", sb.toString());
- this.props.put(KafkaConfig.ZkConnectProp(),
this.zkHelper.getConnectionString());
+ this.props.put("zookeeper.connect", this.zkHelper.getConnectionString());
logger.info("Initialized Kafka Server");
this.closer = new KafkaAsyncCloser();
}
@@ -78,21 +78,20 @@ public class EmbeddedKafkaCluster implements
TestQueryConstants {
private void addBroker(Properties props, int brokerID, int
ephemeralBrokerPort) {
Properties properties = new Properties();
properties.putAll(props);
- properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(),
String.valueOf(1));
- properties.put(KafkaConfig.OffsetsTopicPartitionsProp(),
String.valueOf(1));
- properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(),
String.valueOf(1));
- properties.put(KafkaConfig.DefaultReplicationFactorProp(),
String.valueOf(1));
- properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(),
String.valueOf(100));
- properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
- properties.put(KafkaConfig.ZkConnectProp(),
zkHelper.getConnectionString());
- properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
- properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
- properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
- properties.put(KafkaConfig.PortProp(),
String.valueOf(ephemeralBrokerPort));
- properties.put(KafkaConfig.AdvertisedPortProp(),
String.valueOf(ephemeralBrokerPort));
- properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
- properties.put(KafkaConfig.LogDirsProp(),
getTemporaryDir().getAbsolutePath());
- properties.put(KafkaConfig.LogFlushIntervalMessagesProp(),
String.valueOf(1));
+ properties.put("leader.imbalance.check.interval.seconds",
String.valueOf(1));
+ properties.put("offsets.topic.num.partitions", String.valueOf(1));
+ properties.put("offsets.topic.replication.factor", String.valueOf(1));
+ properties.put("default.replication.factor", String.valueOf(1));
+ properties.put("group.min.session.timeout.ms", String.valueOf(100));
+ properties.put("auto.create.topics.enable", Boolean.FALSE);
+ properties.put("zookeeper.connect", zkHelper.getConnectionString());
+ properties.put("broker.id", String.valueOf(brokerID + 1));
+ properties.put("listeners", "PLAINTEXT://" + LOCAL_HOST + ":" +
ephemeralBrokerPort);
+ properties.put("advertised.listeners", "PLAINTEXT://" + LOCAL_HOST + ":" +
ephemeralBrokerPort);
+ properties.put("port", String.valueOf(ephemeralBrokerPort));
+ properties.put("delete.topic.enable", Boolean.TRUE);
+ properties.put("log.dirs", getTemporaryDir().getAbsolutePath());
+ properties.put("log.flush.interval.messages", String.valueOf(1));
brokers.add(getBroker(properties));
}
@@ -119,7 +118,7 @@ public class EmbeddedKafkaCluster implements
TestQueryConstants {
public void shutDownBroker(int brokerId) {
brokers.stream()
- .filter(broker ->
Integer.parseInt(broker.config().getString(KafkaConfig.BrokerIdProp())) ==
brokerId)
+ .filter(broker ->
Integer.parseInt(broker.config().getString("broker.id")) == brokerId)
.findAny()
.ifPresent(KafkaServer::shutdown);
}
@@ -145,7 +144,19 @@ public class EmbeddedKafkaCluster implements
TestQueryConstants {
public String getKafkaBrokerList() {
return brokers.stream()
.map(KafkaServer::config)
- .map(serverConfig -> serverConfig.hostName() + ":" +
serverConfig.port())
+ .map(serverConfig -> {
+ // Try modern listeners first, fall back to legacy host.name/port
+ try {
+ String listeners = serverConfig.getString("listeners");
+ // Extract host:port from listeners (format: PLAINTEXT://host:port)
+ return listeners.replaceAll("^[A-Z]+://", "");
+ } catch (Exception e) {
+ // Fall back to legacy approach using advertised properties or
default host/port
+ String host = LOCAL_HOST;
+ int port = serverConfig.getInt("port");
+ return host + ":" + port;
+ }
+ })
.collect(Collectors.joining(","));
}