This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch STORM-4004---Upgrade-Kafka-Clients-to-3.6.0
in repository https://gitbox.apache.org/repos/asf/storm.git

commit ed5cd8bf75efd3a197999084514bb561d1d59fa1
Author: Richard Zowalla <richard.zowa...@hs-heilbronn.de>
AuthorDate: Wed Nov 29 13:00:07 2023 +0100

    STORM-4004 - Upgrade Kafka Clients to 3.6.0
---
 LICENSE-binary                                          |  5 +++--
 external/storm-kafka-client/pom.xml                     |  4 ++--
 .../org/apache/storm/kafka/spout/KafkaSpoutConfig.java  | 17 +++++++++++++++++
 .../src/test/java/org/apache/storm/kafka/KafkaUnit.java |  5 ++---
 .../java/org/apache/storm/kafka/bolt/KafkaBoltTest.java | 13 ++++++++++---
 .../storm/kafka/spout/KafkaSpoutNullTupleTest.java      |  1 +
 pom.xml                                                 |  2 +-
 7 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index a6c05f4c8..6861db1f9 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -738,7 +738,7 @@ The license texts of these dependencies can be found in the licenses directory.
         * Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga)
         * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
         * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
-        * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
+        * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org)
         * Apache Log4j 1.x Compatibility API (org.apache.logging.log4j:log4j-1.2-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-1.2-api/)
         * Apache Log4j API (org.apache.logging.log4j:log4j-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-api/)
         * Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-core/)
@@ -886,7 +886,7 @@ The license texts of these dependencies can be found in the licenses directory.
         * Kerby PKIX Project (org.apache.kerby:kerby-pkix:1.0.1 - http://directory.apache.org/kerby/kerby-pkix)
         * Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util)
         * Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr)
-        * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
+        * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
         * Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/)
         * Maven Artifact Resolver API (org.apache.maven.resolver:maven-resolver-api:1.3.3 - https://maven.apache.org/resolver/maven-resolver-api/)
         * Maven Artifact Resolver Connector Basic (org.apache.maven.resolver:maven-resolver-connector-basic:1.3.3 - https://maven.apache.org/resolver/maven-resolver-connector-basic/)
@@ -1030,6 +1030,7 @@ The license texts of these dependencies can be found in the licenses directory.
     BSD 2-Clause license
 
         * dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
+        * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)
 
     BSD-3-Clause
 
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 36a4055d7..2b70874ca 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -110,7 +110,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_2.13</artifactId>
             <version>${storm.kafka.client.version}</version>
             <classifier>test</classifier>
             <scope>test</scope>
@@ -130,7 +130,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_2.13</artifactId>
             <version>${storm.kafka.client.version}</version>
             <scope>test</scope>
             <exclusions>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 18010378d..cc2efc475 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -82,6 +82,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         this.processingGuarantee = builder.processingGuarantee;
         this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
         this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
+        this.setConsumerGroupId(builder.groupId);
     }
 
     /**
@@ -122,6 +123,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
         private boolean tupleTrackingEnforced = false;
         private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+        private String groupId;
 
         public Builder(String bootstrapServers, String... topics) {
             super(bootstrapServers, topics);
@@ -160,6 +162,15 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
             return this;
         }
 
+        /**
+         * Specifies the group id.
+         * @param groupId the group id
+         */
+        public Builder<K, V> setGroupId(String groupId) {
+            this.groupId = groupId;
+            return this;
+        }
+
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -348,6 +359,12 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
     }
 
+    public void setConsumerGroupId(String groupId) {
+        if (groupId != null) {
+            getKafkaProps().put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        }
+    }
+
     public int getMaxUncommittedOffsets() {
         return maxUncommittedOffsets;
     }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 97d358db2..50dbdf84a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -37,6 +36,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.storm.testing.TmpPath;
 
 public class KafkaUnit {
@@ -64,8 +64,7 @@ public class KafkaUnit {
         brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
         brokerProps.setProperty("offsets.topic.replication.factor", "1");
         KafkaConfig config = new KafkaConfig(brokerProps);
-        MockTime mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer = TestUtils.createServer(config, new MockTime());
 
         // setup default Producer
         createProducer();
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 0b0a64ed3..63b98eb6a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -66,7 +67,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testSimple() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         OutputCollector collector = mock(OutputCollector.class);
@@ -95,7 +98,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testSimpleWithError() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         OutputCollector collector = mock(OutputCollector.class);
@@ -126,7 +131,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         PreparableCallback customCallback = mock(PreparableCallback.class);
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 6d520304c..e6dee2e1c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -39,6 +39,7 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
     KafkaSpoutConfig<String, String> createSpoutConfig() {
         return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
                 Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setGroupId("test")
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
                 .setRecordTranslator(new NullRecordTranslator<>())
                 .build();
diff --git a/pom.xml b/pom.xml
index c1716ee15..b97f1c357 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,7 +134,7 @@
         <jackson.version>2.15.2</jackson.version>
         <jackson.databind.version>2.15.2</jackson.databind.version>
 
-        <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
+        <storm.kafka.client.version>3.6.0</storm.kafka.client.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
         <java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups>

Reply via email to