This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-4004---Upgrade-Kafka-Clients-to-3.6.0 in repository https://gitbox.apache.org/repos/asf/storm.git
commit ed5cd8bf75efd3a197999084514bb561d1d59fa1 Author: Richard Zowalla <richard.zowa...@hs-heilbronn.de> AuthorDate: Wed Nov 29 13:00:07 2023 +0100 STORM-4004 - Upgrade Kafka Clients to 3.6.0 --- LICENSE-binary | 5 +++-- external/storm-kafka-client/pom.xml | 4 ++-- .../org/apache/storm/kafka/spout/KafkaSpoutConfig.java | 17 +++++++++++++++++ .../src/test/java/org/apache/storm/kafka/KafkaUnit.java | 5 ++--- .../java/org/apache/storm/kafka/bolt/KafkaBoltTest.java | 13 ++++++++++--- .../storm/kafka/spout/KafkaSpoutNullTupleTest.java | 1 + pom.xml | 2 +- 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index a6c05f4c8..6861db1f9 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -738,7 +738,7 @@ The license texts of these dependencies can be found in the licenses directory. * Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga) * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga) * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/) - * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org) + * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org) * Apache Log4j 1.x Compatibility API (org.apache.logging.log4j:log4j-1.2-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-1.2-api/) * Apache Log4j API (org.apache.logging.log4j:log4j-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-api/) * Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-core/) @@ -886,7 +886,7 @@ The license texts of these dependencies can be found in the licenses directory. * Kerby PKIX Project (org.apache.kerby:kerby-pkix:1.0.1 - http://directory.apache.org/kerby/kerby-pkix) * Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util) * Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr) - * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java) + * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java) * Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/) * Maven Artifact Resolver API (org.apache.maven.resolver:maven-resolver-api:1.3.3 - https://maven.apache.org/resolver/maven-resolver-api/) * Maven Artifact Resolver Connector Basic (org.apache.maven.resolver:maven-resolver-connector-basic:1.3.3 - https://maven.apache.org/resolver/maven-resolver-connector-basic/) @@ -1030,6 +1030,7 @@ The license texts of these dependencies can be found in the licenses directory. BSD 2-Clause license * dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org) + * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni) BSD-3-Clause diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 36a4055d7..2b70874ca 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -110,7 +110,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_2.13</artifactId> <version>${storm.kafka.client.version}</version> <classifier>test</classifier> <scope>test</scope> @@ -130,7 +130,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_2.13</artifactId> <version>${storm.kafka.client.version}</version> <scope>test</scope> <exclusions> diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 18010378d..cc2efc475 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -82,6 +82,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> { this.processingGuarantee = builder.processingGuarantee; this.tupleTrackingEnforced = builder.tupleTrackingEnforced; this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs; + this.setConsumerGroupId(builder.groupId); } /** @@ -122,6 +123,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> { private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE; private boolean tupleTrackingEnforced = false; private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS; + private String groupId; public Builder(String bootstrapServers, String... topics) { super(bootstrapServers, topics); @@ -160,6 +162,15 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> { return this; } + /** + * Specifies the group id. + * @param groupId the group id + */ + public Builder<K, V> setGroupId(String groupId) { + this.groupId = groupId; + return this; + } + /** * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number @@ -348,6 +359,12 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> { return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG); } + public void setConsumerGroupId(String groupId) { + if (groupId != null) { + getKafkaProps().put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } + } + public int getMaxUncommittedOffsets() { return maxUncommittedOffsets; } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java index 97d358db2..50dbdf84a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.MockTime; import kafka.utils.TestUtils; import org.apache.curator.test.TestingServer; import org.apache.kafka.clients.admin.AdminClient; @@ -37,6 +36,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.MockTime; import org.apache.storm.testing.TmpPath; public class KafkaUnit { @@ -64,8 +64,7 @@ public class KafkaUnit { brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT)); brokerProps.setProperty("offsets.topic.replication.factor", "1"); KafkaConfig config = new KafkaConfig(brokerProps); - MockTime mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); + kafkaServer = TestUtils.createServer(config, new MockTime()); // setup default Producer createProducer(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java index 0b0a64ed3..63b98eb6a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.storm.Testing; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -66,7 +67,9 @@ public class KafkaBoltTest { @Test public void testSimple() { - MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + MockProducer<String, String> producer = new MockProducer<>( + Cluster.empty(), false, + null, new StringSerializer(), new StringSerializer()); KafkaBolt<String, String> bolt = makeBolt(producer); OutputCollector collector = mock(OutputCollector.class); @@ -95,7 +98,9 @@ public class KafkaBoltTest { @Test public void testSimpleWithError() { - MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + MockProducer<String, String> producer = new MockProducer<>( + Cluster.empty(), false, + null, new StringSerializer(), new StringSerializer()); KafkaBolt<String, String> bolt = makeBolt(producer); OutputCollector collector = mock(OutputCollector.class); @@ -126,7 +131,9 @@ public class KafkaBoltTest { @Test public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() { - MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null); + MockProducer<String, String> producer = new MockProducer<>( + Cluster.empty(), false, + null, new StringSerializer(), new StringSerializer()); KafkaBolt<String, String> bolt = makeBolt(producer); PreparableCallback customCallback = mock(PreparableCallback.class); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java index 6d520304c..e6dee2e1c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java @@ -39,6 +39,7 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest { KafkaSpoutConfig<String, String> createSpoutConfig() { return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(), Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)) + .setGroupId("test") .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRecordTranslator(new NullRecordTranslator<>()) .build(); diff --git a/pom.xml b/pom.xml index c1716ee15..b97f1c357 100644 --- a/pom.xml +++ b/pom.xml @@ -134,7 +134,7 @@ <jackson.version>2.15.2</jackson.version> <jackson.databind.version>2.15.2</jackson.databind.version> - <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version> + <storm.kafka.client.version>3.6.0</storm.kafka.client.version> <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile --> <java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups>