(kafka) branch 3.6 updated: KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15498)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.6 by this push:
     new 6a3cc229895 KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15498)
6a3cc229895 is described below

commit 6a3cc229895b36373360a079d970e071d318e4fe
Author: Mayank Shekhar Narula <42991652+msn-t...@users.noreply.github.com>
AuthorDate: Thu Mar 14 14:09:04 2024 +

    KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15498)

    As this [JIRA](https://issues.apache.org/jira/browse/KAFKA-16226) explains, synchronization between the application thread and the background thread increased once the background thread started calling the synchronized method `Metadata.currentLeader()` in the [original PR](https://github.com/apache/kafka/pull/14384). This PR therefore makes the following changes:

    1. Changes the background thread, i.e. RecordAccumulator's partitionReady() and drainBatchesForOneNode(), to not use `Metadata.currentLeader()` and to rely instead on `MetadataCache`, which is immutable, so access to it is unsynchronized.
    2. Repurposes `MetadataCache` as an immutable snapshot of Metadata. It is a wrapper around the public `Cluster`. `MetadataCache`'s API/functionality should be extended for internal client usage, as opposed to the public `Cluster`. For example, this PR adds `MetadataCache.leaderEpochFor()`.
    3. Renames `MetadataCache` to `MetadataSnapshot` to make it explicit that it is immutable.

    **Note that neither `Cluster` nor `MetadataCache` is synchronized, which removes synchronization from the hot path for high partition counts.**

    Reviewers: Jason Gustafson
---
 .../java/org/apache/kafka/clients/Metadata.java    |  45 ++-
 .../{MetadataCache.java => MetadataSnapshot.java}  |  77 ++--
 .../clients/producer/internals/ProducerBatch.java  |  13 +-
 .../producer/internals/RecordAccumulator.java      |  78 ++--
 .../kafka/clients/producer/internals/Sender.java   |   6 +-
 .../kafka/common/requests/MetadataResponse.java    |   6 +-
 ...ataCacheTest.java => MetadataSnapshotTest.java} |  49 ++-
 .../org/apache/kafka/clients/MetadataTest.java     |   4 +-
 .../producer/internals/ProducerBatchTest.java      |  36 +-
 .../producer/internals/RecordAccumulatorTest.java  | 405 +
 .../clients/producer/internals/SenderTest.java     |   9 +-
 .../producer/internals/TransactionManagerTest.java |  33 +-
 .../test/java/org/apache/kafka/test/TestUtils.java |  39 ++
 13 files changed, 424 insertions(+), 376 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index c42eb474a6c..8455e994f40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -69,7 +69,7 @@ public class Metadata implements Closeable {
     private KafkaException fatalException;
     private Set invalidTopics;
     private Set unauthorizedTopics;
-    private MetadataCache cache = MetadataCache.empty();
+    private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
     private boolean needFullUpdate;
     private boolean needPartialUpdate;
     private final ClusterResourceListeners clusterResourceListeners;
@@ -108,8 +108,15 @@ public class Metadata implements Closeable {
     /**
      * Get the current cluster info without blocking
      */
-    public synchronized Cluster fetch() {
-        return cache.cluster();
+    public Cluster fetch() {
+        return metadataSnapshot.cluster();
+    }
+
+    /**
+     * Get the current metadata cache.
+     */
+    public MetadataSnapshot fetchMetadataSnapshot() {
+        return metadataSnapshot;
     }

     /**
@@ -207,7 +214,7 @@ public class Metadata implements Closeable {
      */
     synchronized Optional partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
-        Optional partitionMetadata = cache.partitionMetadata(topicPartition);
+        Optional partitionMetadata = metadataSnapshot.partitionMetadata(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
             return partitionMetadata;
@@ -220,8 +227,8 @@ public class Metadata implements Closeable {
     /**
      * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
      */
-    public synchronized Map topicIds() {
-        return cache.topicIds();
+    public Map topicIds() {
+        return metadataSnapshot.topicIds();
     }

     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
@@ -231,14 +238,14 @@ public class Metadata implements Closeable {
         MetadataResponse.PartitionMetadata partitionMetadat
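The pattern the commit message describes (an immutable snapshot published through a `volatile` field, so readers need no lock) can be sketched roughly as follows. This is a minimal illustration, not Kafka's code: `SnapshotSketch` and `MetadataSketch` are hypothetical stand-ins for `MetadataSnapshot` and `Metadata`, and keying leader epochs by a plain `String` is an assumption made to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for MetadataSnapshot: the only field is final
// and the map is copied defensively, so an instance never changes after construction.
final class SnapshotSketch {
    private final Map<String, Integer> leaderEpochs;

    SnapshotSketch(Map<String, Integer> leaderEpochs) {
        this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
    }

    static SnapshotSketch empty() {
        return new SnapshotSketch(Collections.emptyMap());
    }

    // Loosely mirrors the idea of leaderEpochFor(); the String key is an assumption.
    Optional<Integer> leaderEpochFor(String topicPartition) {
        return Optional.ofNullable(leaderEpochs.get(topicPartition));
    }
}

// Hypothetical, simplified stand-in for Metadata.
final class MetadataSketch {
    // volatile: a reader sees either the previous or the new snapshot, fully constructed.
    private volatile SnapshotSketch snapshot = SnapshotSketch.empty();

    // Writers still serialize among themselves and swap in a whole new snapshot.
    synchronized void update(Map<String, Integer> newLeaderEpochs) {
        snapshot = new SnapshotSketch(newLeaderEpochs);
    }

    // Readers take no lock; they only read the volatile reference.
    SnapshotSketch fetchSnapshot() {
        return snapshot;
    }
}
```

A reader that needs several lookups would typically call `fetchSnapshot()` once and query the returned object repeatedly, so that all lookups in one pass see the same metadata version.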
(kafka) branch 3.7 updated: KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15493)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new f31307aa223 KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15493)
f31307aa223 is described below

commit f31307aa223c1f2f418a41bbf8f4e2100659e319
Author: Mayank Shekhar Narula <42991652+msn-t...@users.noreply.github.com>
AuthorDate: Thu Mar 14 13:46:11 2024 +

    KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15493)

    As this [JIRA](https://issues.apache.org/jira/browse/KAFKA-16226) explains, synchronization between the application thread and the background thread increased once the background thread started calling the synchronized method `Metadata.currentLeader()` in the [original PR](https://github.com/apache/kafka/pull/14384). This PR therefore makes the following changes:

    1. Changes the background thread, i.e. RecordAccumulator's partitionReady() and drainBatchesForOneNode(), to not use `Metadata.currentLeader()` and to rely instead on `MetadataCache`, which is immutable, so access to it is unsynchronized.
    2. Repurposes `MetadataCache` as an immutable snapshot of Metadata. It is a wrapper around the public `Cluster`. `MetadataCache`'s API/functionality should be extended for internal client usage, as opposed to the public `Cluster`. For example, this PR adds `MetadataCache.leaderEpochFor()`.
    3. Renames `MetadataCache` to `MetadataSnapshot` to make it explicit that it is immutable.

    **Note that neither `Cluster` nor `MetadataCache` is synchronized, which removes synchronization from the hot path for high partition counts.**

    Reviewers: Jason Gustafson
---
 .../java/org/apache/kafka/clients/Metadata.java    |  65 +--
 .../{MetadataCache.java => MetadataSnapshot.java}  |  77 ++--
 .../clients/producer/internals/ProducerBatch.java  |  13 +-
 .../producer/internals/RecordAccumulator.java      |  78 ++--
 .../kafka/clients/producer/internals/Sender.java   |   6 +-
 .../kafka/common/requests/MetadataResponse.java    |   6 +-
 ...ataCacheTest.java => MetadataSnapshotTest.java} |  56 ++-
 .../org/apache/kafka/clients/MetadataTest.java     |   4 +-
 .../producer/internals/ProducerBatchTest.java      |  36 +-
 .../producer/internals/RecordAccumulatorTest.java  | 468 +
 .../clients/producer/internals/SenderTest.java     |   9 +-
 .../producer/internals/TransactionManagerTest.java |  33 +-
 .../test/java/org/apache/kafka/test/TestUtils.java |  39 ++
 13 files changed, 476 insertions(+), 414 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 607c74eeddb..30cad44a4bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -75,7 +75,7 @@ public class Metadata implements Closeable {
     private KafkaException fatalException;
     private Set invalidTopics;
     private Set unauthorizedTopics;
-    private MetadataCache cache = MetadataCache.empty();
+    private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
     private boolean needFullUpdate;
     private boolean needPartialUpdate;
     private long equivalentResponseCount;
@@ -123,8 +123,15 @@ public class Metadata implements Closeable {
     /**
      * Get the current cluster info without blocking
      */
-    public synchronized Cluster fetch() {
-        return cache.cluster();
+    public Cluster fetch() {
+        return metadataSnapshot.cluster();
+    }
+
+    /**
+     * Get the current metadata cache.
+     */
+    public MetadataSnapshot fetchMetadataSnapshot() {
+        return metadataSnapshot;
     }

     /**
@@ -265,7 +272,7 @@ public class Metadata implements Closeable {
      */
     synchronized Optional partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
-        Optional partitionMetadata = cache.partitionMetadata(topicPartition);
+        Optional partitionMetadata = metadataSnapshot.partitionMetadata(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
             return partitionMetadata;
@@ -278,8 +285,8 @@ public class Metadata implements Closeable {
     /**
      * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
      */
-    public synchronized Map topicIds() {
-        return cache.topicIds();
+    public Map topicIds() {
+        return metadataSnapshot.topicIds();
     }

     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
@@ -289,14 +296,14 @@ public class Metadata implements Closeable {
         MetadataResponse.PartitionMetadata partitionMetadata = maybeMetadata.get();
         O
(kafka) branch trunk updated: MINOR: Remove unnecessary easymock/powermock dependencies (#15460)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 2c0cab39aed MINOR: Remove unnecessary easymock/powermock dependencies (#15460)
2c0cab39aed is described below

commit 2c0cab39aedab3a8635510acfac2551aaeb62ffb
Author: Ismael Juma
AuthorDate: Sun Mar 3 23:31:57 2024 -0800

    MINOR: Remove unnecessary easymock/powermock dependencies (#15460)

    These projects don't actually use easymock/powermock.

    Reviewers: Chia-Ping Tsai
---
 build.gradle | 7 +--
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/build.gradle b/build.gradle
index b362c2f673a..f2ba22e86b3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2183,8 +2183,6 @@ project(':streams') {
     testImplementation libs.junitJupiter
     testImplementation libs.junitVintageEngine
     testImplementation libs.easymock
-    testImplementation libs.powermockJunit4
-    testImplementation libs.powermockEasymock
     testImplementation libs.bcpkix
     testImplementation libs.hamcrest
     testImplementation libs.mockitoCore
@@ -2332,7 +2330,6 @@ project(':streams:streams-scala') {
     testImplementation project(':streams:test-utils')
     testImplementation libs.junitJupiter
-    testImplementation libs.easymock
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
     testImplementation libs.hamcrest
@@ -2869,7 +2866,6 @@ project(':connect:transforms') {
     implementation libs.slf4jApi
-    testImplementation libs.easymock
     testImplementation libs.junitJupiter
     testRuntimeOnly libs.slf4jlog4j
@@ -2909,8 +2905,7 @@ project(':connect:json') {
     api libs.jacksonAfterburner
     implementation libs.slf4jApi
-
-    testImplementation libs.easymock
+
     testImplementation libs.junitJupiter
     testRuntimeOnly libs.slf4jlog4j
(kafka) branch trunk updated: Delete unused classes (#14797)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 70e0dbd7954 Delete unused classes (#14797) 70e0dbd7954 is described below commit 70e0dbd7954c68910b8d7de75e3deb680dd53357 Author: Ismael Juma AuthorDate: Tue Jan 23 22:04:44 2024 -0800 Delete unused classes (#14797) Reviewers: Mickael Maison --- .../common/InconsistentBrokerIdException.scala | 27 -- .../InconsistentBrokerMetadataException.scala | 27 -- .../common/InconsistentClusterIdException.scala| 27 -- .../kafka/common/InconsistentNodeIdException.scala | 22 -- .../kafka/server/ServerGenerateBrokerIdTest.scala | 6 + 5 files changed, 1 insertion(+), 108 deletions(-) diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala deleted file mode 100644 index 0c0d1cd731a..000 --- a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.common - -/** - * Indicates the brokerId stored in logDirs is not consistent across logDirs. - */ -class InconsistentBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this(cause: Throwable) = this(null, cause) - def this() = this(null, null) -} diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala deleted file mode 100644 index 2b11512e44c..000 --- a/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Indicates the BrokerMetadata stored in logDirs is not consistent across logDirs. 
- */ -class InconsistentBrokerMetadataException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this(cause: Throwable) = this(null, cause) - def this() = this(null, null) -} diff --git a/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala b/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala deleted file mode 100644 index 6868dd8780d..000 --- a/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS&q
(kafka) branch 3.7 updated: Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new 6535fe7f049 Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)
6535fe7f049 is described below

commit 6535fe7f04946a5773af35f5d6ca82cebe131e3e
Author: Ismael Juma
AuthorDate: Fri Jan 19 14:08:07 2024 -0800

    Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)

    Reviewers: Divij Vaidya
---
 README.md         |  6 --
 docs/ops.html     | 10 +++---
 docs/upgrade.html |  4
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 1a645dc409f..b857e7c1843 100644
--- a/README.md
+++ b/README.md
@@ -6,8 +6,10 @@ You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/
 We build and test Apache Kafka with Java 8, 11, 17 and 21. We set the `release` parameter in javac and scalac
 to `8` to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version
-used for compilation). Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache
-Kafka 4.0 (see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) for more details).
+used for compilation). Java 8 support project-wide has been deprecated since Apache Kafka 3.0, Java 11 support for
+the broker and tools has been deprecated since Apache Kafka 3.7 and removal of both is planned for Apache Kafka 4.0 (
+see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
+[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).

 Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
 Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
diff --git a/docs/ops.html b/docs/ops.html
index eb0cbd78828..f943dbed630 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1289,12 +1289,16 @@ $ bin/kafka-acls.sh \
 6.6 Java Version
- Java 8, Java 11, and Java 17 are supported. Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.
+ Java 8, Java 11, and Java 17 are supported.
+
+ Note that Java 8 support project-wide has been deprecated since Apache Kafka 3.0 and Java 11 support for the broker and tools
+ has been deprecated since Apache Kafka 3.7. Both will be removed in Apache Kafka 4.0.
+
 Java 11 and later versions perform significantly better if TLS is enabled, so they are highly recommended (they also include a number of other performance improvements: G1GC, CRC32C, Compact Strings, Thread-Local Handshakes and more).
-
+
 From a security perspective, we recommend the latest released patch version as older freely available versions have disclosed security vulnerabilities.
-
+
 Typical arguments for running Kafka with OpenJDK-based Java implementations (including Oracle JDK) are:
 -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dc3321616d6..079a7bdd47d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -23,6 +23,10 @@
 Notable changes in 3.7.0
+Java 11 support for the broker and tools has been deprecated and will be removed in Apache Kafka 4.0. This complements
+the previous deprecation of Java 8 for all components. Please refer to
+<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510">KIP-1013</a> for more details.
+
 More metrics related to Tiered Storage have been introduced. They should improve the operational experience
 of running Tiered Storage in production.
 For more detailed information, please refer to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage">KIP-963</a>.
(kafka) branch trunk updated: Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 92a67e85715 Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)
92a67e85715 is described below

commit 92a67e8571500a53cc864ba6df4cb9cfdac6a763
Author: Ismael Juma
AuthorDate: Fri Jan 19 14:08:07 2024 -0800

    Note that Java 11 support for broker and tools is deprecated for removal in 4.0 (#15236)

    Reviewers: Divij Vaidya
---
 README.md         |  6 --
 docs/ops.html     | 10 +++---
 docs/upgrade.html |  4
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 1a645dc409f..b857e7c1843 100644
--- a/README.md
+++ b/README.md
@@ -6,8 +6,10 @@ You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/
 We build and test Apache Kafka with Java 8, 11, 17 and 21. We set the `release` parameter in javac and scalac
 to `8` to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version
-used for compilation). Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache
-Kafka 4.0 (see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) for more details).
+used for compilation). Java 8 support project-wide has been deprecated since Apache Kafka 3.0, Java 11 support for
+the broker and tools has been deprecated since Apache Kafka 3.7 and removal of both is planned for Apache Kafka 4.0 (
+see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
+[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).

 Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
 Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
diff --git a/docs/ops.html b/docs/ops.html
index eb0cbd78828..f943dbed630 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1289,12 +1289,16 @@ $ bin/kafka-acls.sh \
 6.6 Java Version
- Java 8, Java 11, and Java 17 are supported. Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.
+ Java 8, Java 11, and Java 17 are supported.
+
+ Note that Java 8 support project-wide has been deprecated since Apache Kafka 3.0 and Java 11 support for the broker and tools
+ has been deprecated since Apache Kafka 3.7. Both will be removed in Apache Kafka 4.0.
+
 Java 11 and later versions perform significantly better if TLS is enabled, so they are highly recommended (they also include a number of other performance improvements: G1GC, CRC32C, Compact Strings, Thread-Local Handshakes and more).
-
+
 From a security perspective, we recommend the latest released patch version as older freely available versions have disclosed security vulnerabilities.
-
+
 Typical arguments for running Kafka with OpenJDK-based Java implementations (including Oracle JDK) are:
 -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dc3321616d6..079a7bdd47d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -23,6 +23,10 @@
 Notable changes in 3.7.0
+Java 11 support for the broker and tools has been deprecated and will be removed in Apache Kafka 4.0. This complements
+the previous deprecation of Java 8 for all components. Please refer to
+<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510">KIP-1013</a> for more details.
+
 More metrics related to Tiered Storage have been introduced. They should improve the operational experience
 of running Tiered Storage in production.
 For more detailed information, please refer to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage">KIP-963</a>.
(kafka) branch trunk updated: KAFKA-15853: Move ProcessRole to server module (#15166)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 13a83d58f89 KAFKA-15853: Move ProcessRole to server module (#15166) 13a83d58f89 is described below commit 13a83d58f897de2f55d8d3342ffb058b230a9183 Author: Omnia Ibrahim AuthorDate: Wed Jan 10 23:13:06 2024 + KAFKA-15853: Move ProcessRole to server module (#15166) Prepare to move KafkaConfig (#15103). Reviewers: Ismael Juma --- core/src/main/scala/kafka/raft/RaftManager.scala | 4 +-- .../scala/kafka/server/DynamicBrokerConfig.scala | 4 +-- core/src/main/scala/kafka/server/KafkaConfig.scala | 16 +- .../main/scala/kafka/server/KafkaRaftServer.scala | 14 ++--- .../src/main/scala/kafka/server/SharedServer.scala | 8 ++--- .../scala/unit/kafka/raft/RaftManagerTest.scala| 18 +--- .../java/org/apache/kafka/server/ProcessRole.java | 34 ++ 7 files changed, 61 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index d80a0d50137..a9e64fb967b 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -24,7 +24,6 @@ import java.util.OptionalInt import java.util.concurrent.CompletableFuture import kafka.log.LogManager import kafka.log.UnifiedLog -import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.KafkaConfig import kafka.utils.CoreUtils import kafka.utils.FileLock @@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec} import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, ReplicatedLog} 
+import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.fault.FaultHandler @@ -120,7 +120,7 @@ class KafkaRaftManager[T]( .map(Paths.get(_).toAbsolutePath) .contains(Paths.get(config.metadataLogDir).toAbsolutePath) // Or this node is only a controller -val isOnlyController = config.processRoles == Set(ControllerRole) +val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole) if (differentMetadataLogDir || isOnlyController) { Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir))) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index d63272c3731..7f607d49a4f 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -25,7 +25,6 @@ import kafka.cluster.EndPoint import kafka.log.{LogCleaner, LogManager} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.server.DynamicBrokerConfig._ -import kafka.server.KafkaRaftServer.BrokerRole import kafka.utils.{CoreUtils, Logging, PasswordEncoder} import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} @@ -36,6 +35,7 @@ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} +import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin @@ -287,7 +287,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging case Some(authz: 
Reconfigurable) => addReconfigurable(authz) case _ => } -if (!kafkaConfig.processRoles.contains(BrokerRole)) { +if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) { // only add these if the controller isn't also running the broker role // because these would already be added via the broker in that case addReconfigurable(controller.kafkaYammerMetrics) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 03d55c309e8..2afb1d64387 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -25,7 +25,6 @@ import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManag
(kafka) branch 3.7 updated: KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new 1b4ad6d KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032)
1b4ad6d is described below

commit 1b4ad6de349dea3ca145fa6bac0249ba5d85
Author: Ismael Juma
AuthorDate: Wed Dec 20 05:13:36 2023 -0800

    KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032)

    Breakdown of this PR:
    * Extend the generator to support deprecated api versions
    * Set deprecated api versions via the request json files
    * Expose the information via metrics and the request log

    The relevant section of the KIP:
    > * Introduce metric `kafka.network:type=RequestMetrics,name=DeprecatedRequestsPerSec,request=(api-name),version=(api-version),clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version)`
    > * Add boolean field `requestApiVersionDeprecated` to the request header section of the request log (alongside `requestApiKey`, `requestApiVersion`, `requestApiKeyName`, etc.).

    Unit tests were added to verify the new generator functionality, the new metric and the new request log attribute.

Reviewers: Jason Gustafson --- .../org/apache/kafka/common/protocol/ApiKeys.java | 4 ++ .../common/message/AlterReplicaLogDirsRequest.json | 1 + .../common/message/CreateAclsRequest.json | 1 + .../message/CreateDelegationTokenRequest.json | 1 + .../common/message/CreateTopicsRequest.json| 1 + .../common/message/DeleteAclsRequest.json | 1 + .../common/message/DeleteGroupsRequest.json| 1 + .../common/message/DeleteTopicsRequest.json| 1 + .../common/message/DescribeAclsRequest.json| 1 + .../common/message/DescribeConfigsRequest.json | 1 + .../message/DescribeDelegationTokenRequest.json| 1 + .../common/message/DescribeLogDirsRequest.json | 3 +- .../message/ExpireDelegationTokenRequest.json | 1 + .../resources/common/message/FetchRequest.json | 1 + .../common/message/FindCoordinatorRequest.json | 1 + .../resources/common/message/JoinGroupRequest.json | 1 + .../common/message/ListOffsetsRequest.json | 1 + .../resources/common/message/MetadataRequest.json | 1 + .../common/message/OffsetCommitRequest.json| 1 + .../common/message/OffsetFetchRequest.json | 1 + .../message/OffsetForLeaderEpochRequest.json | 1 + .../resources/common/message/ProduceRequest.json | 1 + .../message/RenewDelegationTokenRequest.json | 1 + .../common/message/SaslHandshakeRequest.json | 1 + .../main/scala/kafka/network/RequestChannel.scala | 29 ++-- .../scala/kafka/network/RequestConvertToJson.scala | 2 + .../kafka/network/RequestConvertToJsonTest.scala | 16 +++ .../unit/kafka/network/SocketServerTest.scala | 51 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 8 ++-- .../kafka/message/ApiMessageTypeGenerator.java | 13 +- .../java/org/apache/kafka/message/MessageSpec.java | 3 +- .../org/apache/kafka/message/StructRegistry.java | 1 + .../java/org/apache/kafka/message/StructSpec.java | 8 .../kafka/message/MessageDataGeneratorTest.java| 22 ++ .../apache/kafka/message/StructRegistryTest.java | 3 +- 35 files changed, 172 insertions(+), 13 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a5c6ef5ea83..ee0d773b070 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -234,6 +234,10 @@ public enum ApiKeys { return apiVersion >= oldestVersion() && apiVersion <= latestVersion(enableUnstableLastVersion); } +public boolean isVersionDeprecated(short apiVersion) { +return apiVersion >= messageType.lowestDeprecatedVersion() && apiVersion <= messageType.highestDeprecatedVersion(); +} + public Optional toApiVersion(boolean enableUnstableLastVersion) { short oldestVersion = oldestVersion(); short latestVersion = latestVersion(enableUnstableLastVersion); diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json index 2306caaf984..c85c61d6e54 100644 --- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json +++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json @@ -21,6 +21,7 @@ // Version 1 is the same as version 0. // V
(kafka) branch trunk updated: KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 919b585da0c KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032) 919b585da0c is described below commit 919b585da0c824efa5250a9fda62b42468caabc6 Author: Ismael Juma AuthorDate: Wed Dec 20 05:13:36 2023 -0800 KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032) Breakdown of this PR: * Extend the generator to support deprecated api versions * Set deprecated api versions via the request json files * Expose the information via metrics and the request log The relevant section of the KIP: > * Introduce metric `kafka.network:type=RequestMetrics,name=DeprecatedRequestsPerSec,request=(api-name),version=(api-version),clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version)` > * Add boolean field `requestApiVersionDeprecated` to the request header section of the request log (alongside `requestApiKey` , `requestApiVersion`, `requestApiKeyName` , etc.). Unit tests were added to verify the new generator functionality, the new metric and the new request log attribute. 
Reviewers: Jason Gustafson --- .../org/apache/kafka/common/protocol/ApiKeys.java | 4 ++ .../common/message/AlterReplicaLogDirsRequest.json | 1 + .../common/message/CreateAclsRequest.json | 1 + .../message/CreateDelegationTokenRequest.json | 1 + .../common/message/CreateTopicsRequest.json| 1 + .../common/message/DeleteAclsRequest.json | 1 + .../common/message/DeleteGroupsRequest.json| 1 + .../common/message/DeleteTopicsRequest.json| 1 + .../common/message/DescribeAclsRequest.json| 1 + .../common/message/DescribeConfigsRequest.json | 1 + .../message/DescribeDelegationTokenRequest.json| 1 + .../common/message/DescribeLogDirsRequest.json | 3 +- .../message/ExpireDelegationTokenRequest.json | 1 + .../resources/common/message/FetchRequest.json | 1 + .../common/message/FindCoordinatorRequest.json | 1 + .../resources/common/message/JoinGroupRequest.json | 1 + .../common/message/ListOffsetsRequest.json | 1 + .../resources/common/message/MetadataRequest.json | 1 + .../common/message/OffsetCommitRequest.json| 1 + .../common/message/OffsetFetchRequest.json | 1 + .../message/OffsetForLeaderEpochRequest.json | 1 + .../resources/common/message/ProduceRequest.json | 1 + .../message/RenewDelegationTokenRequest.json | 1 + .../common/message/SaslHandshakeRequest.json | 1 + .../main/scala/kafka/network/RequestChannel.scala | 29 ++-- .../scala/kafka/network/RequestConvertToJson.scala | 2 + .../kafka/network/RequestConvertToJsonTest.scala | 16 +++ .../unit/kafka/network/SocketServerTest.scala | 51 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 8 ++-- .../kafka/message/ApiMessageTypeGenerator.java | 13 +- .../java/org/apache/kafka/message/MessageSpec.java | 3 +- .../org/apache/kafka/message/StructRegistry.java | 1 + .../java/org/apache/kafka/message/StructSpec.java | 8 .../kafka/message/MessageDataGeneratorTest.java| 22 ++ .../apache/kafka/message/StructRegistryTest.java | 3 +- 35 files changed, 172 insertions(+), 13 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a5c6ef5ea83..ee0d773b070 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -234,6 +234,10 @@ public enum ApiKeys { return apiVersion >= oldestVersion() && apiVersion <= latestVersion(enableUnstableLastVersion); } +public boolean isVersionDeprecated(short apiVersion) { +return apiVersion >= messageType.lowestDeprecatedVersion() && apiVersion <= messageType.highestDeprecatedVersion(); +} + public Optional toApiVersion(boolean enableUnstableLastVersion) { short oldestVersion = oldestVersion(); short latestVersion = latestVersion(enableUnstableLastVersion); diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json index 2306caaf984..c85c61d6e54 100644 --- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json +++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json @@ -21,6 +21,7 @@ // Version 1 is the same as version 0. // V
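Note on the `isVersionDeprecated` check in the diff above: it is an inclusive range test against two bounds supplied by the generated message type. The following is only a minimal sketch of that idea. The class `DeprecatedRangeSketch` and its `none()` factory are hypothetical stand-ins, and representing "no deprecated versions" as an empty range (lowest greater than highest) is an assumption, not something the commit states.

```java
// Hypothetical stand-in for the generated message type; only the two bounds
// used by the range check in the diff are modeled here.
final class DeprecatedRangeSketch {
    private final short lowestDeprecatedVersion;
    private final short highestDeprecatedVersion;

    DeprecatedRangeSketch(int lowest, int highest) {
        this.lowestDeprecatedVersion = (short) lowest;
        this.highestDeprecatedVersion = (short) highest;
    }

    // Assumption: an empty range (lowest > highest) means nothing is deprecated.
    static DeprecatedRangeSketch none() {
        return new DeprecatedRangeSketch(0, -1);
    }

    // Same inclusive comparison as ApiKeys.isVersionDeprecated in the diff.
    boolean isVersionDeprecated(short apiVersion) {
        return apiVersion >= lowestDeprecatedVersion
            && apiVersion <= highestDeprecatedVersion;
    }

    public static void main(String[] args) {
        DeprecatedRangeSketch range = new DeprecatedRangeSketch(0, 3);
        System.out.println(range.isVersionDeprecated((short) 3));  // true
        System.out.println(range.isVersionDeprecated((short) 4));  // false
        System.out.println(none().isVersionDeprecated((short) 0)); // false
    }
}
```

With inclusive bounds, a request at exactly the highest deprecated version still counts as deprecated, which is the behavior the metric and the request log attribute would rely on.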
(kafka) branch trunk updated: KAFKA-15854: Move Java classes from `kafka.server` to the `server` module (#14796)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new df78204e05b KAFKA-15854: Move Java classes from `kafka.server` to the `server` module (#14796) df78204e05b is described below commit df78204e05bb1c416d977a0a36f4c263251ae4ef Author: Ismael Juma AuthorDate: Sun Nov 19 22:09:19 2023 -0800 KAFKA-15854: Move Java classes from `kafka.server` to the `server` module (#14796) We only move Java classes that have minimal or no dependencies on Scala classes in this PR. Details: * Configured `server` module in build files. * Changed `ControllerRequestCompletionHandler` to be an interface since it has no implementations. * Cleaned up various import control files. * Minor build clean-ups for `server-common`. * Disabled `testAssignmentAggregation` when executed with Java 8, this is an existing issue (see #14794). For broader context on this change, please check: * KAFKA-15852: Move server code from `core` to `server` module Reviewers: Divij Vaidya --- build.gradle | 76 ++ checkstyle/import-control-metadata.xml | 5 -- checkstyle/import-control-server-common.xml| 4 -- checkstyle/import-control-server.xml | 61 + checkstyle/import-control-storage.xml | 4 -- .../kafka/server/builders/KafkaApisBuilder.java| 2 +- .../transaction/ProducerIdManager.scala| 2 +- .../scala/kafka/server/AlterPartitionManager.scala | 3 +- .../kafka/server/AutoTopicCreationManager.scala| 5 +- .../kafka/server/BrokerLifecycleManager.scala | 1 + .../src/main/scala/kafka/server/BrokerServer.scala | 7 +- .../main/scala/kafka/server/ConfigHandler.scala| 1 + .../server/ControllerConfigurationValidator.scala | 3 +- .../server/ControllerRegistrationManager.scala | 1 + .../main/scala/kafka/server/ControllerServer.scala | 3 +- .../scala/kafka/server/ForwardingManager.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 1 + 
core/src/main/scala/kafka/server/KafkaBroker.scala | 1 + core/src/main/scala/kafka/server/KafkaServer.scala | 5 +- .../server/NodeToControllerChannelManager.scala| 47 ++--- .../kafka/zk/ZkMigrationIntegrationTest.scala | 3 +- .../server/NodeToControllerRequestThreadTest.scala | 1 + .../scala/unit/kafka/cluster/PartitionTest.scala | 1 + .../transaction/ProducerIdManagerTest.scala| 2 +- .../kafka/server/AlterPartitionManagerTest.scala | 1 + .../server/AutoTopicCreationManagerTest.scala | 4 +- .../server/BrokerRegistrationRequestTest.scala | 3 +- .../ControllerConfigurationValidatorTest.scala | 4 +- .../scala/unit/kafka/server/KafkaApisTest.scala| 2 +- .../MockNodeToControllerChannelManager.scala | 7 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 1 + .../apache}/kafka/server/AssignmentsManager.java | 4 +- .../apache}/kafka/server/ClientMetricsManager.java | 2 +- .../server/ControllerRequestCompletionHandler.java | 30 + .../server/NodeToControllerChannelManager.java | 37 +++ .../server}/metrics/ClientMetricsConfigs.java | 2 +- .../java/org/apache/kafka/server/package-info.java | 20 ++ .../kafka/server/AssignmentsManagerTest.java | 7 +- .../server}/metrics/ClientMetricsTestUtils.java| 2 +- settings.gradle| 1 + 40 files changed, 268 insertions(+), 102 deletions(-) diff --git a/build.gradle b/build.gradle index 3b7878a3e89..608a57d56ad 100644 --- a/build.gradle +++ b/build.gradle @@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {} +project(':server') { + archivesBaseName = "kafka-server" + + dependencies { +implementation project(':clients') +implementation project(':server-common') + +implementation libs.slf4jApi + +compileOnly libs.log4j + +testImplementation project(':clients').sourceSets.test.output + +testImplementation libs.mockitoCore +testImplementation libs.junitJupiter +testImplementation libs.slf4jlog4j + } + + task 
createVersionFile() { +def receiptFile = file("$buildDir/kafka/$buildVersionFileName") +inputs.property "commitId", commitId +inputs.property "version", version +outputs.file receiptFile + +doLast { + def data = [ +commitId: commitId, +version: version, + ] + + receiptFile.parentFile.mkdirs() + def content = data.entrySet().c
(kafka) branch trunk updated: MINOR: Improve printing topic name when created topic in TopicCommand (#14661)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 066635819a9 MINOR: Improve printing topic name when created topic in TopicCommand (#14661) 066635819a9 is described below commit 066635819a9c59d15616249e93fac43dea797474 Author: runom AuthorDate: Mon Nov 20 09:03:07 2023 +0900 MINOR: Improve printing topic name when created topic in TopicCommand (#14661) The topic name was displayed as `Optional` when the topic was created. ``` % bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 Created topic Optional[test]. ``` This PR fixed to print the topic name as `String` instead of `Optional`. ``` % bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 Created topic test. ``` Reviewers: Ismael Juma --- tools/src/main/java/org/apache/kafka/tools/TopicCommand.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index e490007bd23..ae725012dc1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -247,7 +247,7 @@ public abstract class TopicCommand { } static class CommandTopicPartition { -private final Optional name; +private final String name; private final Optional partitions; private final Optional replicationFactor; private final Map> replicaAssignment; @@ -257,7 +257,7 @@ public abstract class TopicCommand { public CommandTopicPartition(TopicCommandOptions options) { opts = options; -name = options.topic(); +name = options.topic().get(); partitions = options.partitions(); replicationFactor = options.replicationFactor(); replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap()); @@ -439,7 
+439,7 @@ public abstract class TopicCommand { public void createTopic(TopicCommandOptions opts) throws Exception { CommandTopicPartition topic = new CommandTopicPartition(opts); -if (Topic.hasCollisionChars(topic.name.get())) { +if (Topic.hasCollisionChars(topic.name)) { System.out.println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " + "collide. To avoid issues it is best to use either, but not both."); } @@ -457,9 +457,9 @@ public abstract class TopicCommand { try { NewTopic newTopic; if (topic.hasReplicaAssignment()) { -newTopic = new NewTopic(topic.name.get(), topic.replicaAssignment); +newTopic = new NewTopic(topic.name, topic.replicaAssignment); } else { -newTopic = new NewTopic(topic.name.get(), topic.partitions, topic.replicationFactor.map(Integer::shortValue)); +newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue)); } Map configsMap = topic.configsToAdd.stringPropertyNames().stream()
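Note on the `Optional[test]` output described in the commit message above: string concatenation calls `toString()` on the operand, and `Optional.toString()` wraps the value in `Optional[...]`. A minimal sketch of the effect follows. The class `TopicNameMessageSketch` and the helper `createdMessage` are hypothetical and are not part of `TopicCommand`.

```java
import java.util.Optional;

// Hypothetical illustration; not code from TopicCommand.
final class TopicNameMessageSketch {
    // Builds the message the same way for any operand, so the only
    // difference in output comes from the operand's toString().
    static String createdMessage(Object name) {
        return "Created topic " + name + ".";
    }

    public static void main(String[] args) {
        Optional<String> topic = Optional.of("test");
        // Passing the Optional itself reproduces the reported output.
        System.out.println(createdMessage(topic));       // Created topic Optional[test].
        // Unwrapping first gives the intended output.
        System.out.println(createdMessage(topic.get())); // Created topic test.
    }
}
```

Unwrapping once in the constructor, as the diff does with `options.topic().get()`, assumes the option is always present at that point. If it were absent, `get()` would throw `NoSuchElementException`.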
(kafka) branch trunk updated: MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fa36a7f2d66 MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591) fa36a7f2d66 is described below commit fa36a7f2d664bac8524f830663023b83f5ee090b Author: Ismael Juma AuthorDate: Sat Oct 28 07:27:20 2023 -0700 MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591) And encapsulate TxnPartitionEntry state. This makes it easier to understand the behavior and the paths through which the state is updated. Reviewers: Justine Olshan --- .../clients/producer/internals/ProducerBatch.java | 4 +- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/TransactionManager.java | 105 -- .../producer/internals/TxnPartitionEntry.java | 115 ++-- .../producer/internals/TxnPartitionMap.java| 95 +++- .../kafka/common/utils/ProducerIdAndEpoch.java | 2 +- .../clients/producer/internals/SenderTest.java | 120 ++--- .../producer/internals/TransactionManagerTest.java | 86 +++ 8 files changed, 305 insertions(+), 224 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 408b8316eb8..61432b53ab0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -473,11 +473,11 @@ public final class ProducerBatch { recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional); } -public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) { +public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int 
baseSequence) { log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", this.baseSequence(), this.topicPartition, baseSequence); reopened = true; - recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional); + recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 5e1795cb2a1..45332dff391 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -818,7 +818,7 @@ public class RecordAccumulator { } private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) { -ProducerIdAndEpoch producerIdAndEpoch = null; +ProducerIdAndEpoch producerIdAndEpoch; if (transactionManager != null) { if (!transactionManager.isSendToPartitionAllowed(tp)) return true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 3eaaca04aa6..1d343484388 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -48,7 +48,6 @@ import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.DefaultRecordBatch; import 
org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -84,7 +83,6 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.PriorityQueue; import java.util.Set; -import java.util.SortedSet; import java.util.function.Supplier; /** @@ -92,7 +90,6 @@ import java.util.function.Supplier; */ public class TransactionManager { private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1; -static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1; private final Logger log; private final String transactionalId;
[kafka] branch trunk updated: KAFKA-14767: Fix missing commitId build error after git gc (#13315)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ffcb6d4a1a7 KAFKA-14767: Fix missing commitId build error after git gc (#13315) ffcb6d4a1a7 is described below commit ffcb6d4a1a733c7a7f8b2a5692ad0621c442e12c Author: Greg Harris AuthorDate: Sun Oct 22 11:08:01 2023 -0700 KAFKA-14767: Fix missing commitId build error after git gc (#13315) git gc moves commit hashes from individual .git/refs/heads/ to .git/packed-refs which is not read by the determineCommitId function. Replace the existing lookup within the .git directory with a GrGit lookup that handles packed and unpacked refs transparently. Reviewers: Ismael Juma --- build.gradle | 16 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/build.gradle b/build.gradle index 8fc01156a86..92698f62658 100644 --- a/build.gradle +++ b/build.gradle @@ -109,6 +109,7 @@ ext { throw new GradleException("Unexpected value for scalaOptimizerMode property. Expected one of $scalaOptimizerValues, but received: $userScalaOptimizerMode") generatedDocsDir = new File("${project.rootDir}/docs/generated") + repo = file("$rootDir/.git").isDirectory() ? 
Grgit.open(currentDir: project.getRootDir()) : null commitId = determineCommitId() } @@ -164,16 +165,8 @@ def determineCommitId() { def takeFromHash = 16 if (project.hasProperty('commitId')) { commitId.take(takeFromHash) - } else if (file("$rootDir/.git/HEAD").exists()) { -def headRef = file("$rootDir/.git/HEAD").text -if (headRef.contains('ref: ')) { - headRef = headRef.replaceAll('ref: ', '').trim() - if (file("$rootDir/.git/$headRef").exists()) { -file("$rootDir/.git/$headRef").text.trim().take(takeFromHash) - } -} else { - headRef.trim().take(takeFromHash) -} + } else if (repo != null) { +repo.head().id.take(takeFromHash) } else { "unknown" } @@ -181,7 +174,7 @@ def determineCommitId() { apply from: file('wrapper.gradle') -if (file('.git').exists()) { +if (repo != null) { rat { dependsOn subprojects.collect { it.tasks.matching { @@ -195,7 +188,6 @@ if (file('.git').exists()) { // Exclude everything under the directory that git should be ignoring via .gitignore or that isn't checked in. These // restrict us only to files that are checked in or are staged. -def repo = Grgit.open(currentDir: project.getRootDir()) excludes = new ArrayList(repo.clean(ignore: false, directories: true, dryRun: true)) // And some of the files that we have checked in should also be excluded from this check excludes.addAll([
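Note on why the removed lookup failed after `git gc`: once a ref is packed, its loose file under `.git/refs/heads/` no longer exists and the hash is recorded as a `<hash> <refname>` line in `.git/packed-refs`. The commit solves this by delegating to Grgit. The sketch below is not what the commit does; it only illustrates the fallback the old code lacked. The class `PackedRefsSketch` and its `lookup` method are hypothetical, and the sample hashes are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of reading a ref from packed-refs content.
final class PackedRefsSketch {
    // Scans packed-refs lines for the given ref name and returns its hash.
    // Header lines start with '#', peeled-tag lines start with '^'.
    static Optional<String> lookup(List<String> packedRefsLines, String refName) {
        for (String line : packedRefsLines) {
            if (line.isEmpty() || line.startsWith("#") || line.startsWith("^")) {
                continue;
            }
            int space = line.indexOf(' ');
            if (space > 0 && line.substring(space + 1).trim().equals(refName)) {
                return Optional.of(line.substring(0, space));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up hashes, shaped like the content of .git/packed-refs.
        List<String> packed = Arrays.asList(
            "# pack-refs with: peeled fully-peeled sorted",
            "1111111111111111111111111111111111111111 refs/heads/trunk",
            "2222222222222222222222222222222222222222 refs/tags/v1",
            "^3333333333333333333333333333333333333333");
        System.out.println(lookup(packed, "refs/heads/trunk").orElse("unknown"));
        System.out.println(lookup(packed, "refs/heads/missing").orElse("unknown"));
    }
}
```

Using a Git library, as the commit does, avoids having to handle this and other storage details by hand.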
[kafka] branch trunk updated: MINOR: Rewrite/Move KafkaNetworkChannel to the `raft` module (#14559)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 69e591db3a7 MINOR: Rewrite/Move KafkaNetworkChannel to the `raft` module (#14559) 69e591db3a7 is described below commit 69e591db3a7329a8bb984f068596d8658a8618b3 Author: Ismael Juma AuthorDate: Mon Oct 16 20:10:31 2023 -0700 MINOR: Rewrite/Move KafkaNetworkChannel to the `raft` module (#14559) This is now possible since `InterBrokerSend` was moved from `core` to `server-common`. Also rewrite/move `KafkaNetworkChannelTest`. The scala version of `KafkaNetworkChannelTest` passed with the changes here (before I deleted it). Reviewers: Justine Olshan, José Armando García Sancio --- checkstyle/import-control.xml | 1 + .../scala/kafka/raft/KafkaNetworkChannel.scala | 191 core/src/main/scala/kafka/raft/RaftManager.scala | 2 +- .../unit/kafka/raft/KafkaNetworkChannelTest.scala | 316 .../org/apache/kafka/raft/KafkaNetworkChannel.java | 183 .../java/org/apache/kafka/raft/NetworkChannel.java | 6 +- .../apache/kafka/raft/KafkaNetworkChannelTest.java | 323 + 7 files changed, 510 insertions(+), 512 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 888e8a41ae8..42488c3225f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -406,6 +406,7 @@ + diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala deleted file mode 100644 index 7c00961d1dc..000 --- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.raft - -import kafka.utils.Logging -import org.apache.kafka.clients.{ClientResponse, KafkaClient} -import org.apache.kafka.common.Node -import org.apache.kafka.common.message._ -import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} -import org.apache.kafka.common.requests._ -import org.apache.kafka.common.utils.Time -import org.apache.kafka.raft.RaftConfig.InetAddressSpec -import org.apache.kafka.raft.{NetworkChannel, RaftRequest, RaftResponse, RaftUtil} -import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} - -import java.util -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable - -object KafkaNetworkChannel { - - private[raft] def buildRequest(requestData: ApiMessage): AbstractRequest.Builder[_ <: AbstractRequest] = { -requestData match { - case voteRequest: VoteRequestData => -new VoteRequest.Builder(voteRequest) - case beginEpochRequest: BeginQuorumEpochRequestData => -new BeginQuorumEpochRequest.Builder(beginEpochRequest) - case endEpochRequest: EndQuorumEpochRequestData => -new EndQuorumEpochRequest.Builder(endEpochRequest) - case fetchRequest: FetchRequestData => -new FetchRequest.SimpleBuilder(fetchRequest) - case fetchSnapshotRequest: FetchSnapshotRequestData => -new FetchSnapshotRequest.Builder(fetchSnapshotRequest) - case _ => 
-throw new IllegalArgumentException(s"Unexpected type for requestData: $requestData") -} - } - -} - -private[raft] class RaftSendThread( - name: String, - networkClient: KafkaClient, - requestTimeoutMs: Int, - time: Time, - isInterruptible: Boolean = true -) extends InterBrokerSendThread( - name, - networkClient, - requestTimeoutMs, - time, - isInterruptible -) { - private val queue = new ConcurrentLinkedQueue[RequestAndCompletionHandler]() - - def generateRequests(): util.Collection[RequestAndCompletionHandler] = { -val list = new util.ArrayList[RequestAndCompletionHandler]() -while (true) { - val request = queue.poll() - if (request == null) { -
[kafka] branch spotbugs deleted (was f32913ab227)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch spotbugs in repository https://gitbox.apache.org/repos/asf/kafka.git was f32913ab227 MINOR: Upgrade spotbugs dependency The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch add-github-stale-action deleted (was b86d549f0af)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch add-github-stale-action in repository https://gitbox.apache.org/repos/asf/kafka.git was b86d549f0af Update stale.yml The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch stale-patch deleted (was d13ce5819b2)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch stale-patch in repository https://gitbox.apache.org/repos/asf/kafka.git was d13ce5819b2 Increase Github API operations for stale PR check The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch producer-idempotence-upgrade-docs-tweaks-2 deleted (was 8b410edb7a2)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch producer-idempotence-upgrade-docs-tweaks-2 in repository https://gitbox.apache.org/repos/asf/kafka.git was 8b410edb7a2 Fix id/href The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch minor-stale-pr-action deleted (was 303b87556fd)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch minor-stale-pr-action in repository https://gitbox.apache.org/repos/asf/kafka.git was 303b87556fd Update Stale message The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch java-21-readme deleted (was 94c6cfa66ca)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch java-21-readme in repository https://gitbox.apache.org/repos/asf/kafka.git was 94c6cfa66ca MINOR: Replace Java 20 with Java 21 in `README.md` The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch jenkinsfile-jdk-names deleted (was 62cf394a26a)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch jenkinsfile-jdk-names in repository https://gitbox.apache.org/repos/asf/kafka.git was 62cf394a26a Fix maven name too The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] branch build-dep-update-3.5 deleted (was fa495feddca)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch build-dep-update-3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git was fa495feddca MINOR: Update build and test dependencies for 3.5 This change permanently discards the following revisions: discard fa495feddca MINOR: Update build and test dependencies for 3.5
[kafka] 01/01: MINOR: Replace Java 20 with Java 21 in `README.md`
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch java-21-readme in repository https://gitbox.apache.org/repos/asf/kafka.git commit 94c6cfa66ca281fa572e75dfa0114432b5b80659 Author: Ismael Juma AuthorDate: Mon Sep 25 20:45:10 2023 -0700 MINOR: Replace Java 20 with Java 21 in `README.md` I intended to include this in 99e6f12dd099, but somehow missed it. I will wait for KAFKA-15943 before updating the site docs with Java 21 (we never added Java 20 there). --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 49dd49ef659..1a645dc409f 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ See our [web site](https://kafka.apache.org) for details on the project. You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed. -We build and test Apache Kafka with Java 8, 11, 17 and 20. We set the `release` parameter in javac and scalac +We build and test Apache Kafka with Java 8, 11, 17 and 21. We set the `release` parameter in javac and scalac to `8` to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version used for compilation). Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) for more details).
[kafka] branch java-21-readme created (now 94c6cfa66ca)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch java-21-readme in repository https://gitbox.apache.org/repos/asf/kafka.git at 94c6cfa66ca MINOR: Replace Java 20 with Java 21 in `README.md` This branch includes the following new commits: new 94c6cfa66ca MINOR: Replace Java 20 with Java 21 in `README.md` The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch trunk updated: KAFKA-15485: Support Java 21 (3/3) (#14433)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 99e6f12dd09 KAFKA-15485: Support Java 21 (3/3) (#14433) 99e6f12dd09 is described below commit 99e6f12dd09952fccde8f4826b65cadab332c2dc Author: Ismael Juma AuthorDate: Mon Sep 25 05:17:08 2023 -0700 KAFKA-15485: Support Java 21 (3/3) (#14433) * Update CI to build with Java 21 instead of Java 20 * Disable spotbugs when building with Java 21 as it doesn't support it yet (filed KAFKA-15492 for addressing this) * Disable SslTransportLayerTest.testValidEndpointIdentificationCN with Java 21 (same as Java 20) Reviewers: Divij Vaidya --- Jenkinsfile| 6 +++--- build.gradle | 18 -- .../kafka/common/network/SslTransportLayerTest.java| 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index d5d8909047f..50b7f6a298e 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -156,10 +156,10 @@ pipeline { } } -stage('JDK 20 and Scala 2.13') { +stage('JDK 21 and Scala 2.13') { agent { label 'ubuntu' } tools { -jdk 'jdk_20_latest' +jdk 'jdk_21_latest' } options { timeout(time: 8, unit: 'HOURS') @@ -171,7 +171,7 @@ pipeline { steps { doValidation() doTest(env) -echo 'Skipping Kafka Streams archetype test for Java 20' +echo 'Skipping Kafka Streams archetype test for Java 21' } } } diff --git a/build.gradle b/build.gradle index bb594af3dc5..f21be5ca68f 100644 --- a/build.gradle +++ b/build.gradle @@ -232,7 +232,10 @@ subprojects { apply plugin: 'java-library' apply plugin: 'checkstyle' - apply plugin: "com.github.spotbugs" + + // spotbugs doesn't support Java 21 yet + if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21)) +apply plugin: "com.github.spotbugs" // We use the shadow plugin for the jmh-benchmarks module and the `-all` jar can get pretty large, so // don't publish it @@ -702,12 +705,15 @@ 
subprojects { test.dependsOn('checkstyleMain', 'checkstyleTest') - spotbugs { -toolVersion = versions.spotbugs -excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml") -ignoreFailures = false + // spotbugs doesn't support Java 21 yet + if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21)) { +spotbugs { + toolVersion = versions.spotbugs + excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml") + ignoreFailures = false +} +test.dependsOn('spotbugsMain') } - test.dependsOn('spotbugsMain') tasks.withType(com.github.spotbugs.snom.SpotBugsTask) { reports { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index eb104a2dc04..f49bf868a46 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -195,7 +195,7 @@ public class SslTransportLayerTest { */ @ParameterizedTest @ArgumentsSource(SslTransportLayerArgumentsProvider.class) -@DisabledOnJre(value = JRE.JAVA_20, disabledReason = "KAFKA-15117") +@DisabledOnJre(value = {JRE.JAVA_20, JRE.JAVA_21}, disabledReason = "KAFKA-15117") public void testValidEndpointIdentificationCN(Args args) throws Exception { args.serverCertStores = certBuilder(true, "localhost", args.useInlinePem).build(); args.clientCertStores = certBuilder(false, "localhost", args.useInlinePem).build();
[kafka] branch trunk updated: MINOR: Update to Scala 2.13.12 (#14430)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7ba6d7a0b43 MINOR: Update to Scala 2.13.12 (#14430) 7ba6d7a0b43 is described below commit 7ba6d7a0b439cd7be7918b77c6b08425d6b37526 Author: Ismael Juma AuthorDate: Sun Sep 24 06:05:12 2023 -0700 MINOR: Update to Scala 2.13.12 (#14430) It offers a quickfix action for certain errors, includes a number of bug fixes and it introduces a new warning by default (https://github.com/scala/scala/pull/10462). In addition to the scala version bump, we also fix the new compiler warnings and bump the scalafmt version (the previous version failed with the new scala version). Release notes: https://github.com/scala/scala/releases/tag/v2.13.12 Reviewers: Divij Vaidya , Satish Duggana --- LICENSE-binary | 4 ++-- bin/kafka-run-class.sh | 2 +- bin/windows/kafka-run-class.bat | 2 +- checkstyle/.scalafmt.conf| 4 ++-- core/src/main/scala/kafka/log/LogCleaner.scala | 4 ++-- .../test/scala/other/kafka/ReplicationQuotasTestRig.scala| 7 +++ core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 4 ++-- core/src/test/scala/unit/kafka/log/TimeIndexTest.scala | 2 +- .../src/test/scala/unit/kafka/network/SocketServerTest.scala | 4 ++-- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala| 12 ++-- gradle.properties| 2 +- gradle/dependencies.gradle | 4 ++-- 12 files changed, 25 insertions(+), 26 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 73271b802f2..d453408a079 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -255,9 +255,9 @@ reflections-0.10.2 reload4j-1.2.25 rocksdbjni-7.9.2 scala-collection-compat_2.13-2.10.0 -scala-library-2.13.11 +scala-library-2.13.12 scala-logging_2.13-3.9.4 -scala-reflect-2.13.11 +scala-reflect-2.13.12 scala-java8-compat_2.13-1.0.2 snappy-java-1.1.10.3 swagger-annotations-2.2.8 diff --git 
a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 9ab96d7f2e1..8e66c49391d 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.11 + SCALA_VERSION=2.13.12 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 42903fba956..18310057f26 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.11 + set SCALA_VERSION=2.13.12 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf index a6fae4ab32d..54533046741 100644 --- a/checkstyle/.scalafmt.conf +++ b/checkstyle/.scalafmt.conf @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-version = 3.5.9 +version = 3.7.14 runner.dialect = scala213 docstrings.style = Asterisk docstrings.wrap = false maxColumn = 120 continuationIndent.defnSite = 2 assumeStandardLibraryStripMargin = true -rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers] \ No newline at end of file +rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers] diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index ff8a687b5ee..454cbaa2b35 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -747,7 +747,7 @@ private[log] class Cleaner(val id: Int, val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata) if (batch.isControlBatch) - discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime + discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= this.currentTime else discardBatchRecords = canDiscardBatch @@ -784,7 +784,7 @@ private[log] class Cleaner(val id: Int, else if (batch.isControlBatch) true else - Cl
[kafka] branch trunk updated: MINOR: Upgrade gradle, plugins and test libraries (#14431)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d1ad1d7b701 MINOR: Upgrade gradle, plugins and test libraries (#14431) d1ad1d7b701 is described below commit d1ad1d7b7013ea2c24a3f71c485a2a914adf1348 Author: Ismael Juma AuthorDate: Sun Sep 24 06:01:28 2023 -0700 MINOR: Upgrade gradle, plugins and test libraries (#14431) To prepare Java 21 support, upgrade gradle, its plugins and test libraries. Release notes for major and minor updates included below. The highlight is faster Java compilation by not shutting down the daemon at the end of the build. Gradle's internal performance tests show up to a 30% build time improvement for builds that are dominated by compiling Java sources. Mockito turns out to be a complex case where we use one of 3 different versions depending on the Scala and Java versions used. In addition, the default mocking strategy changed from `subclass` to `inline` in Mockito 5.0. We now use `inline` across the board (we previously used both `subclass` and `inline`). See comments in the relevant parts of the code for more details. 
* Gradle 8.3 release notes: https://docs.gradle.org/8.3/release-notes.html * jmh 1.37: virtual thread support and various bug fixes * JUnit 5.10.0 release notes: https://junit.org/junit5/docs/5.10.0/release-notes/index.html * Mockito 5.x release notes: * https://github.com/mockito/mockito/releases/tag/v5.0.0 * https://github.com/mockito/mockito/releases/tag/v5.1.0 * https://github.com/mockito/mockito/releases/tag/v5.2.0 * https://github.com/mockito/mockito/releases/tag/v5.3.0 * https://github.com/mockito/mockito/releases/tag/v5.4.0 * https://github.com/mockito/mockito/releases/tag/v5.5.0 * EasyMock 5.2.0 release notes: https://github.com/easymock/easymock/releases/tag/easymock-5.2.0 Reviewers: Divij Vaidya --- build.gradle | 21 --- gradle/dependencies.gradle | 35 ++-- gradle/wrapper/gradle-wrapper.properties | 5 +++-- gradlew | 7 +-- 4 files changed, 41 insertions(+), 27 deletions(-) diff --git a/build.gradle b/build.gradle index 76e8c862399..bb594af3dc5 100644 --- a/build.gradle +++ b/build.gradle @@ -31,15 +31,15 @@ buildscript { } plugins { - id 'com.github.ben-manes.versions' version '0.47.0' + id 'com.github.ben-manes.versions' version '0.48.0' id 'idea' id 'jacoco' id 'java-library' id 'org.owasp.dependencycheck' version '8.2.1' - id 'org.nosphere.apache.rat' version "0.8.0" - id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8" + id 'org.nosphere.apache.rat' version "0.8.1" + id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.16" - id "com.github.spotbugs" version '5.0.13' apply false + id "com.github.spotbugs" version '5.1.3' apply false id 'org.scoverage' version '7.0.1' apply false id 'com.github.johnrengelman.shadow' version '8.1.1' apply false id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 @@ -909,7 +909,6 @@ project(':core') { testImplementation project(':storage:api').sourceSets.test.output testImplementation libs.bcpkix testImplementation 
libs.mockitoCore -testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. testImplementation(libs.apacheda) { exclude group: 'xml-apis', module: 'xml-apis' // `mina-core` is a transitive dependency for `apacheds` and `apacheda`. @@ -1194,7 +1193,6 @@ project(':metadata') { testImplementation libs.jqwik testImplementation libs.hamcrest testImplementation libs.mockitoCore -testImplementation libs.mockitoInline testImplementation libs.slf4jlog4j testImplementation project(':clients').sourceSets.test.output testImplementation project(':raft').sourceSets.test.output @@ -1354,7 +1352,7 @@ project(':clients') { testImplementation libs.jose4j testImplementation libs.junitJupiter testImplementation libs.log4j -testImplementation libs.mockitoInline +testImplementation libs.mockitoCore testRuntimeOnly libs.slf4jlog4j testRuntimeOnly libs.jacksonDatabind @@ -1579,7 +1577,6 @@ project(':server-common
[kafka] branch trunk updated: KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 98febb989ab KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427) 98febb989ab is described below commit 98febb989abd1bdb624420f21122c477f2614a08 Author: Ismael Juma AuthorDate: Sun Sep 24 05:59:29 2023 -0700 KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427) This is one of the steps required for Kafka to compile with Java 21. For each case, one of the following fixes was applied: 1. Suppress the warning if fixing it would potentially result in an incompatible change (for public classes) 2. Add `final` to one or more methods so that the escape is not possible 3. Replace method calls with direct field access. In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module. 
See the following for more details regarding the new lint warning: https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831 Reviewers: Divij Vaidya , Satish Duggana , Chris Egerton --- .../clients/consumer/internals/AbstractCoordinator.java | 2 +- .../consumer/internals/DefaultBackgroundThread.java | 4 ++-- .../consumer/internals/MembershipManagerImpl.java| 2 +- .../consumer/internals/OffsetsRequestManager.java| 3 ++- .../org/apache/kafka/clients/producer/KafkaProducer.java | 2 +- .../org/apache/kafka/common/config/AbstractConfig.java | 2 +- .../java/org/apache/kafka/common/config/ConfigDef.java | 5 +++-- .../java/org/apache/kafka/common/metrics/Metrics.java| 1 + .../org/apache/kafka/common/protocol/types/Schema.java | 1 + .../kafka/common/record/LazyDownConversionRecords.java | 2 +- .../apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +- .../apache/kafka/common/requests/DeleteAclsResponse.java | 2 +- .../kafka/common/requests/DescribeAclsResponse.java | 2 +- .../security/authenticator/SaslClientAuthenticator.java | 1 + .../internals/OAuthBearerSaslClientProvider.java | 1 + .../internals/OAuthBearerSaslServerProvider.java | 1 + .../internals/unsecured/OAuthBearerUnsecuredJws.java | 16 .../plain/internals/PlainSaslServerProvider.java | 2 +- .../scram/internals/ScramSaslClientProvider.java | 2 +- .../scram/internals/ScramSaslServerProvider.java | 2 +- .../kafka/common/utils/ImplicitLinkedHashCollection.java | 1 + .../java/org/apache/kafka/common/utils/KafkaThread.java | 2 ++ .../org/apache/kafka/common/utils/PureJavaCrc32C.java| 2 +- .../org/apache/kafka/clients/admin/MockAdminClient.java | 4 +++- .../org/apache/kafka/common/network/NioEchoServer.java | 1 + .../org/apache/kafka/common/network/PlaintextSender.java | 1 + .../java/org/apache/kafka/common/network/SslSender.java | 1 + .../common/security/ssl/DefaultSslEngineFactoryTest.java | 1 + .../kafka/common/security/ssl/mock/TestProvider.java | 1 + 
.../org/apache/kafka/common/utils/MockScheduler.java | 1 + .../apache/kafka/connect/json/JsonConverterConfig.java | 1 + .../kafka/connect/mirror/MirrorConnectorConfig.java | 1 + .../apache/kafka/connect/mirror/MirrorMakerConfig.java | 1 + .../kafka/connect/runtime/SourceConnectorConfig.java | 1 + .../java/org/apache/kafka/connect/runtime/Worker.java| 1 + .../org/apache/kafka/connect/runtime/WorkerConfig.java | 1 + .../apache/kafka/connect/runtime/WorkerConnector.java| 6 +++--- .../org/apache/kafka/connect/runtime/WorkerInfo.java | 4 ++-- .../connect/runtime/distributed/DistributedConfig.java | 1 + .../apache/kafka/connect/runtime/isolation/Plugins.java | 1 + .../apache/kafka/connect/runtime/rest/RestServer.java| 6 +++--- .../kafka/connect/storage/KafkaConfigBackingStore.java | 1 + .../src/main/java/kafka/log/remote/RemoteLogManager.java | 7 --- .../org/apache/kafka/message/MessageDataGenerator.java | 2 +- .../apache/kafka/image/loader/MetadataBatchLoader.java | 2 +- .../org/apache/kafka/raft/MockExpirationService.java | 1 + .../apache/kafka/server/fault/FaultHandlerException.java | 2 ++ .../org/apache/kafka/server/util/ShutdownableThread.java | 1 + .../org/apache/kafka/server/util/timer/TimerTask.java| 2 +- .../apache/kafka/server/util/timer/TimerTaskEntry.java | 1 + .../java/org/apache/kafka/timeline/TimelineInteger.java | 1 + .../java/org/apache/kafka/timeline/TimelineLong.java | 1 + .../java/org/apache/kafka/timeline/TimelineObject.java | 1 + .../test/java/org/apache/kafka/server/util/MockTime.java | 1 + .../storage/FileBasedRemoteLogMetadataCache.java | 3 ++- .../s
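The three fixes named in the commit message above can be illustrated with a minimal Java sketch. All class and method names here are hypothetical and do not come from the Kafka code base. `LeakyBase` shows the pattern that, as far as I understand it, the `this-escape` lint targets (javac reports it for classes that can be subclassed from another compilation unit); the remaining classes show one fix each.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka.
class ThisEscapeSketch {

    // Problem pattern: the constructor calls an overridable method, so a
    // subclass override can run before the subclass fields are assigned.
    static class LeakyBase {
        protected final List<String> keys = new ArrayList<>();

        LeakyBase() {
            register("base"); // 'this' escapes to a possible override
        }

        void register(String key) {
            keys.add(key);
        }
    }

    static class LeakySub extends LeakyBase {
        private final String prefix;
        boolean sawUninitializedPrefix;

        LeakySub(String prefix) {
            super();              // register() runs here, before prefix is set
            this.prefix = prefix;
        }

        @Override
        void register(String key) {
            sawUninitializedPrefix = (prefix == null);
            keys.add(prefix + key);
        }
    }

    // Fix 1: keep the behaviour (e.g. for public API) and suppress the warning.
    static class SuppressedBase {
        protected final List<String> keys = new ArrayList<>();

        @SuppressWarnings("this-escape")
        SuppressedBase() {
            register("base");
        }

        public void register(String key) {
            keys.add(key);
        }
    }

    // Fix 2: make the method final so no override can observe the escape.
    static class FinalMethodBase {
        protected final List<String> keys = new ArrayList<>();

        FinalMethodBase() {
            register("base");
        }

        final void register(String key) {
            keys.add(key);
        }
    }

    // Fix 3: replace the method call with direct field access.
    static class FieldAccessBase {
        protected final List<String> keys = new ArrayList<>();

        FieldAccessBase() {
            keys.add("base");
        }

        void register(String key) {
            keys.add(key);
        }
    }

    public static void main(String[] args) {
        LeakySub sub = new LeakySub("p.");
        System.out.println(sub.sawUninitializedPrefix + " " + sub.keys);
        System.out.println(new FinalMethodBase().keys);
        System.out.println(new FieldAccessBase().keys);
    }
}
```

Which fix is appropriate depends on compatibility: adding `final` or changing a call site is only safe where no external subclass relies on overriding the method, which is presumably why the commit falls back to suppression for public classes.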
[kafka] branch trunk updated: MINOR: Fix MiniKdc Java 17 issue in system tests (#14011)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ac6a536c7c1 MINOR: Fix MiniKdc Java 17 issue in system tests (#14011) ac6a536c7c1 is described below commit ac6a536c7c1e9ea8c5889669b59816d84247224e Author: Maros Orsak AuthorDate: Mon Aug 7 15:19:55 2023 +0200 MINOR: Fix MiniKdc Java 17 issue in system tests (#14011) Kafka system tests with Java version 17 are failing on this issue: ```python TimeoutError("MiniKdc didn't finish startup",) Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.6/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_verifiable_producer.py", line 74, in test_simple_run self.kafka.start() File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 635, in start self.start_minikdc_if_necessary(add_principals) File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 596, in start_minikdc_if_necessary self.minikdc.start() File "/usr/local/lib/python3.6/site-packages/ducktape/services/service.py", line 265, in start self.start_node(node, **kwargs) File "/opt/kafka-dev/tests/kafkatest/services/security/minikdc.py", line 114, in start_node monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup") File "/usr/local/lib/python3.6/site-packages/ducktape/cluster/remoteaccount.py", line 754, in wait_until allow_fail=True) == 0, **kwargs) File 
"/usr/local/lib/python3.6/site-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: MiniKdc didn't finish startup ``` Specifically, when one runs the test cases and looks at the logs of the MiniKdc: ```java Exception in thread "main" java.lang.IllegalAccessException: class kafka.security.minikdc.MiniKdc cannot access class sun.security.krb5.Config (in module java.security.jgss) because module java.security.jgss does not export sun.security.krb5 to unnamed module @24959ca4 at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:392) at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:674) at java.base/java.lang.reflect.Method.invoke(Method.java:560) at kafka.security.minikdc.MiniKdc.refreshJvmKerberosConfig(MiniKdc.scala:268) at kafka.security.minikdc.MiniKdc.initJvmKerberosConfig(MiniKdc.scala:245) at kafka.security.minikdc.MiniKdc.start(MiniKdc.scala:123) at kafka.security.minikdc.MiniKdc$.start(MiniKdc.scala:375) at kafka.security.minikdc.MiniKdc$.main(MiniKdc.scala:366) at kafka.security.minikdc.MiniKdc.main(MiniKdc.scala) ``` This error occurs because, since Java 16, JDK internals such as the `sun.security.krb5` package are strongly encapsulated by default and are no longer accessible to code on the class path (see [1]). There are two ways to solve it, and this change implements one of them. The other is to export the environment variable when the containers are deployed with Ducktape in [2]. 
[1] - https://openjdk.org/jeps/396 [2] - https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak#L308 Reviewers: Ismael Juma , Luke Chen --- tests/kafkatest/services/security/minikdc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py index 23327db9883..29ae0fb51b9 100644 --- a/tests/kafkatest/services/security/minikdc.py +++ b/tests/kafkatest/services/security/minikdc.py @@ -107,6 +107,8 @@ class MiniKdc(KafkaPathResolverMixin, Service): cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar cmd += " export CLASSPATH;" +# avoids java.lang.IllegalAccessException due to usage of internal `sun.security.krb5.Config` in `MiniKdc` +cmd += " export KAFKA_OPTS=\"--add-exports java.security.jgss/sun.sec
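To check whether a JVM was started with the export that the change above adds, one can ask the module system directly. The following is a minimal sketch, not part of the commit: the class name `JgssExportCheck` is hypothetical, while the module and package names are taken from the exception message quoted in the commit description. It assumes the check runs from the class path, so that the calling class lives in the unnamed module.

```java
import java.util.Optional;

// Hypothetical helper, not part of Kafka.
class JgssExportCheck {

    // Returns empty if the java.security.jgss module is not present in this
    // runtime; otherwise whether sun.security.krb5 is exported (or opened)
    // to the module that contains this class.
    static Optional<Boolean> krb5ExportedToCaller() {
        Module caller = JgssExportCheck.class.getModule();
        return ModuleLayer.boot()
                .findModule("java.security.jgss")
                .map(m -> m.isExported("sun.security.krb5", caller));
    }

    public static void main(String[] args) {
        System.out.println("sun.security.krb5 exported to caller: "
                + krb5ExportedToCaller());
    }
}
```

On a recent JDK without extra flags I would expect the result to be `false`; with an `--add-exports <module>/<package>=ALL-UNNAMED` style option for this module and package it should become `true`, though the exact result depends on how the JVM is launched.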
[kafka] branch trunk updated: KAFKA-15141: Initialize logger statically on hot codepaths (#13949)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 29f36d733b4 KAFKA-15141: Initialize logger statically on hot codepaths (#13949) 29f36d733b4 is described below commit 29f36d733b4898257b148ddc4270c1b2a291cb7a Author: gaurav-narula <97168911+gaurav-nar...@users.noreply.github.com> AuthorDate: Wed Jul 19 20:24:40 2023 +0100 KAFKA-15141: Initialize logger statically on hot codepaths (#13949) Log4j based loggers use `org.apache.logging.log4j.spi.AbstractLoggerAdapter::getContext` which invokes StackLocatorUtil to walk the stacktrace. This operation is quite CPU intensive and is performed each time during instantiation. To avoid walking the stack often, this change uses a static variable to initialize the logger for a few classes which seem to be instantiated frequently. Reviewers: Divij Vaidya , Ismael Juma --- .../main/scala/kafka/server/DelayedProduce.scala| 9 +++-- core/src/main/scala/kafka/server/FetchSession.scala | 21 + 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 48a0ccbcff4..2f1261ada23 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock - +import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter import kafka.utils.Implicits._ import kafka.utils.Pool @@ -47,6 +47,10 @@ case class ProduceMetadata(produceRequiredAcks: Short, override def toString = s"[requiredAcks: $produceRequiredAcks, partitionStatus: $produceStatus]" } +object DelayedProduce { + private final val logger = Logger(classOf[DelayedProduce]) +} + /** * A delayed produce 
operation that can be created by the replica manager and watched * in the produce operation purgatory @@ -58,6 +62,8 @@ class DelayedProduce(delayMs: Long, lockOpt: Option[Lock] = None) extends DelayedOperation(delayMs, lockOpt) { + override lazy val logger = DelayedProduce.logger + // first update the acks pending variable according to the error code produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => if (status.responseStatus.error == Errors.NONE) { @@ -147,4 +153,3 @@ object DelayedProduceMetrics { partitionExpirationMeters.getAndMaybePut(partition).mark() } } - diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index 8725a09f090..3f399311172 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -17,6 +17,7 @@ package kafka.server +import com.typesafe.scalalogging.Logger import kafka.utils.Logging import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.message.FetchResponseData @@ -356,12 +357,19 @@ class SessionErrorContext(val error: Errors, } } +object SessionlessFetchContext { + private final val logger = Logger(classOf[SessionlessFetchContext]) +} + /** * The fetch context for a sessionless fetch request. * * @param fetchData The partition data from the fetch request. */ class SessionlessFetchContext(val fetchData: util.Map[TopicIdPartition, FetchRequest.PartitionData]) extends FetchContext { + + override lazy val logger = SessionlessFetchContext.logger + override def getFetchOffset(part: TopicIdPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) @@ -379,6 +387,10 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicIdPartition, FetchReq } } +object FullFetchContext { + private final val logger = Logger(classOf[FullFetchContext]) +} + /** * The fetch context for a full fetch request. 
* @@ -395,6 +407,9 @@ class FullFetchContext(private val time: Time, private val fetchData: util.Map[TopicIdPartition, FetchRequest.PartitionData], private val usesTopicIds: Boolean, private val isFromFollower: Boolean) extends FetchContext { + + override lazy val logger = FullFetchContext.logger + override def getFetchOffset(part: TopicIdPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) @@ -423,6 +438,10 @@ class FullFetchContext(private val time: Time, } } +object IncrementalFetchContext { + private val logger = Logger(classOf[IncrementalFetchContext]) +} + /** * The fetch context for an incremental fetch request. * @@ -436,6 +455,8 @@ class IncrementalFetchConte
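The idea behind the change above, one logger lookup per class instead of one per instance, can be sketched in Java. This is an illustration under stated assumptions rather than the Kafka code: it uses `java.util.logging` in place of the Log4j and scala-logging stack, and the `lookup` helper with its counter is hypothetical, added only to make the number of lookups observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

// Hypothetical illustration; names do not come from Kafka.
class StaticLoggerSketch {

    // Counts logger lookups so the two strategies can be compared.
    static final AtomicInteger LOOKUPS = new AtomicInteger();

    static Logger lookup(Class<?> cls) {
        LOOKUPS.incrementAndGet();
        return Logger.getLogger(cls.getName());
    }

    // Per-instance initialization: every new object pays for a lookup.
    static class PerInstance {
        private final Logger logger = lookup(PerInstance.class);

        Logger logger() {
            return logger;
        }
    }

    // Static initialization: the lookup happens once, when the class is
    // initialized, and all instances share the result.
    static class Shared {
        private static final Logger LOGGER = lookup(Shared.class);

        Logger logger() {
            return LOGGER;
        }
    }

    // Runs the action and returns how many lookups it triggered.
    static int lookupsFor(Runnable action) {
        int before = LOOKUPS.get();
        action.run();
        return LOOKUPS.get() - before;
    }

    public static void main(String[] args) {
        int perInstance = lookupsFor(() -> {
            for (int i = 0; i < 1000; i++) new PerInstance();
        });
        int shared = lookupsFor(() -> {
            for (int i = 0; i < 1000; i++) new Shared();
        });
        System.out.println("per-instance lookups: " + perInstance);
        System.out.println("static lookups: " + shared);
    }
}
```

In the Scala diff the same effect is obtained by putting the logger in a companion object and overriding `logger` in the class to return it.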
[kafka] branch trunk updated: MINOR: Add JDK 20 CI build and remove some branch builds (#12948)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1f4cbc5d532 MINOR: Add JDK 20 CI build and remove some branch builds (#12948) 1f4cbc5d532 is described below commit 1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8 Author: Ismael Juma AuthorDate: Fri Jun 30 01:12:00 2023 -0700 MINOR: Add JDK 20 CI build and remove some branch builds (#12948) It's good for us to add support for Java 20 in preparation for Java 21 - the next LTS. Given that Scala 2.12 support has been deprecated, a Scala 2.12 variant is not included. Also remove some branch builds that add load to the CI, but have low value: JDK 8 & Scala 2.13 (JDK 8 support has been deprecated), JDK 11 & Scala 2.12 (Scala 2.12 support has been deprecated) and JDK 17 & Scala 2.12 (Scala 2.12 support has been deprecated). A newer version of Mockito (4.9.0 -> 4.11.0) is required for Java 20 support, but we only use it with Scala 2.13+ since it causes compilation errors with Scala 2.12. Similarly, we upgrade easymock when the Java version is 16 or newer as it's incompatible with powermock (which doesn't support Java 16 or newer). Filed KAFKA-15117 for a test that fails with Java 20 (SslTransportLayerTest.testValidEndpointIdentificationCN). Finally, fixed some lossy conversions that were added after #13582 was submitted. 
Reviewers: Ismael Juma --- Jenkinsfile| 66 ++ build.gradle | 3 + .../kafka/common/utils/ChunkedBytesStream.java | 14 +++-- .../common/network/SslTransportLayerTest.java | 3 + .../kafka/connect/mirror/OffsetSyncStoreTest.java | 2 +- .../kafka/server/KafkaRequestHandlerTest.scala | 2 +- gradle/dependencies.gradle | 28 +++-- .../CachingInMemoryKeyValueStoreTest.java | 2 +- .../CachingPersistentWindowStoreTest.java | 2 +- ...imeOrderedCachingPersistentWindowStoreTest.java | 2 +- .../internals/TimeOrderedWindowStoreTest.java | 2 +- 11 files changed, 47 insertions(+), 79 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index fbd3fce4b5f..70f956bb4fc 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -155,79 +155,23 @@ pipeline { echo 'Skipping Kafka Streams archetype test for Java 17' } } - -// To avoid excessive Jenkins resource usage, we only run the stages -// above at the PR stage. The ones below are executed after changes -// are pushed to trunk and/or release branches. We achieve this via -// the `when` clause. 
- -stage('JDK 8 and Scala 2.13') { - when { -not { changeRequest() } -beforeAgent true - } - agent { label 'ubuntu' } - tools { -jdk 'jdk_1.8_latest' -maven 'maven_3_latest' - } - options { -timeout(time: 8, unit: 'HOURS') -timestamps() - } - environment { -SCALA_VERSION=2.13 - } - steps { -doValidation() -doTest(env) -tryStreamsArchetype() - } -} - -stage('JDK 11 and Scala 2.12') { - when { -not { changeRequest() } -beforeAgent true - } - agent { label 'ubuntu' } - tools { -jdk 'jdk_11_latest' - } - options { -timeout(time: 8, unit: 'HOURS') -timestamps() - } - environment { -SCALA_VERSION=2.12 - } - steps { -doValidation() -doTest(env) -echo 'Skipping Kafka Streams archetype test for Java 11' - } -} -stage('JDK 17 and Scala 2.12') { - when { -not { changeRequest() } -beforeAgent true - } +stage('JDK 20 and Scala 2.13') { agent { label 'ubuntu' } tools { -jdk 'jdk_17_latest' +jdk 'jdk_20_latest' } options { -timeout(time: 8, unit: 'HOURS') +timeout(time: 8, unit: 'HOURS') timestamps() } environment { -SCALA_VERSION=2.12 +SCALA_VERSION=2.13 } steps { doValidation() doTest(env) -echo 'Skipping Kafka Streams archetype test for Java 17' +ech
[kafka] branch trunk updated: MINOR: Fix lossy conversions flagged by Java 20 (#13582)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9c8aaa2c35a MINOR: Fix lossy conversions flagged by Java 20 (#13582) 9c8aaa2c35a is described below commit 9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419 Author: Ismael Juma AuthorDate: Thu Jun 22 08:05:55 2023 -0700 MINOR: Fix lossy conversions flagged by Java 20 (#13582) An example of the warning: > warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy There should be no change in behavior as part of these changes - runtime logic ensured we didn't run into issues due to the lossy conversions. Reviewers: Divij Vaidya --- .../org/apache/kafka/common/record/CompressionType.java | 16 +--- .../org/apache/kafka/common/record/DefaultRecord.java| 2 +- .../apache/kafka/common/record/DefaultRecordBatch.java | 2 +- .../apache/kafka/common/record/DefaultRecordsSend.java | 2 +- .../java/org/apache/kafka/common/record/FileRecords.java | 7 --- .../common/record/LazyDownConversionRecordsSend.java | 5 +++-- .../org/apache/kafka/common/record/LegacyRecord.java | 4 ++-- .../org/apache/kafka/common/record/MemoryRecords.java| 8 +++- .../org/apache/kafka/common/record/MultiRecordsSend.java | 2 +- .../java/org/apache/kafka/common/record/RecordsSend.java | 8 .../apache/kafka/common/record/TransferableRecords.java | 2 +- .../apache/kafka/common/record/UnalignedFileRecords.java | 7 --- .../kafka/common/record/UnalignedMemoryRecords.java | 8 +++- .../kafka/common/serialization/ShortDeserializer.java| 2 +- .../main/java/org/apache/kafka/common/utils/Utils.java | 2 +- .../org/apache/kafka/common/compress/KafkaLZ4Test.java | 2 +- .../org/apache/kafka/common/metrics/stats/MeterTest.java | 2 +- .../org/apache/kafka/common/record/FileRecordsTest.java | 10 +- .../kafka/streams/processor/internals/TaskExecutor.java | 2 +- 
19 files changed, 47 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 70ffc0ec1bc..a4ebf1648ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -37,7 +37,7 @@ import java.util.zip.GZIPOutputStream; * The compression type to use */ public enum CompressionType { -NONE(0, "none", 1.0f) { +NONE((byte) 0, "none", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return buffer; @@ -50,7 +50,7 @@ public enum CompressionType { }, // Shipped with the JDK -GZIP(1, "gzip", 1.0f) { +GZIP((byte) 1, "gzip", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { @@ -92,7 +92,7 @@ public enum CompressionType { // To ensure this, we only reference compression library code from classes that are only invoked when actual usage // happens. 
-SNAPPY(2, "snappy", 1.0f) { +SNAPPY((byte) 2, "snappy", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return SnappyFactory.wrapForOutput(buffer); @@ -114,7 +114,7 @@ public enum CompressionType { } }, -LZ4(3, "lz4", 1.0f) { +LZ4((byte) 3, "lz4", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { @@ -144,7 +144,7 @@ public enum CompressionType { } }, -ZSTD(4, "zstd", 1.0f) { +ZSTD((byte) 4, "zstd", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return ZstdFactory.wrapForOutput(buffer); @@ -169,11 +169,13 @@ public enum CompressionType { }; -public final int id; +// compression type is represented by two bits in the attributes field of the record batch header, so `byte` is +// large enough +public final byte id; public final String name; public final float rate; -CompressionType(int id, String name, float rate) { +CompressionType(byte id, String name, float rate) { this.id = id; this.name = name; this.rate = rate; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/Default
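The warning quoted in the commit message concerns compound assignments, which contain a hidden narrowing cast. The sketch below is a hypothetical illustration, not code from the commit: it shows the pattern that `-Xlint:lossy-conversions` reports, a checked alternative, and an enum with a `byte` id in the style of the `CompressionType` change.

```java
// Hypothetical illustration; names do not come from Kafka.
class LossyConversionSketch {

    // `total += delta` compiles as `total = (int) (total + delta)`, so the
    // high bits of the long sum are dropped without any visible cast.
    static int implicitNarrowing(int total, long delta) {
        total += delta; // reported by javac 20+ with -Xlint:lossy-conversions
        return total;
    }

    // Explicit alternative: do the arithmetic in long and fail loudly if the
    // result does not fit in an int.
    static int checkedNarrowing(int total, long delta) {
        return Math.toIntExact(total + delta);
    }

    // Declaring the field with the narrow type up front avoids later casts;
    // int literals then need an explicit (byte) cast at the call site.
    enum Codec {
        NONE((byte) 0),
        GZIP((byte) 1);

        final byte id;

        Codec(byte id) {
            this.id = id;
        }
    }

    public static void main(String[] args) {
        System.out.println(implicitNarrowing(Integer.MAX_VALUE, 1L));
        System.out.println(checkedNarrowing(1, 2L));
        System.out.println(Codec.GZIP.id);
    }
}
```

As the commit message notes, the Kafka changes are not expected to alter behaviour, because runtime logic already kept the values in range; the casts only make the narrowing visible to the compiler and to readers.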
[kafka] branch trunk updated: MINOR: Upgrade Scala for Java 20/21 support (#13840)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dfaae317b82 MINOR: Upgrade Scala for Java 20/21 support (#13840) dfaae317b82 is described below commit dfaae317b82035323ba9c693b3ad7f02a6a58395 Author: Ismael Juma AuthorDate: Tue Jun 20 10:29:23 2023 -0700 MINOR: Upgrade Scala for Java 20/21 support (#13840) Upgrade to Scala 2.13.11 and Scala 2.12.18. A minor test change was required to fix compilation with Scala 2.13.11. Scala 2.13 release notes: * https://github.com/scala/scala/releases/tag/v2.13.11 Scala 2.12 release notes: * https://github.com/scala/scala/releases/tag/v2.12.16 * https://github.com/scala/scala/releases/tag/v2.12.17 * https://github.com/scala/scala/releases/tag/v2.12.18 Reviewers: Justine Olshan , Josep Prat --- LICENSE-binary | 4 ++-- bin/kafka-run-class.sh | 2 +- bin/windows/kafka-run-class.bat| 2 +- core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 7 --- gradle.properties | 2 +- gradle/dependencies.gradle | 4 ++-- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 249947286cc..916d4192dca 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -253,9 +253,9 @@ plexus-utils-3.3.0 reload4j-1.2.25 rocksdbjni-7.1.2 scala-collection-compat_2.13-2.10.0 -scala-library-2.13.10 +scala-library-2.13.11 scala-logging_2.13-3.9.4 -scala-reflect-2.13.10 +scala-reflect-2.13.11 scala-java8-compat_2.13-1.0.2 snappy-java-1.1.10.1 swagger-annotations-2.2.8 diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 025fd8d79d6..9ab96d7f2e1 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. 
if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.10 + SCALA_VERSION=2.13.11 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 3783b3b4951..42903fba956 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.10 + set SCALA_VERSION=2.13.11 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 767709997c8..c3866476a69 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2499,15 +2499,16 @@ class ReplicaManagerTest { time: Time, threadNamePrefix: Option[String], replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = { -new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) { +val rm = this +new ReplicaFetcherManager(config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) { override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = { val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " + s"fetcherId=$fetcherId] ") val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id) val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, - replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1) -new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, 
replicaManager, + rm, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1) +new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, rm, quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) { override def doWork()
[kafka] branch trunk updated (9a36da12b73 -> a7d0b3f7537)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 9a36da12b73 KAFKA-14462; [8/N] Add ConsumerGroupMember (#13538)
     add a7d0b3f7537 MINOR: Upgrade gradle to 8.1.1 (#13625)

No new revisions were added by this update.

Summary of changes:
 build.gradle                             | 4 ++--
 gradle/dependencies.gradle               | 2 +-
 gradle/wrapper/gradle-wrapper.properties | 4 ++--
 gradlew                                  | 9 +
 4 files changed, 10 insertions(+), 9 deletions(-)
[kafka] branch 3.5 updated (a36940d3ae2 -> dfbc8814efc)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from a36940d3ae2 MINOR: improve ProductionExceptionHandler test (#13576)
     add 239bbdc2009 MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
     add 74ae5f9a621 KAFKA-8115: Reduce flakiness in Trogdor JsonRestServer shutdown (#12830)
     add dfbc8814efc Update zstd to 1.5.5 (#13567)

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/kafka/server/SharedServer.scala | 33 +++---
 gradle/dependencies.gradle                         | 2 +-
 .../apache/kafka/queue/KafkaEventQueueTest.java    | 4 +--
 .../apache/kafka/trogdor/rest/JsonRestServer.java  | 10 +--
 4 files changed, 27 insertions(+), 22 deletions(-)
[kafka] branch trunk updated: Update zstd to 1.5.5 (#13567)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 9c12e462106 Update zstd to 1.5.5 (#13567)
9c12e462106 is described below

commit 9c12e462106343fbc6af5873074d48f98687af39
Author: Ismael Juma
AuthorDate: Sat Apr 15 12:30:22 2023 -0700

    Update zstd to 1.5.5 (#13567)

    1.5.4 is a large release that offers significant performance improvements across multiple scenarios, as well as new features.

    1.5.5 is a smaller release that corrects a rare corruption bug and improves performance in some scenarios.

    It looks like 1.5.3 was retracted or never released.

    Zstandard release notes:
    * 1.5.4: https://github.com/facebook/zstd/releases/tag/v1.5.4
    * 1.5.5: https://github.com/facebook/zstd/releases/tag/v1.5.5

    zstd-jni diff: https://github.com/luben/zstd-jni/compare/v1.5.2-1...v1.5.5-1

    Reviewers: Rajini Sivaram, Divij Vaidya
---
 gradle/dependencies.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index b3308b6a135..40d408949db 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -126,7 +126,7 @@ versions += [
   swaggerJaxrs2: "2.2.8",
   zinc: "1.8.0",
   zookeeper: "3.6.4",
-  zstd: "1.5.2-1"
+  zstd: "1.5.5-1"
 ]
 libs += [
   activation: "javax.activation:activation:$versions.activation",
[kafka] branch trunk updated: KAFKA-8115: Reduce flakiness in Trogdor JsonRestServer shutdown (#12830)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new fe375dce547 KAFKA-8115: Reduce flakiness in Trogdor JsonRestServer shutdown (#12830)
fe375dce547 is described below

commit fe375dce54717f0699e9ab860c4a33669d017f22
Author: Greg Harris
AuthorDate: Sat Apr 15 12:21:56 2023 -0700

    KAFKA-8115: Reduce flakiness in Trogdor JsonRestServer shutdown (#12830)

    The GRACEFUL_SHUTDOWN_TIMEOUT_MS for the Trogdor JsonRestServer is 100ms. In heavily loaded CI environments, this timeout can be exceeded. When this happens, it causes the jettyServer.stop() and jettyServer.destroy() calls to throw exceptions, which prevents shutdownExecutor.shutdown() from running. This has the effect of causing the JsonRestServer::waitForShutdown method to block for 1 day, which exceeds the 120s timeout on the CoordinatorTest (and any other test relying on MiniTrogdorCluster).

    This change makes it such that the graceful shutdown timeout is less likely to be exceeded, and when it is, the timeout does not cause the waitForShutdown method to block for much longer than the graceful shutdown timeout.

    Reviewers: Ismael Juma
---
 .../java/org/apache/kafka/trogdor/rest/JsonRestServer.java | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index e5388f8a6f9..cc3bbc0cc6d 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit;
 public class JsonRestServer {
     private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class);
 
-    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 100;
+    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 10 * 1000;
 
     private final ScheduledExecutorService shutdownExecutor;
 
@@ -142,9 +142,13 @@ public class JsonRestServer {
                 } catch (Exception e) {
                     log.error("Unable to stop REST server", e);
                 } finally {
-                    jettyServer.destroy();
+                    try {
+                        jettyServer.destroy();
+                    } catch (Exception e) {
+                        log.error("Unable to destroy REST server", e);
+                    }
+                    shutdownExecutor.shutdown();
                 }
-                shutdownExecutor.shutdown();
                 return null;
             });
         }
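The shutdown ordering described in this commit can be sketched in isolation. The example below is a simplified illustration under stated assumptions, not the Trogdor code: `StoppableServer` is a hypothetical stand-in for the Jetty server, and `ShutdownSketch.shutDown` is an invented helper that collects error messages instead of logging them. It shows only the core idea, that wrapping `destroy()` in its own `try`/`catch` inside `finally` lets the executor shutdown run even when both `stop()` and `destroy()` fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for the embedded server; not the Jetty API.
interface StoppableServer {
    void stop() throws Exception;
    void destroy();
}

final class ShutdownSketch {
    // Stops and destroys the server, then always shuts the executor down.
    // Returns the failures encountered, in order, for inspection.
    static List<String> shutDown(StoppableServer server, ExecutorService shutdownExecutor) {
        List<String> errors = new ArrayList<>();
        try {
            server.stop();
        } catch (Exception e) {
            errors.add("stop: " + e.getMessage());
        } finally {
            try {
                server.destroy();
            } catch (Exception e) {
                // Without this catch, a failing destroy() would propagate
                // and skip the executor shutdown below.
                errors.add("destroy: " + e.getMessage());
            }
            shutdownExecutor.shutdown();
        }
        return errors;
    }
}
```

With the pre-change ordering, an exception from `destroy()` would leave the executor running, so anything awaiting its termination would wait for the full await timeout.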
[kafka] branch trunk updated: KAFKA-14792: Race condition in LazyIndex.get() (#13359)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 77215eded7b KAFKA-14792: Race condition in LazyIndex.get() (#13359)
77215eded7b is described below

commit 77215eded7b23aa06f3a4919233df175d5aa3359
Author: Ismael Juma
AuthorDate: Tue Mar 7 15:56:24 2023 -0800

    KAFKA-14792: Race condition in LazyIndex.get() (#13359)

    `LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown in some cases.

    This was introduced when `LazyIndex` was rewritten from Scala to Java.

    I didn't include a test since it's a bit overkill to add a concurrent test for this.

    Reviewers: Jun Rao
---
 .../kafka/storage/internals/log/LazyIndex.java | 23 +-
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
index 1172bb596e7..9d0725e0e33 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
@@ -166,20 +166,25 @@ public class LazyIndex {
 
     @SuppressWarnings("unchecked")
     public T get() throws IOException {
-        if (indexWrapper instanceof IndexValue)
-            return ((IndexValue) indexWrapper).index;
-        else if (indexWrapper instanceof IndexFile) {
+        IndexWrapper wrapper = indexWrapper;
+        if (wrapper instanceof IndexValue)
+            return ((IndexValue) wrapper).index;
+        else {
             lock.lock();
             try {
-                IndexFile indexFile = (IndexFile) indexWrapper;
-                IndexValue indexValue = new IndexValue<>(loadIndex(indexFile.file));
-                indexWrapper = indexValue;
-                return indexValue.index;
+                if (indexWrapper instanceof IndexValue)
+                    return ((IndexValue) indexWrapper).index;
+                else if (indexWrapper instanceof IndexFile) {
+                    IndexFile indexFile = (IndexFile) indexWrapper;
+                    IndexValue indexValue = new IndexValue<>(loadIndex(indexFile.file));
+                    indexWrapper = indexValue;
+                    return indexValue.index;
+                } else
+                    throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass());
             } finally {
                 lock.unlock();
             }
-        } else
-            throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass());
+        }
     }
 
     public void updateParentDir(File parentDir) {
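To make the race concrete: in the removed code, a thread could see `indexWrapper` as an `IndexFile` outside the lock, then block on `lock.lock()` while another thread loaded the index and replaced the field with an `IndexValue`. The unconditional `(IndexFile)` cast inside the lock would then fail. The sketch below shows the corrected pattern in a self-contained form. It is an illustration under assumptions, not the Kafka class: `LazyValue`, `Pending`, and `Loaded` are hypothetical names, a `Supplier` stands in for loading an index from a file, and the field is assumed to be `volatile`.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Sketch of lazy initialization with a re-check under the lock.
// All names here are hypothetical; this is not the Kafka LazyIndex class.
final class LazyValue<T> {
    private interface Wrapper { }

    // State before loading: holds only the recipe for producing the value.
    private static final class Pending<T> implements Wrapper {
        final Supplier<T> loader;
        Pending(Supplier<T> loader) { this.loader = loader; }
    }

    // State after loading: holds the value itself.
    private static final class Loaded<T> implements Wrapper {
        final T value;
        Loaded(T value) { this.value = value; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private volatile Wrapper wrapper;

    LazyValue(Supplier<T> loader) {
        this.wrapper = new Pending<>(loader);
    }

    @SuppressWarnings("unchecked")
    T get() {
        // Read the field once so the type check and the cast see the same object.
        Wrapper current = wrapper;
        if (current instanceof Loaded)
            return ((Loaded<T>) current).value;

        lock.lock();
        try {
            // Re-check under the lock: another thread may have loaded the
            // value while this thread was waiting to acquire it.
            if (wrapper instanceof Loaded)
                return ((Loaded<T>) wrapper).value;
            Pending<T> pending = (Pending<T>) wrapper;
            Loaded<T> loaded = new Loaded<>(pending.loader.get());
            wrapper = loaded;
            return loaded.value;
        } finally {
            lock.unlock();
        }
    }
}
```

The fast path costs one volatile read and no lock, and the loader runs at most once because every state transition happens while the lock is held.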
[kafka] branch trunk updated: MINOR: Update gradle to 8.0.2 and update several gradle plugins (#13339)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e348da4095e MINOR: Update gradle to 8.0.2 and update several gradle plugins (#13339) e348da4095e is described below commit e348da4095e71f9bd6f6e1ce1e7bd52bfe605b5f Author: Dejan Stojadinović AuthorDate: Sun Mar 5 07:25:00 2023 +0100 MINOR: Update gradle to 8.0.2 and update several gradle plugins (#13339) Also removed workaround from `build.gradle` that is no longer required after the update to Gradle 8.0.2. Related links: - zinc release notes: https://github.com/sbt/zinc/releases/tag/v1.8.0 - gradle release notes: https://github.com/gradle/gradle/releases/tag/v8.0.2 - gradle diff: https://github.com/gradle/gradle/compare/v8.0.1...v8.0.2 plugins version upgrade details: - 'com.github.ben-manes.versions' 0.44.0 -> 0.46.0 - 'org.owasp.dependencycheck' 8.0.2 -> 8.1.2 - 'io.swagger.core.v3.swagger-gradle-plugin' 2.2.0 -> 2.2.8 - 'org.gradle.test-retry'1.5.1 -> 1.5.2 - 'com.github.johnrengelman.shadow' 7.1.2 -> 8.1.0 Reviewers: Ismael Juma --- build.gradle | 17 +++-- gradle/dependencies.gradle | 4 ++-- gradle/wrapper/gradle-wrapper.properties | 4 ++-- gradlew | 2 +- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/build.gradle b/build.gradle index 3cb9f807f36..d090124559a 100644 --- a/build.gradle +++ b/build.gradle @@ -31,17 +31,17 @@ buildscript { } plugins { - id 'com.github.ben-manes.versions' version '0.44.0' + id 'com.github.ben-manes.versions' version '0.46.0' id 'idea' id 'java-library' - id 'org.owasp.dependencycheck' version '8.0.2' + id 'org.owasp.dependencycheck' version '8.1.2' id 'org.nosphere.apache.rat' version "0.8.0" - id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" + id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8" id "com.github.spotbugs" version '5.0.13' apply false - id 
'org.gradle.test-retry' version '1.5.1' apply false + id 'org.gradle.test-retry' version '1.5.2' apply false id 'org.scoverage' version '7.0.1' apply false - id 'com.github.johnrengelman.shadow' version '7.1.2' apply false + id 'com.github.johnrengelman.shadow' version '8.1.0' apply false id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 } @@ -681,11 +681,8 @@ subprojects { } // Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 doesn't have that restriction -if (versions.baseScala == "2.13") - scalaCompileOptions.additionalParameters += ["-release:" + minJavaVersion] // Use `:` here to workaround Gradle bug (see https://github.com/gradle/gradle/issues/23962#issuecomment-1437348400) -else if (JavaVersion.current().isJava9Compatible()) - scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)] // Don't use `:` here as it breaks compilation with Scala 2.12 - +if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible()) + scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)] configure(scalaCompileOptions.forkOptions) { memoryMaximumSize = defaultMaxHeapSize diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 14a2ce223c8..3b01f4b0b3e 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -62,7 +62,7 @@ versions += [ checkstyle: "8.36.2", commonsCli: "1.4", dropwizardMetrics: "4.1.12.1", - gradle: "8.0.1", + gradle: "8.0.2", grgit: "4.1.1", httpclient: "4.5.13", easymock: "4.3", @@ -124,7 +124,7 @@ versions += [ spotbugs: "4.7.3", swaggerAnnotations: "2.2.0", swaggerJaxrs2: "2.2.0", - zinc: "1.7.2", + zinc: "1.8.0", zookeeper: "3.6.4", zstd: "1.5.2-1" ] diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 454aca12422..c3b1b519904 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b
[kafka] branch trunk updated: MINOR: srcJar should depend on processMessages task (#13316)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dcc17999515 MINOR: srcJar should depend on processMessages task (#13316) dcc17999515 is described below commit dcc179995153c22c6248702976b60755b0b9fda8 Author: Ismael Juma AuthorDate: Tue Feb 28 07:16:09 2023 -0800 MINOR: srcJar should depend on processMessages task (#13316) This fixes the following `./gradlew install` issue: ```text * What went wrong: A problem was found with the configuration of task ':storage:srcJar' (type 'Jar'). - Gradle detected a problem with the following location: '/Users/ijuma/src/kafka/storage/src/generated/java'. Reason: Task ':storage:srcJar' uses this output of task ':storage:processMessages' without declaring an explicit or implicit dependency. This can lead to incorrect results being produced, depending on what order the tasks are executed. Possible solutions: 1. Declare task ':storage:processMessages' as an input of ':storage:srcJar'. 2. Declare an explicit dependency on ':storage:processMessages' from ':storage:srcJar' using Task#dependsOn. 3. Declare an explicit dependency on ':storage:processMessages' from ':storage:srcJar' using Task#mustRunAfter. Please refer to https://docs.gradle.org/8.0.1/userguide/validation_problems.html#implicit_dependency for more details about this problem. 
``` Reviewers: David Jacot --- build.gradle | 7 +++ 1 file changed, 7 insertions(+) diff --git a/build.gradle b/build.gradle index 396506d1a5b..2ed096ab586 100644 --- a/build.gradle +++ b/build.gradle @@ -982,6 +982,7 @@ project(':core') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' task genProtocolErrorDocs(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath @@ -1213,6 +1214,7 @@ project(':metadata') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' sourceSets { main { @@ -1287,6 +1289,7 @@ project(':group-coordinator') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' } project(':examples') { @@ -1425,6 +1428,7 @@ project(':clients') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' compileTestJava.dependsOn 'processTestMessages' @@ -1527,6 +1531,7 @@ project(':raft') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' jar { dependsOn createVersionFile @@ -1749,6 +1754,7 @@ project(':storage') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' jar { dependsOn createVersionFile @@ -1987,6 +1993,7 @@ project(':streams') { } compileJava.dependsOn 'processMessages' + srcJar.dependsOn 'processMessages' javadoc { include "**/org/apache/kafka/streams/**"
[kafka] branch trunk updated: MINOR: Enable spotless for streams-scala when Java 11+ is used (#13311)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4441a01bb0d MINOR: Enable spotless for streams-scala when Java 11+ is used (#13311) 4441a01bb0d is described below commit 4441a01bb0d7cd2f4c5affd13ab65c79f7e342f4 Author: Ismael Juma AuthorDate: Mon Feb 27 00:28:19 2023 -0800 MINOR: Enable spotless for streams-scala when Java 11+ is used (#13311) Also re-enable it in CI. We do this by adjusting the `Jenkinsfile` to use a more general task (`./gradlew check -x test`). Reviewers: Chia-Ping Tsai , Dejan Stojadinović --- Jenkinsfile | 4 ++-- build.gradle | 24 ++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 9f296060b50..fbd3fce4b5f 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -18,9 +18,9 @@ */ def doValidation() { + // Run all the tasks associated with `check` except for `test` - the latter is executed via `doTest` sh """ -./retry_zinc ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ -checkstyleMain checkstyleTest spotbugsMain rat \ +./retry_zinc ./gradlew -PscalaVersion=$SCALA_VERSION clean check -x test \ --profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode="session" """ } diff --git a/build.gradle b/build.gradle index 7ee6aea850c..ea62fbfc242 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ // limitations under the License. 
import org.ajoberstar.grgit.Grgit +import org.gradle.api.JavaVersion import java.nio.charset.StandardCharsets @@ -30,26 +31,18 @@ buildscript { } plugins { - id 'com.diffplug.spotless' version '6.13.0' // newer versions require Java 11, so we can't use them until Kafka 4.0 id 'com.github.ben-manes.versions' version '0.44.0' id 'idea' id 'java-library' id 'org.owasp.dependencycheck' version '8.0.2' id 'org.nosphere.apache.rat' version "0.8.0" + id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" id "com.github.spotbugs" version '5.0.13' apply false id 'org.gradle.test-retry' version '1.5.1' apply false id 'org.scoverage' version '7.0.1' apply false id 'com.github.johnrengelman.shadow' version '7.1.2' apply false - id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" -} - -spotless { - scala { -target 'streams/**/*.scala' - scalafmt("$versions.scalafmt").configFile('checkstyle/.scalafmt.conf').scalaMajorVersion(versions.baseScala) -licenseHeaderFile 'checkstyle/java.header', 'package' - } + id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0 } ext { @@ -2116,6 +2109,17 @@ project(':streams:streams-scala') { dependsOn 'copyDependantLibs' } + // spotless 6.14 requires Java 11 at runtime + if (JavaVersion.current().isJava11Compatible()) { +apply plugin: 'com.diffplug.spotless' +spotless { + scala { +target '**/*.scala' + scalafmt("$versions.scalafmt").configFile('../../checkstyle/.scalafmt.conf').scalaMajorVersion(versions.baseScala) +licenseHeaderFile '../../checkstyle/java.header', 'package' + } +} + } } project(':streams:test-utils') {
[kafka] branch trunk updated (ca8d0bba914 -> 72dd401e51d)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from ca8d0bba914 MINOR: update Stream docs with regard to `upgrad.from` config for 0.10.1.2 release (#13074)
     add 72dd401e51d KAFKA-14680: Upgrade gradle version from 7.6 to 8.0.1 (#13205)

No new revisions were added by this update.

Summary of changes:
 build.gradle                             | 28
 gradle/dependencies.gradle               | 4 ++--
 gradle/wrapper/gradle-wrapper.properties | 5 +++--
 gradlew                                  | 14 +-
 4 files changed, 30 insertions(+), 21 deletions(-)
[kafka] branch trunk updated (61ece48a862 -> 97efdc65f08)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 61ece48a862 MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
     add 97efdc65f08 KAFKA-14728: Don't run 'spotlessScalaCheck' in CI (#13263)

No new revisions were added by this update.

Summary of changes:
 Jenkinsfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
svn commit: r59929 - /release/kafka/3.4.0/
Author: ijuma Date: Mon Feb 6 16:47:55 2023 New Revision: 59929 Log: Release 3.4.0 Added: release/kafka/3.4.0/ release/kafka/3.4.0/RELEASE_NOTES.html release/kafka/3.4.0/RELEASE_NOTES.html.asc (with props) release/kafka/3.4.0/RELEASE_NOTES.html.md5 release/kafka/3.4.0/RELEASE_NOTES.html.sha1 release/kafka/3.4.0/RELEASE_NOTES.html.sha512 release/kafka/3.4.0/kafka-3.4.0-src.tgz (with props) release/kafka/3.4.0/kafka-3.4.0-src.tgz.asc (with props) release/kafka/3.4.0/kafka-3.4.0-src.tgz.md5 release/kafka/3.4.0/kafka-3.4.0-src.tgz.sha1 release/kafka/3.4.0/kafka-3.4.0-src.tgz.sha512 release/kafka/3.4.0/kafka_2.12-3.4.0-site-docs.tgz (with props) release/kafka/3.4.0/kafka_2.12-3.4.0-site-docs.tgz.asc (with props) release/kafka/3.4.0/kafka_2.12-3.4.0-site-docs.tgz.md5 release/kafka/3.4.0/kafka_2.12-3.4.0-site-docs.tgz.sha1 release/kafka/3.4.0/kafka_2.12-3.4.0-site-docs.tgz.sha512 release/kafka/3.4.0/kafka_2.12-3.4.0.tgz (with props) release/kafka/3.4.0/kafka_2.12-3.4.0.tgz.asc (with props) release/kafka/3.4.0/kafka_2.12-3.4.0.tgz.md5 release/kafka/3.4.0/kafka_2.12-3.4.0.tgz.sha1 release/kafka/3.4.0/kafka_2.12-3.4.0.tgz.sha512 release/kafka/3.4.0/kafka_2.13-3.4.0-site-docs.tgz (with props) release/kafka/3.4.0/kafka_2.13-3.4.0-site-docs.tgz.asc (with props) release/kafka/3.4.0/kafka_2.13-3.4.0-site-docs.tgz.md5 release/kafka/3.4.0/kafka_2.13-3.4.0-site-docs.tgz.sha1 release/kafka/3.4.0/kafka_2.13-3.4.0-site-docs.tgz.sha512 release/kafka/3.4.0/kafka_2.13-3.4.0.tgz (with props) release/kafka/3.4.0/kafka_2.13-3.4.0.tgz.asc (with props) release/kafka/3.4.0/kafka_2.13-3.4.0.tgz.md5 release/kafka/3.4.0/kafka_2.13-3.4.0.tgz.sha1 release/kafka/3.4.0/kafka_2.13-3.4.0.tgz.sha512 Added: release/kafka/3.4.0/RELEASE_NOTES.html == --- release/kafka/3.4.0/RELEASE_NOTES.html (added) +++ release/kafka/3.4.0/RELEASE_NOTES.html Mon Feb 6 16:47:55 2023 @@ -0,0 +1,156 @@ +Release Notes - Kafka - Version 3.4.0 +Below is a summary of the JIRA issues addressed in the 3.4.0 release of Kafka. 
For full documentation of the
+release, a guide to get started, and information about the project, see the Kafka
+project site (https://kafka.apache.org/).
+
+Note about upgrades: Please carefully review the
+upgrade documentation (https://kafka.apache.org/34/documentation.html#upgrade) for this release thoroughly
+before upgrading your cluster. The upgrade notes discuss any critical information about incompatibilities and breaking
+changes, performance changes, and any other changes that might impact your production deployment of Kafka.
+
+The documentation for the most recent release can be found at
+https://kafka.apache.org/documentation.html.
+New Feature
+
+[KAFKA-10360](https://issues.apache.org/jira/browse/KAFKA-10360) - Disabling JmxReporter registration
+[KAFKA-13602](https://issues.apache.org/jira/browse/KAFKA-13602) - Allow to broadcast a result record
+[KAFKA-13715](https://issues.apache.org/jira/browse/KAFKA-13715) - Add "generation" field into consumer protocol
+[KAFKA-14286](https://issues.apache.org/jira/browse/KAFKA-14286) - Time based cluster metadata snapshots
+
+Improvement
+
+[KAFKA-10149](https://issues.apache.org/jira/browse/KAFKA-10149) - Do not prevent automatic preferred election when reassignment in progress
+[KAFKA-12878](https://issues.apache.org/jira/browse/KAFKA-12878) - Support --bootstrap-server kafka-streams-application-reset
+[KAFKA-12960](https://issues.apache.org/jira/browse/KAFKA-12960) - WindowStore and SessionStore do not enforce strict retention time
+[KAFKA-13401](https://issues.apache.org/jira/browse/KAFKA-13401) - Introduce a new Interface to manage Kafka resources in MM2
+[KAFKA-13731](https://issues.apache.org/jira/browse/KAFKA-13731) - Standalone Connect workers should not require connector configs to start
+[KAFKA-13764](https://issues.apache.org/jira/browse/KAFKA-13764) - Potential improvements for Connect incremental rebalancing logic
+[KAFKA-13809](https://issues.apache.org/jira/browse/KAFKA-13809) - FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
+[KAFKA-14017](https://issues.apache.org/jira/browse/KAFKA-14017) - File source connector should implement KIP-618 APIs
+[KAFKA-14095](https://issues.apache.org/jira/browse/KAFKA-14095) - Improve handling of sync offset failures in MirrorMaker
+[KAFKA-14097](https://issues.apache.org/jira/browse/KAFKA-14097) - Separate configuration for producer ID expiry
+[KAFKA-14098](https://issues.apache.org/jira/browse/KAFKA-14098)
svn commit: r59928 - /dev/kafka/3.3.1/
Author: ijuma Date: Mon Feb 6 16:47:31 2023 New Revision: 59928 Log: Delete 3.3.1 Removed: dev/kafka/3.3.1/
[kafka] 01/01: MINOR: Update build and test dependencies for 3.5
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch build-dep-update-3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git commit fa495feddca48192be6933f80a82308163796fa6 Author: Ismael Juma AuthorDate: Sun Feb 5 19:02:46 2023 -0800 MINOR: Update build and test dependencies for 3.5 --- build.gradle | 14 +++--- gradle/dependencies.gradle | 10 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/build.gradle b/build.gradle index ef1872b7d7a..a1f6488edbf 100644 --- a/build.gradle +++ b/build.gradle @@ -30,16 +30,16 @@ buildscript { } plugins { - id 'com.diffplug.spotless' version '6.10.0' - id 'com.github.ben-manes.versions' version '0.42.0' + id 'com.diffplug.spotless' version '6.13.0' + id 'com.github.ben-manes.versions' version '0.44.0' id 'idea' id 'java-library' - id 'org.owasp.dependencycheck' version '7.1.1' - id 'org.nosphere.apache.rat' version "0.7.1" + id 'org.owasp.dependencycheck' version '8.0.2' + id 'org.nosphere.apache.rat' version "0.8.0" - id "com.github.spotbugs" version '5.0.9' apply false - id 'org.gradle.test-retry' version '1.4.0' apply false - id 'org.scoverage' version '7.0.0' apply false + id "com.github.spotbugs" version '5.0.13' apply false + id 'org.gradle.test-retry' version '1.5.1' apply false + id 'org.scoverage' version '7.0.1' apply false id 'com.github.johnrengelman.shadow' version '7.1.2' apply false id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index a1e9752cebf..ce36f093e4d 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -65,7 +65,7 @@ versions += [ gradle: "7.6", grgit: "4.1.1", httpclient: "4.5.13", - easymock: "4.3", + easymock: "5.1.0", jackson: "2.13.4", jacksonDatabind: "2.13.4.2", jacoco: "0.8.7", @@ -73,7 +73,7 @@ versions += [ jetty: "9.4.48.v20220622", jersey: "2.34", jline: "3.21.0", - jmh: "1.35", + jmh: "1.36", hamcrest: "2.2", 
scalaLogging: "3.9.4", jaxAnnotation: "1.3.2", @@ -82,8 +82,8 @@ versions += [ jfreechart: "1.0.0", jopt: "5.0.4", jose4j: "0.7.9", - junit: "5.9.0", - jqwik: "1.6.5", + junit: "5.9.2", + jqwik: "1.7.2", kafka_0100: "0.10.0.1", kafka_0101: "0.10.1.1", kafka_0102: "0.10.2.2", @@ -106,7 +106,7 @@ versions += [ lz4: "1.8.0", mavenArtifact: "3.8.4", metrics: "2.2.0", - mockito: "4.6.1", + mockito: "4.11.0", netty: "4.1.86.Final", powermock: "2.0.9", reflections: "0.9.12",
[kafka] branch build-dep-update-3.5 created (now fa495feddca)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch build-dep-update-3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

      at fa495feddca MINOR: Update build and test dependencies for 3.5

This branch includes the following new commits:

     new fa495feddca MINOR: Update build and test dependencies for 3.5

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch trunk updated: MINOR: Remove some connect tests from Java 17 block list (#13121)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 01aee55fc4f MINOR: Remove some connect tests from Java 17 block list (#13121) 01aee55fc4f is described below commit 01aee55fc4f5a09e1905cafe411df74bf944fd4d Author: Ismael Juma AuthorDate: Sat Jan 21 07:42:50 2023 -0800 MINOR: Remove some connect tests from Java 17 block list (#13121) Most were converted not to use PowerMock, but some no longer exist. Reviewers: Chris Egerton , Christo Lolov --- build.gradle | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index e535d291269..8441ddeaf05 100644 --- a/build.gradle +++ b/build.gradle @@ -410,14 +410,11 @@ subprojects { if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) { testsToExclude.addAll([ // connect tests - "**/ConnectorPluginsResourceTest.*", - "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*", + "**/DistributedHerderTest.*", "**/KafkaConfigBackingStoreTest.*", - "**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*", - "**/SourceTaskOffsetCommitterTest.*", + "**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*", - "**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", - "**/WorkerTaskTest.*" + "**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*" ]) }
[kafka] branch trunk updated: MINOR: Include the inner exception stack trace when re-throwing an exception (#12229)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b2bc72dc794 MINOR: Include the inner exception stack trace when re-throwing an exception (#12229) b2bc72dc794 is described below commit b2bc72dc794a28f0cdaf63d806bafd8871b9cddb Author: Divij Vaidya AuthorDate: Mon Jan 16 00:03:23 2023 +0100 MINOR: Include the inner exception stack trace when re-throwing an exception (#12229) While wrapping the caught exception into a custom one, information about the caught exception is being lost, including information about the stack trace of the exception. When re-throwing an exception, we either include the original exception or the relevant information is added to the exception message. Reviewers: Ismael Juma , Luke Chen , dengziming , Matthew de Detrich --- .../java/org/apache/kafka/common/protocol/types/CompactArrayOf.java | 2 +- .../main/java/org/apache/kafka/common/protocol/types/TaggedFields.java | 2 +- .../org/apache/kafka/common/security/scram/internals/ScramMessages.java | 2 +- .../org/apache/kafka/streams/processor/internals/StateManagerUtil.java | 2 +- .../java/org/apache/kafka/streams/state/internals/AbstractSegments.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java index 4e9f8f8a981..6a252a8a4cc 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java @@ -119,7 +119,7 @@ public class CompactArrayOf extends DocumentedType { type.validate(obj); return array; } catch (ClassCastException e) { -throw new SchemaException("Not an Object[]."); +throw new SchemaException("Not an 
Object[]. Found class " + item.getClass().getSimpleName()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java index 129f80c90ba..26fb65830e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java @@ -165,7 +165,7 @@ public class TaggedFields extends DocumentedType { } return objects; } catch (ClassCastException e) { -throw new SchemaException("Not a NavigableMap."); +throw new SchemaException("Not a NavigableMap. Found class " + item.getClass().getSimpleName()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMessages.java index 05512962906..8532d73fca4 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMessages.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMessages.java @@ -154,7 +154,7 @@ public class ScramMessages { if (this.iterations <= 0) throw new SaslException("Invalid SCRAM server first message format: invalid iterations " + iterations); } catch (NumberFormatException e) { -throw new SaslException("Invalid SCRAM server first message format: invalid iterations"); +throw new SaslException("Invalid SCRAM server first message format: invalid iterations", e); } this.nonce = matcher.group("nonce"); String salt = matcher.group("salt"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 328f0e5fb65..16cf1ef34ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -176,7 +176,7 @@ final class StateManagerUtil { return new TaskId(topicGroupId, partition, namedTopology); } catch (final Exception e) { -throw new TaskIdFormatException(taskIdStr); +throw new TaskIdFormatException(taskIdStr, e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java inde
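For readers skimming the diff above: the change passes the caught exception along as the cause when re-throwing, so the original stack trace is not lost. Below is a minimal standalone sketch of that pattern. The class, method, and exception names are hypothetical and are not taken from the Kafka code base.

```java
// Hypothetical names for illustration only; not part of Kafka.
final class CausePreservingRethrow {

    // Custom exception that keeps the original exception as its cause.
    static final class ParseFailure extends RuntimeException {
        ParseFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Parses an iteration count; on failure, wraps the NumberFormatException
    // instead of discarding it, so the inner stack trace stays available.
    static int parseIterations(String raw) {
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw new ParseFailure("invalid iterations: " + raw, e);
        }
    }

    public static void main(String[] args) {
        try {
            parseIterations("abc");
        } catch (ParseFailure e) {
            // The cause is still reachable for logging and debugging.
            System.out.println(e.getMessage() + " / cause: " + e.getCause());
        }
    }
}
```

Where the custom exception type has no constructor that accepts a cause, the commit appends the relevant detail to the message instead, as in the `SchemaException` hunks.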
[kafka] branch trunk updated: MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (#13109)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8d2e157b379 MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (#13109) 8d2e157b379 is described below commit 8d2e157b37902055bd5a8f4bd0f6ac29080910eb Author: drgnchan <40224023+drgnc...@users.noreply.github.com> AuthorDate: Sun Jan 15 15:06:18 2023 +0800 MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (#13109) Reviewers: Ismael Juma --- docs/implementation.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/implementation.html b/docs/implementation.html index 11cf365750b..878868544f9 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -18,7 +18,7 @@
[kafka] branch trunk updated: KAFKA-14568: Move FetchDataInfo and related to storage module (#13085)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 111f02cc74c KAFKA-14568: Move FetchDataInfo and related to storage module (#13085) 111f02cc74c is described below commit 111f02cc74cd3a742f9b6b6cd7f183ec2a494354 Author: Federico Valeri AuthorDate: Fri Jan 13 06:32:23 2023 +0100 KAFKA-14568: Move FetchDataInfo and related to storage module (#13085) Part of KAFKA-14470: Move log layer to storage module. Reviewers: Ismael Juma Co-authored-by: Ismael Juma --- .../apache/kafka/common/requests/FetchRequest.java | 28 +- core/src/main/scala/kafka/api/Request.scala| 41 core/src/main/scala/kafka/cluster/Partition.scala | 6 +- .../coordinator/group/GroupMetadataManager.scala | 6 +- .../transaction/TransactionStateManager.scala | 6 +- core/src/main/scala/kafka/log/LocalLog.scala | 26 ++--- core/src/main/scala/kafka/log/LogSegment.scala | 9 +- core/src/main/scala/kafka/log/UnifiedLog.scala | 12 +-- .../main/scala/kafka/raft/KafkaMetadataLog.scala | 8 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 9 +- .../main/scala/kafka/server/FetchDataInfo.scala| 95 -- core/src/main/scala/kafka/server/KafkaApis.scala | 32 +++ .../scala/kafka/server/LocalLeaderEndPoint.scala | 22 ++--- .../main/scala/kafka/server/ReplicaManager.scala | 42 .../kafka/tools/ReplicaVerificationTool.scala | 17 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../kafka/server/DelayedFetchTest.scala| 21 ++-- .../src/test/scala/other/kafka/StressTestLog.scala | 6 +- .../unit/kafka/cluster/PartitionLockTest.scala | 18 ++-- .../scala/unit/kafka/cluster/PartitionTest.scala | 38 .../group/GroupMetadataManagerTest.scala | 22 ++--- .../TransactionCoordinatorConcurrencyTest.scala| 8 +- .../transaction/TransactionStateManagerTest.scala | 18 ++-- .../unit/kafka/log/BrokerCompressionTest.scala | 6 +- 
.../test/scala/unit/kafka/log/LocalLogTest.scala | 4 +- .../scala/unit/kafka/log/LogConcurrencyTest.scala | 6 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 4 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 6 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +- .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 33 --- .../scala/unit/kafka/server/KafkaApisTest.scala| 12 +-- .../server/ReplicaAlterLogDirsThreadTest.scala | 83 .../server/ReplicaManagerConcurrencyTest.scala | 18 ++-- .../kafka/server/ReplicaManagerQuotasTest.scala| 38 .../unit/kafka/server/ReplicaManagerTest.scala | 64 ++--- .../unit/kafka/tools/DumpLogSegmentsTest.scala | 6 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 4 +- .../kafka/server/log/internals/FetchDataInfo.java | 50 ++ .../kafka/server/log/internals/FetchIsolation.java | 40 .../kafka/server/log/internals/FetchParams.java| 106 + .../server/log/internals/FetchPartitionData.java | 58 +++ .../integration/utils/IntegrationTestUtils.java| 4 +- 42 files changed, 583 insertions(+), 457 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 09242bfc4bf..2510f1e607c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -40,13 +40,16 @@ import java.util.Optional; import java.util.stream.Collectors; public class FetchRequest extends AbstractRequest { - public static final int CONSUMER_REPLICA_ID = -1; // default values for older versions where a request level limit did not exist public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE; public static final long INVALID_LOG_START_OFFSET = -1L; +public static final int ORDINARY_CONSUMER_ID = -1; +public static final int DEBUGGING_CONSUMER_ID = -2; +public static final int FUTURE_LOCAL_REPLICA_ID = -3; + private final 
FetchRequestData data; private volatile LinkedHashMap fetchData = null; private volatile List toForget = null; @@ -429,6 +432,29 @@ public class FetchRequest extends AbstractRequest { return new FetchRequest(new FetchRequestData(new ByteBufferAccessor(buffer), version), version); } +// Broker ids are non-negative int. +public static boolean isValidBrokerId(int brokerId) { +return brokerId >= 0; +} + +public static boolean isConsumer(int replicaId) { +return replicaId < 0 &&
[kafka] branch trunk updated: KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (#13092)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8ac644d2b19 KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (#13092) 8ac644d2b19 is described below commit 8ac644d2b198a386f67b213db35bb9cbd8eda073 Author: Ismael Juma AuthorDate: Tue Jan 10 23:51:58 2023 -0800 KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (#13092) There were some concurrency inconsistencies in `KafkaScheduler` flagged by spotBugs that had to be fixed, summary of changes below: * Executor is `volatile` * We always synchronize and check `isStarted` as the first thing within the critical section when a mutating operation is performed. * We don't synchronize (but ensure the executor is not null in a safe way) in read-only operations that operate on the executor. With regards to `MockScheduler/MockTask`: * Set the type of `nextExecution` to `AtomicLong` and replaced inconsistent synchronization * Extracted logic into `MockTask.rescheduleIfPeriodic` Tweaked the `Scheduler` interface a bit: * Removed `unit` parameter since we always used `ms` except one invocation * Introduced a couple of `scheduleOnce` overloads to replace the usage of default arguments in Scala * Pulled up `resizeThreadPool` to the interface and removed `isStarted` from the interface. Other cleanups: * Removed spotBugs exclusion affecting `kafka.log.LogConfig`, which no longer exists. 
For broader context, see: * KAFKA-14470: Move log layer to storage module Reviewers: Jun Rao --- build.gradle | 2 + .../kafka/server/builders/LogManagerBuilder.java | 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- .../scala/kafka/controller/KafkaController.scala | 19 +- .../coordinator/group/GroupMetadataManager.scala | 22 +-- .../transaction/TransactionCoordinator.scala | 3 +- .../transaction/TransactionStateManager.scala | 13 +- core/src/main/scala/kafka/log/LocalLog.scala | 5 +- core/src/main/scala/kafka/log/LogLoader.scala | 3 +- core/src/main/scala/kafka/log/LogManager.scala | 51 +++--- core/src/main/scala/kafka/log/UnifiedLog.scala | 15 +- .../main/scala/kafka/raft/KafkaMetadataLog.scala | 9 +- core/src/main/scala/kafka/raft/RaftManager.scala | 5 +- .../scala/kafka/server/AlterPartitionManager.scala | 9 +- .../src/main/scala/kafka/server/BrokerServer.scala | 5 +- core/src/main/scala/kafka/server/KafkaBroker.scala | 4 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +- .../main/scala/kafka/server/ReplicaManager.scala | 7 +- .../kafka/server/ZkAlterPartitionManager.scala | 9 +- .../main/scala/kafka/utils/KafkaScheduler.scala| 165 -- .../scala/kafka/zookeeper/ZooKeeperClient.scala| 10 +- .../kafka/server/LocalLeaderEndPointTest.scala | 3 +- .../scala/other/kafka/TestLinearWriteSpeed.scala | 1 + .../scala/unit/kafka/cluster/PartitionTest.scala | 3 +- .../AbstractCoordinatorConcurrencyTest.scala | 1 + .../coordinator/group/GroupCoordinatorTest.scala | 1 + .../group/GroupMetadataManagerTest.scala | 3 +- .../transaction/TransactionCoordinatorTest.scala | 2 +- .../transaction/TransactionStateManagerTest.scala | 3 +- .../test/scala/unit/kafka/log/LocalLogTest.scala | 3 +- .../scala/unit/kafka/log/LogConcurrencyTest.scala | 3 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 3 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 3 +- .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 1 + .../kafka/server/AlterPartitionManagerTest.scala | 3 
+- .../kafka/server/DynamicBrokerConfigTest.scala | 5 +- .../server/HighwatermarkPersistenceTest.scala | 3 +- .../kafka/server/ReplicaManagerQuotasTest.scala| 1 + .../unit/kafka/server/ReplicaManagerTest.scala | 3 +- .../scala/unit/kafka/utils/MockScheduler.scala | 149 .../src/test/scala/unit/kafka/utils/MockTime.scala | 1 + .../scala/unit/kafka/utils/SchedulerTest.scala | 25 +-- .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 3 +- gradle/spotbugs-exclude.xml| 14 +- .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 6 +- .../partition/PartitionMakeFollowerBenchmark.java | 6 +- .../UpdateFollowerFetchStateBenchmark.java | 6 +- .../apache/kafka/jmh/server/CheckpointBench.java | 6 +- .../kafka/jmh/server/PartitionCreationBench.java | 6 +- .../apache/kafka/server/util/KafkaScheduler.java | 189 .../org/apache/kafka/server/util/Scheduler.java| 59 +++ .../apache/kafka/server
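The concurrency rules listed in the commit message (volatile executor, synchronized mutating operations that check the started state first, unsynchronized but null-safe reads) can be sketched roughly as follows. This is an illustration under assumptions, not the actual `KafkaScheduler` source; the class `SchedulerSketch` and its method `threadCount` are hypothetical.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical sketch; not the real org.apache.kafka.server.util.KafkaScheduler.
final class SchedulerSketch {

    // volatile so that unsynchronized readers observe the latest reference.
    private volatile ScheduledThreadPoolExecutor executor; // null while stopped

    // Mutating operation: synchronize, then check the started state first.
    void startup() {
        synchronized (this) {
            if (isStarted())
                throw new IllegalStateException("This scheduler has already been started.");
            executor = new ScheduledThreadPoolExecutor(1);
        }
    }

    // Mutating operation: swap the field inside the lock, stop the pool outside it.
    void shutdown() {
        ScheduledThreadPoolExecutor toStop;
        synchronized (this) {
            if (!isStarted())
                return;
            toStop = executor;
            executor = null;
        }
        toStop.shutdown();
    }

    boolean isStarted() {
        return executor != null;
    }

    // Read-only operation: no lock, but read the volatile field exactly once
    // into a local so the null check and the use refer to the same object.
    int threadCount() {
        ScheduledThreadPoolExecutor current = executor;
        return current == null ? 0 : current.getCorePoolSize();
    }
}
```

Copying the volatile field into a local before the null check is what makes the read-only path safe without a lock: a concurrent `shutdown()` can no longer null the reference between the check and the call.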
[kafka] branch trunk updated: KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2dec39d6e49 KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043) 2dec39d6e49 is described below commit 2dec39d6e49da4cfb502da3e84d4f9c50508e809 Author: Satish Duggana AuthorDate: Sun Jan 8 09:43:38 2023 +0530 KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043) For broader context on this change, see: * KAFKA-14470: Move log layer to storage module. Reviewers: Ismael Juma --- core/src/main/scala/kafka/log/LogCleaner.scala | 9 +- core/src/main/scala/kafka/log/LogSegment.scala | 3 +- .../scala/kafka/log/ProducerStateManager.scala | 346 + core/src/main/scala/kafka/log/UnifiedLog.scala | 19 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 2 +- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 8 +- .../unit/kafka/log/ProducerStateManagerTest.scala | 77 +++-- .../kafka/server/log/internals/BatchMetadata.java | 79 + .../kafka/server/log/internals/LastRecord.java | 59 .../server/log/internals/ProducerAppendInfo.java | 239 ++ .../server/log/internals/ProducerStateEntry.java | 150 + .../kafka/server/log/internals/TxnMetadata.java| 51 +++ 12 files changed, 665 insertions(+), 377 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 83b1b0e81b6..5a098790a31 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import 
org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer @@ -680,9 +680,10 @@ private[log] class Cleaner(val id: Int, // 3) The last entry in the log is a transaction marker. We retain this marker since it has the //last producer epoch, which is needed to ensure fencing. lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord => -lastRecord.lastDataOffset match { - case Some(offset) => batch.lastOffset == offset - case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch +if (lastRecord.lastDataOffset.isPresent) { + batch.lastOffset == lastRecord.lastDataOffset.getAsLong +} else { + batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch } } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index d289df2ec47..53b51cb16ac 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult} import java.util.Optional +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ import scala.math._ @@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords, if (batch.hasProducerId) { val producerId = batch.producerId val appendInfo = producerStateManager.prepareUpdate(producerId, origin = 
AppendOrigin.REPLICATION) - val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None) + val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index da9f17c2c22..2dc7748152d 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/lo
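The `LogCleaner` hunk above replaces a Scala `Option` pattern match with `isPresent` / `getAsLong` calls, which suggests that `lastDataOffset` is now a `java.util.OptionalLong`. A standalone Java sketch of the same decision logic follows; the class name `LastRecordSketch` and the flattened parameter list are assumptions made for illustration and do not mirror the real `LastRecord` class.

```java
import java.util.OptionalLong;

// Hypothetical helper; parameter names follow the diff, the class does not exist in Kafka.
final class LastRecordSketch {

    // Decides whether a batch holds the last record of an active producer:
    // if a last data offset is known, compare offsets; otherwise the batch must be
    // a control batch (transaction marker) carrying the producer's last epoch.
    static boolean isLastRecordOfProducer(OptionalLong lastDataOffset,
                                          short lastProducerEpoch,
                                          long batchLastOffset,
                                          boolean batchIsControlBatch,
                                          short batchProducerEpoch) {
        if (lastDataOffset.isPresent()) {
            return batchLastOffset == lastDataOffset.getAsLong();
        } else {
            return batchIsControlBatch && batchProducerEpoch == lastProducerEpoch;
        }
    }
}
```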
[kafka] branch 3.3 updated: KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 27847e0c78a KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) 27847e0c78a is described below commit 27847e0c78a43f51cdf37d12253e683738d630a2 Author: Edoardo Comar AuthorDate: Wed Jan 4 22:24:16 2023 + KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were missing their rack info. Problem: when ZkMetadataCache updates the metadataSnapshot, includes the rack in aliveBrokers but not in aliveNodes Trivial fix with matching assertion in existing unit test. Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)` is passed to `ClientQuotaCallback.updateClusterMetadata(...)` so it is used, though not by clients, but by service providers. 
Reviewers: Ismael Juma --- core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +- core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d69785f90f6..235c15db674 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -365,7 +365,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea broker.endpoints.forEach { ep => val listenerName = new ListenerName(ep.listener) endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index d92c76f7118..4d4df4243a6 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -596,7 +596,7 @@ class MetadataCacheTest { val brokers = Seq( new UpdateMetadataBroker() .setId(0) -.setRack("") +.setRack("r") .setEndpoints(Seq(new UpdateMetadataEndpoint() .setHost("foo") .setPort(9092) @@ -627,7 +627,7 @@ class MetadataCacheTest { brokers.asJava, Collections.emptyMap()).build() MetadataCacheTest.updateCache(cache, updateMetadataRequest) -val expectedNode0 = new Node(0, "foo", 9092) +val expectedNode0 = new Node(0, "foo", 9092, "r") val expectedNode1 = new Node(1, "", -1) val cluster = cache.getClusterMetadata("clusterId", listenerName)
[kafka] branch 3.4 updated: KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new b857fdec873 KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) b857fdec873 is described below commit b857fdec87392127cb177bb95fd224836f28701a Author: Edoardo Comar AuthorDate: Wed Jan 4 22:24:16 2023 + KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were missing their rack info. Problem: when ZkMetadataCache updates the metadataSnapshot, includes the rack in aliveBrokers but not in aliveNodes Trivial fix with matching assertion in existing unit test. Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)` is passed to `ClientQuotaCallback.updateClusterMetadata(...)` so it is used, though not by clients, but by service providers. 
Reviewers: Ismael Juma --- core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +- core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d774cd41a5c..feaaf1c43f1 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -398,7 +398,7 @@ class ZkMetadataCache( broker.endpoints.forEach { ep => val listenerName = new ListenerName(ep.listener) endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 7dadd5bf759..d2df68f6da9 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -595,7 +595,7 @@ class MetadataCacheTest { val brokers = Seq( new UpdateMetadataBroker() .setId(0) -.setRack("") +.setRack("r") .setEndpoints(Seq(new UpdateMetadataEndpoint() .setHost("foo") .setPort(9092) @@ -626,7 +626,7 @@ class MetadataCacheTest { brokers.asJava, Collections.emptyMap()).build() MetadataCacheTest.updateCache(cache, updateMetadataRequest) -val expectedNode0 = new Node(0, "foo", 9092) +val expectedNode0 = new Node(0, "foo", 9092, "r") val expectedNode1 = new Node(1, "", -1) val cluster = cache.getClusterMetadata("clusterId", listenerName)
[kafka] branch trunk updated: KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ef3b581ff0c KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ef3b581ff0c is described below commit ef3b581ff0c4eba1fbf1e1d60507365ef141f0c3 Author: Edoardo Comar AuthorDate: Wed Jan 4 22:24:16 2023 + KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were missing their rack info. Problem: when ZkMetadataCache updates the metadataSnapshot, includes the rack in aliveBrokers but not in aliveNodes Trivial fix with matching assertion in existing unit test. Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)` is passed to `ClientQuotaCallback.updateClusterMetadata(...)` so it is used, though not by clients, but by service providers. 
Reviewers: Ismael Juma --- core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +- core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d774cd41a5c..feaaf1c43f1 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -398,7 +398,7 @@ class ZkMetadataCache( broker.endpoints.forEach { ep => val listenerName = new ListenerName(ep.listener) endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 7dadd5bf759..d2df68f6da9 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -595,7 +595,7 @@ class MetadataCacheTest { val brokers = Seq( new UpdateMetadataBroker() .setId(0) -.setRack("") +.setRack("r") .setEndpoints(Seq(new UpdateMetadataEndpoint() .setHost("foo") .setPort(9092) @@ -626,7 +626,7 @@ class MetadataCacheTest { brokers.asJava, Collections.emptyMap()).build() MetadataCacheTest.updateCache(cache, updateMetadataRequest) -val expectedNode0 = new Node(0, "foo", 9092) +val expectedNode0 = new Node(0, "foo", 9092, "r") val expectedNode1 = new Node(1, "", -1) val cluster = cache.getClusterMetadata("clusterId", listenerName)
[kafka] branch trunk updated: KAFKA-14550: Move SnapshotFile and CorruptSnapshotException to storage module (#13039)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 026105d05f6 KAFKA-14550: Move SnapshotFile and CorruptSnapshotException to storage module (#13039) 026105d05f6 is described below commit 026105d05f66d15544db6468f800fba6a6d4e171 Author: Satish Duggana AuthorDate: Mon Jan 2 21:01:40 2023 +0530 KAFKA-14550: Move SnapshotFile and CorruptSnapshotException to storage module (#13039) For broader context on this change, see: * KAFKA-14470: Move log layer to storage module Reviewers: Ismael Juma --- .../java/org/apache/kafka/common/utils/Utils.java | 8 +++ .../org/apache/kafka/common/utils/UtilsTest.java | 9 +++ core/src/main/scala/kafka/log/LocalLog.scala | 5 +- core/src/main/scala/kafka/log/LogLoader.scala | 8 +-- core/src/main/scala/kafka/log/LogSegment.scala | 10 +-- .../scala/kafka/log/ProducerStateManager.scala | 60 -- core/src/main/scala/kafka/log/UnifiedLog.scala | 2 - .../scala/kafka/log/remote/RemoteIndexCache.scala | 4 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 2 +- core/src/main/scala/kafka/utils/CoreUtils.scala| 9 --- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 12 ++-- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 12 ++-- .../scala/unit/kafka/utils/CoreUtilsTest.scala | 8 --- .../log/internals/CorruptSnapshotException.java| 26 .../kafka/server/log/internals/LogFileUtils.java | 35 +++ .../kafka/server/log/internals/SnapshotFile.java | 71 ++ 16 files changed, 186 insertions(+), 95 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index a9c510bac3f..9249d7f96aa 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1488,4 +1488,12 @@ public final class Utils 
{ return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).format(dateTimeFormatter); } +/** + * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception. + */ +public static String replaceSuffix(String str, String oldSuffix, String newSuffix) { +if (!str.endsWith(oldSuffix)) +throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str); +return str.substring(0, str.length() - oldSuffix.length()) + newSuffix; +} } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index d10fd37a71b..50e6f179f61 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -926,4 +926,13 @@ public class UtilsTest { assertEquals(String.format("2020-11-09 12:34:05,123 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithMilliSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())); assertEquals(String.format("2020-11-09 12:34:05,000 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())); } + +@Test +public void testReplaceSuffix() { +assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text")); +assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", "")); +assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", "")); +assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt")); +} + } diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index b62fead3b91..28c809309d3 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx 
import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition} import scala.jdk.CollectionConverters._ @@ -714,10 +715,6 @@ object LocalLog extends Logging { private[log]
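A minimal standalone sketch of the suffix replacement that this commit adds as `Utils.replaceSuffix`. The method body follows the diff above; the wrapper class `SuffixSketch` and its `main` method are hypothetical and exist only so the snippet compiles on its own. The inputs are the ones used in `testReplaceSuffix`.

```java
// Illustrative sketch only: SuffixSketch is a hypothetical wrapper class, not part of Kafka.
final class SuffixSketch {

    // Same logic as the replaceSuffix method shown in the diff:
    // fail if str does not end with oldSuffix, otherwise swap the suffix.
    static String replaceSuffix(String str, String oldSuffix, String newSuffix) {
        if (!str.endsWith(oldSuffix))
            throw new IllegalArgumentException(
                "Expected string to end with " + oldSuffix + " but string is " + str);
        return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
    }

    public static void main(String[] args) {
        // Only the trailing occurrence of the suffix is replaced.
        System.out.println(replaceSuffix("blah.foo.txt", ".txt", ".text")); // blah.foo.text
        // An empty old suffix matches every string, so this appends.
        System.out.println(replaceSuffix("foo", "", ".txt"));               // foo.txt
        try {
            replaceSuffix("foo.log", ".txt", ".text");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing on a mismatched suffix, rather than returning the input unchanged, makes a wrongly named file surface at the call site instead of silently producing an unexpected path.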
[kafka] branch trunk updated: MINOR: Use `LogConfig.validate` instead of `validateValues` in `KafkaMetadataLog` (#13051)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new da56c1cf299 MINOR: Use `LogConfig.validate` instead of `validateValues` in `KafkaMetadataLog` (#13051) da56c1cf299 is described below commit da56c1cf2992a868be02b94120ce22a3978aecd0 Author: Ismael Juma AuthorDate: Wed Dec 28 10:34:08 2022 -0800 MINOR: Use `LogConfig.validate` instead of `validateValues` in `KafkaMetadataLog` (#13051) `LogConfig.validateValues` may fail or incorrectly succeed if the properties don't include defaults. During the conversion of `LogConfig` to Java (#13049), it became clear that the `asInstanceOf[Long]` calls in `LogConfig.validateValues` were converting `null` to `0` when this method was invoked from `KafkaMetadataLog`. This means that it would be possible for it to validate successfully in cases where it should not. Reviewers: José Armando García Sancio --- core/src/main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index a892d9c235b..28953ab1846 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -565,7 +565,7 @@ object KafkaMetadataLog extends Logging { // Disable time and byte retention when deleting segments props.setProperty(LogConfig.RetentionMsProp, "-1") props.setProperty(LogConfig.RetentionBytesProp, "-1") -LogConfig.validateValues(props) +LogConfig.validate(props) val defaultLogConfig = LogConfig(props) if (config.logSegmentBytes < config.logSegmentMinBytes) {
[kafka] branch trunk updated: KAFKA-14554: Move ClassLoaderAwareRemoteStorageManagerTest to storage module (#13048)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9f026dee6b4 KAFKA-14554: Move ClassLoaderAwareRemoteStorageManagerTest to storage module (#13048) 9f026dee6b4 is described below commit 9f026dee6b4e1475f9be0e617bb09d8ea84ad353 Author: Federico Valeri AuthorDate: Wed Dec 28 15:42:26 2022 +0100 KAFKA-14554: Move ClassLoaderAwareRemoteStorageManagerTest to storage module (#13048) Reviewers: Ismael Juma --- .../ClassLoaderAwareRemoteStorageManagerTest.scala | 45 - .../ClassLoaderAwareRemoteStorageManagerTest.java | 46 ++ 2 files changed, 46 insertions(+), 45 deletions(-) diff --git a/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala b/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala deleted file mode 100644 index 54d0aee9447..000 --- a/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.log.remote - -import org.apache.kafka.server.log.remote.storage.{ClassLoaderAwareRemoteStorageManager, RemoteStorageManager} -import org.junit.jupiter.api.Test -import org.mockito.Mockito.mock -import org.mockito.Mockito.when -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotEquals -import org.mockito.ArgumentMatchers.any - -import java.util.Collections - -class ClassLoaderAwareRemoteStorageManagerTest { - - @Test - def testWithClassLoader(): Unit = { -val dummyClassLoader = new DummyClassLoader() -val delegate = mock(classOf[RemoteStorageManager]) -val rsm = new ClassLoaderAwareRemoteStorageManager(delegate, dummyClassLoader) -when(delegate.configure(any())).thenAnswer(_ => - assertEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader)) - -assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader) -rsm.configure(Collections.emptyMap()) -assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader) - } - - private class DummyClassLoader extends ClassLoader -} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManagerTest.java new file mode 100644 index 000..f53a511b160 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManagerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class ClassLoaderAwareRemoteStorageManagerTest { +@Test +public void testWithClassLoader() { +final DummyClassLoa
[kafka] branch trunk updated: KAFKA-14543: Move LogOffsetMetadata to storage module (#13038)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8184ada6a5c KAFKA-14543: Move LogOffsetMetadata to storage module (#13038) 8184ada6a5c is described below commit 8184ada6a5c98cd637e8d323c56dbe6b20d582bd Author: Mickael Maison AuthorDate: Wed Dec 28 02:12:02 2022 +0100 KAFKA-14543: Move LogOffsetMetadata to storage module (#13038) Reviewers: Ismael Juma , dengziming , Satish Duggana , Federico Valeri --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- core/src/main/scala/kafka/cluster/Replica.scala| 6 +- core/src/main/scala/kafka/log/LocalLog.scala | 6 +- core/src/main/scala/kafka/log/LogLoader.scala | 5 +- core/src/main/scala/kafka/log/LogSegment.scala | 6 +- .../scala/kafka/log/ProducerStateManager.scala | 8 +- core/src/main/scala/kafka/log/UnifiedLog.scala | 34 +++ .../main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 3 +- .../main/scala/kafka/server/FetchDataInfo.scala| 3 +- .../scala/kafka/server/LogOffsetMetadata.scala | 84 - .../main/scala/kafka/server/ReplicaManager.scala | 10 +- .../kafka/server/DelayedFetchTest.scala| 11 ++- .../scala/unit/kafka/cluster/ReplicaTest.scala | 4 +- .../group/GroupMetadataManagerTest.scala | 12 +-- .../TransactionCoordinatorConcurrencyTest.scala| 5 +- .../transaction/TransactionStateManagerTest.scala | 12 +-- .../test/scala/unit/kafka/log/LocalLogTest.scala | 4 +- .../unit/kafka/log/ProducerStateManagerTest.scala | 31 +++ .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 14 +-- .../kafka/server/AbstractFetcherThreadTest.scala | 3 +- .../unit/kafka/server/IsrExpirationTest.scala | 14 +-- .../kafka/server/ReplicaManagerQuotasTest.scala| 24 ++--- .../unit/kafka/server/ReplicaManagerTest.scala | 4 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- 
.../server/log/internals/LogOffsetMetadata.java| 103 + 26 files changed, 217 insertions(+), 195 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c4df79219a4..fe8792189ba 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 0321488af4d..7a60632a54b 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,9 +18,9 @@ package kafka.cluster import kafka.log.UnifiedLog -import kafka.server.LogOffsetMetadata import kafka.utils.Logging import org.apache.kafka.common.TopicPartition +import org.apache.kafka.server.log.internals.LogOffsetMetadata import java.util.concurrent.atomic.AtomicReference @@ -69,7 +69,7 @@ case class ReplicaState( object ReplicaState { val Empty: ReplicaState = ReplicaState( -logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata, +logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, logStartOffset = UnifiedLog.UnknownOffset, lastFetchLeaderLogEndOffset = 0L, lastFetchTimeMs = 0L, @@ -139,7 +139,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log if (isNewLeader) { ReplicaState( logStartOffset = UnifiedLog.UnknownOffset, - logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata, + logEndOffsetMetadata = 
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset, lastFetchTimeMs = 0L, lastCaughtUpTimeMs = lastCaughtUpTimeMs diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 10d5217069b..b62fead3b91 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -23,14 +23,14 @@ import java.text.NumberFormat import java.util.concurrent.atomic.AtomicLong import java.util.regex.Pattern import kafka.metrics.KafkaMetricsGroup -import kafka.server.{FetchDataInfo, LogOffsetMetadata} +import kafka.server.FetchDataInfo import
[kafka] branch trunk updated: MINOR: Remove unused config variables (#13047)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f67627efc8e MINOR: Remove unused config variables (#13047) f67627efc8e is described below commit f67627efc8ecba6f14edf1bfa788352e71f70f6c Author: Ismael Juma AuthorDate: Tue Dec 27 14:12:30 2022 -0800 MINOR: Remove unused config variables (#13047) ` MaxIdMapSnapshots` and `ControllerMessageQueueSize` never had any use. `TransactionalIdExpirationMsDefault` is a duplicate of `TransactionalIdExpirationMs` (the latter is used). Also move `DeleteTopicEnable` and `CompressionType` to the right section. Reviewers: Ron Dagostino --- core/src/main/scala/kafka/log/LogConfig.scala | 1 - core/src/main/scala/kafka/server/KafkaConfig.scala | 13 ++--- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 66743b212e1..a4d489bda45 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -71,7 +71,6 @@ object Defaults { val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs val LeaderReplicationThrottledReplicas = Collections.emptyList[String]() val FollowerReplicationThrottledReplicas = Collections.emptyList[String]() - val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots val MessageDownConversionEnable = kafka.server.Defaults.MessageDownConversionEnable } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d0da5c6eb6d..13839bb88ba 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -84,6 +84,7 @@ object Defaults { val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1); val 
MetadataMaxIdleIntervalMs = 500 val MetadataMaxRetentionBytes = 100 * 1024 * 1024 + val DeleteTopicEnable = true /** KRaft mode configs */ val EmptyNodeId: Int = -1 @@ -154,10 +155,10 @@ object Defaults { val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 val MessageDownConversionEnable = true + val CompressionType = BrokerCompressionType.PRODUCER.name /** * Replication configuration ***/ val ControllerSocketTimeoutMs = RequestTimeoutMs - val ControllerMessageQueueSize = Int.MaxValue val DefaultReplicationFactor = 1 val ReplicaLagTimeMaxMs = 3L val ReplicaSocketTimeoutMs = 30 * 1000 @@ -230,14 +231,6 @@ object Defaults { val NumControllerQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples val ControllerQuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds - /** * Transaction Configuration ***/ - val TransactionalIdExpirationMsDefault = 60480 - - val DeleteTopicEnable = true - - val CompressionType = BrokerCompressionType.PRODUCER.name - - val MaxIdMapSnapshots = 2 /** * Kafka Metrics Configuration ***/ val MetricNumSamples = 2 val MetricSampleWindowMs = 3 @@ -483,7 +476,6 @@ object KafkaConfig { val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type" val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + "message.timestamp.difference.max.ms" - val LogMaxIdMapSnapshotsProp = LogConfigPrefix + "max.id.map.snapshots" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" @@ -914,7 +906,6 @@ object KafkaConfig { /** * Replication configuration ***/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" - val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "The default replication factors for automatically created topics" val ReplicaLagTimeMaxMsDoc = "If 
a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + " the leader will remove the follower from isr"
[kafka] branch trunk updated (06af8fc6309 -> 871289c5c4b)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 06af8fc6309 KAFKA-14549: Move LogDirFailureChannel to storage module (#13041) add 871289c5c4b KAFKA-14476: Move OffsetMap and related to storage module (#13042) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/common/utils/ByteUtils.java | 10 + .../apache/kafka/common/utils/ByteUtilsTest.java | 13 ++ core/src/main/scala/kafka/log/LogCleaner.scala | 8 +- core/src/main/scala/kafka/log/OffsetMap.scala | 201 - core/src/main/scala/kafka/utils/CoreUtils.scala| 10 - .../test/scala/unit/kafka/log/LogCleanerTest.scala | 2 +- .../test/scala/unit/kafka/log/OffsetMapTest.scala | 2 +- .../scala/unit/kafka/utils/CoreUtilsTest.scala | 10 - .../{TxnIndexSearchResult.java => OffsetMap.java} | 21 +- .../server/log/internals/SkimpyOffsetMap.java | 242 + 10 files changed, 283 insertions(+), 236 deletions(-) delete mode 100755 core/src/main/scala/kafka/log/OffsetMap.scala mode change 100755 => 100644 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala copy storage/src/main/java/org/apache/kafka/server/log/internals/{TxnIndexSearchResult.java => OffsetMap.java} (67%) create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/SkimpyOffsetMap.java
[kafka] branch trunk updated: KAFKA-14549: Move LogDirFailureChannel to storage module (#13041)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 06af8fc6309 KAFKA-14549: Move LogDirFailureChannel to storage module (#13041) 06af8fc6309 is described below commit 06af8fc6309db3b917929afaccb427f8b90c7004 Author: Federico Valeri AuthorDate: Fri Dec 23 16:13:43 2022 +0100 KAFKA-14549: Move LogDirFailureChannel to storage module (#13041) For broader context on this change, please check: * KAFKA-14470: Move log layer to storage module Reviewers: dengziming , Mickael Maison , Satish Duggana , Ismael Juma --- .../kafka/server/builders/LogManagerBuilder.java | 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LocalLog.scala | 4 +- core/src/main/scala/kafka/log/LogCleaner.scala | 4 +- .../main/scala/kafka/log/LogCleanerManager.scala | 2 +- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- core/src/main/scala/kafka/log/LogManager.scala | 1 + core/src/main/scala/kafka/log/UnifiedLog.scala | 4 +- .../main/scala/kafka/raft/KafkaMetadataLog.scala | 4 +- .../src/main/scala/kafka/server/BrokerServer.scala | 1 + core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../scala/kafka/server/LogDirFailureChannel.scala | 62 - .../scala/kafka/server/PartitionMetadataFile.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- .../CheckpointFileWithFailureHandler.scala | 2 +- .../checkpoints/LeaderEpochCheckpointFile.scala| 2 +- .../server/checkpoints/OffsetCheckpointFile.scala | 2 +- .../kafka/server/LocalLeaderEndPointTest.scala | 2 +- .../src/test/scala/other/kafka/StressTestLog.scala | 3 +- .../scala/other/kafka/TestLinearWriteSpeed.scala | 3 +- .../unit/kafka/cluster/PartitionLockTest.scala | 2 +- .../scala/unit/kafka/cluster/PartitionTest.scala | 2 +- .../log/AbstractLogCleanerIntegrationTest.scala| 3 +- 
.../unit/kafka/log/BrokerCompressionTest.scala | 3 +- .../test/scala/unit/kafka/log/LocalLogTest.scala | 3 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 4 +- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 4 +- .../scala/unit/kafka/log/LogConcurrencyTest.scala | 3 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 4 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 3 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 3 +- .../server/HighwatermarkPersistenceTest.scala | 1 + .../unit/kafka/server/IsrExpirationTest.scala | 1 + .../server/ReplicaManagerConcurrencyTest.scala | 2 +- .../kafka/server/ReplicaManagerQuotasTest.scala| 1 + .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- ...ffsetCheckpointFileWithFailureHandlerTest.scala | 2 +- .../server/epoch/OffsetsForLeaderEpochTest.scala | 1 + .../unit/kafka/tools/DumpLogSegmentsTest.scala | 4 +- .../scala/unit/kafka/utils/SchedulerTest.scala | 3 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 1 + .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +- .../partition/PartitionMakeFollowerBenchmark.java | 2 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- .../apache/kafka/jmh/server/CheckpointBench.java | 2 +- .../kafka/jmh/server/PartitionCreationBench.java | 2 +- .../server/log/internals/LogDirFailureChannel.java | 77 ++ 47 files changed, 140 insertions(+), 108 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java index de43d55203f..2e5e293b120 100644 --- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java @@ -22,11 +22,11 @@ import kafka.log.LogConfig; import kafka.log.LogManager; import kafka.log.ProducerStateManagerConfig; import kafka.server.BrokerTopicStats; -import kafka.server.LogDirFailureChannel; import kafka.server.metadata.ConfigRepository; import kafka.utils.Scheduler; import 
org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.internals.LogDirFailureChannel; import scala.collection.JavaConverters; import java.io.File; diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index e6b46f41c7e..bc7ebd8a2e7 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -27,7 +27,6 @@ import kafka.server.DelayedFetch; import kafka.server.DelayedOperationPurgatory; import kafka.server.DelayedProduce; import kafka.server.KafkaConfig
[kafka] branch trunk updated (ca15735fa7b -> e8232edd24c)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from ca15735fa7b MINOR: remove onChange call in stream assignor assign method (#13034) add e8232edd24c KAFKA-14477: Move LogValidator and related to storage module (#13012) No new revisions were added by this update. Summary of changes: build.gradle |1 + checkstyle/suppressions.xml| 10 +- .../apache/kafka/common/utils/PrimitiveRef.java| 23 + .../kafka/common/utils/PrimitiveRefTest.java | 24 +- core/src/main/scala/kafka/cluster/Partition.scala |1 + .../kafka/common/RecordValidationException.scala | 28 - .../coordinator/group/GroupMetadataManager.scala |7 +- .../transaction/TransactionStateManager.scala |8 +- core/src/main/scala/kafka/log/LogSegment.scala |4 +- core/src/main/scala/kafka/log/LogValidator.scala | 592 -- .../scala/kafka/log/ProducerStateManager.scala |8 +- core/src/main/scala/kafka/log/UnifiedLog.scala | 79 +- .../main/scala/kafka/raft/KafkaMetadataLog.scala |5 +- core/src/main/scala/kafka/server/KafkaApis.scala |6 +- .../main/scala/kafka/server/ReplicaManager.scala |4 +- .../kafka/server/LocalLeaderEndPointTest.scala |6 +- .../unit/kafka/cluster/PartitionLockTest.scala |3 +- .../scala/unit/kafka/cluster/PartitionTest.scala | 13 +- .../AbstractCoordinatorConcurrencyTest.scala |4 +- .../coordinator/group/GroupCoordinatorTest.scala | 10 +- .../group/GroupMetadataManagerTest.scala | 45 +- .../transaction/TransactionStateManagerTest.scala | 14 +- .../unit/kafka/log/LogCleanerManagerTest.scala |4 +- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 66 +- .../test/scala/unit/kafka/log/LogTestUtils.scala |4 +- .../scala/unit/kafka/log/LogValidatorTest.scala| 1246 ++-- .../unit/kafka/log/ProducerStateManagerTest.scala | 40 +- .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 22 +- .../scala/unit/kafka/server/KafkaApisTest.scala| 10 +- .../server/ReplicaManagerConcurrencyTest.scala |5 +- 
.../unit/kafka/server/ReplicaManagerTest.scala | 17 +- .../unit/kafka/tools/DumpLogSegmentsTest.scala |6 +- gradle/spotbugs-exclude.xml|7 + .../kafka/jmh/record/BaseRecordBatchBenchmark.java |5 +- .../CompressedRecordBatchValidationBenchmark.java | 21 +- ...UncompressedRecordBatchValidationBenchmark.java | 17 +- .../kafka/server/log/internals/AppendOrigin.java | 45 + .../kafka/server/log/internals/LogValidator.java | 631 ++ ...hResult.java => RecordValidationException.java} | 23 +- 39 files changed, 1603 insertions(+), 1461 deletions(-) copy server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java => clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java (61%) delete mode 100644 core/src/main/scala/kafka/common/RecordValidationException.scala delete mode 100644 core/src/main/scala/kafka/log/LogValidator.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/AppendOrigin.java create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/LogValidator.java copy storage/src/main/java/org/apache/kafka/server/log/internals/{TxnIndexSearchResult.java => RecordValidationException.java} (56%)
[kafka] branch trunk updated: MINOR: Avoid unnecessary allocations in index binary search (#13024)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aad5b0a4633 MINOR: Avoid unnecessary allocations in index binary search (#13024) aad5b0a4633 is described below commit aad5b0a463306701276b7a48efd1ccf0cee0ad62 Author: Ismael Juma AuthorDate: Wed Dec 21 05:10:44 2022 -0800 MINOR: Avoid unnecessary allocations in index binary search (#13024) * MINOR: Avoid unnecessary allocations in index binary search * Fix bug due to inverse usage of SearchType. Also improve name clarity. --- .../kafka/server/log/internals/AbstractIndex.java | 57 +- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java index 36e7e50be07..d5a510c94ca 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java @@ -40,14 +40,8 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class AbstractIndex implements Closeable { -private static class BinarySearchResult { -public final int largestLowerBound; -public final int smallestUpperBound; - -private BinarySearchResult(int largestLowerBound, int smallestUpperBound) { -this.largestLowerBound = largestLowerBound; -this.smallestUpperBound = smallestUpperBound; -} +private enum SearchResultType { +LARGEST_LOWER_BOUND, SMALLEST_UPPER_BOUND } private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); @@ -447,14 +441,14 @@ public abstract class AbstractIndex implements Closeable { * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty */ protected int largestLowerBoundSlotFor(ByteBuffer idx, long 
target, IndexSearchType searchEntity) { -return indexSlotRangeFor(idx, target, searchEntity).largestLowerBound; +return indexSlotRangeFor(idx, target, searchEntity, SearchResultType.LARGEST_LOWER_BOUND); } /** * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned. */ protected int smallestUpperBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) { -return indexSlotRangeFor(idx, target, searchEntity).smallestUpperBound; +return indexSlotRangeFor(idx, target, searchEntity, SearchResultType.SMALLEST_UPPER_BOUND); } /** @@ -484,27 +478,36 @@ public abstract class AbstractIndex implements Closeable { } /** - * Lookup lower and upper bounds for the given target. + * Lookup lower or upper bounds for the given target. */ -private BinarySearchResult indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity) { +private int indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity, + SearchResultType searchResultType) { // check if the index is empty if (entries == 0) -return new BinarySearchResult(-1, -1); +return -1; int firstHotEntry = Math.max(0, entries - 1 - warmEntries()); // check if the target offset is in the warm section of the index if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) { -return binarySearch(idx, target, searchEntity, firstHotEntry, entries - 1); +return binarySearch(idx, target, searchEntity, +searchResultType, firstHotEntry, entries - 1); } // check if the target offset is smaller than the least offset -if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) -return new BinarySearchResult(-1, 0); +if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) { +switch (searchResultType) { +case LARGEST_LOWER_BOUND: +return -1; +case SMALLEST_UPPER_BOUND: +return 0; +} +} -return binarySearch(idx, target, searchEntity, 0, firstHotEntry); +return binarySearch(idx, target, searchEntity, 
searchResultType, 0, firstHotEntry); } -private BinarySearchResult binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity, int begin, int end) { +private int binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity, + SearchResultType searchResultType, int begin, int end) { // binary search for
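A simplified sketch of the idea behind this change: the search takes the wanted bound type as a parameter and returns a single `int`, so no result object is allocated per lookup. This is not Kafka's `AbstractIndex` code. It assumes a sorted `long[]` in place of the memory-mapped `ByteBuffer`, leaves out the warm-section split, and handles a first-slot exact match explicitly as its own choice. The names `BoundSearchSketch` and `slotFor` are hypothetical.

```java
// Illustrative sketch only: BoundSearchSketch and slotFor are hypothetical names,
// and a sorted long[] stands in for the index buffer.
final class BoundSearchSketch {

    enum SearchResultType { LARGEST_LOWER_BOUND, SMALLEST_UPPER_BOUND }

    // Returns the slot of the largest entry <= target (LARGEST_LOWER_BOUND) or of the
    // smallest entry >= target (SMALLEST_UPPER_BOUND), or -1 if no such entry exists.
    static int slotFor(long[] sorted, long target, SearchResultType type) {
        if (sorted.length == 0)
            return -1;
        // Target is below every entry: no lower bound, and slot 0 is the upper bound.
        if (sorted[0] > target)
            return type == SearchResultType.LARGEST_LOWER_BOUND ? -1 : 0;

        // Invariant from here on: sorted[lo] <= target.
        int lo = 0;
        int hi = sorted.length - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so that lo = mid always makes progress
            if (sorted[mid] > target)
                hi = mid - 1;
            else if (sorted[mid] < target)
                lo = mid;
            else
                return mid; // exact match satisfies both bound types
        }
        if (type == SearchResultType.LARGEST_LOWER_BOUND || sorted[lo] == target)
            return lo;
        // sorted[lo] < target, so the upper bound is the next slot, if there is one.
        return lo == sorted.length - 1 ? -1 : lo + 1;
    }

    public static void main(String[] args) {
        long[] offsets = {10L, 20L, 30L};
        System.out.println(slotFor(offsets, 25L, SearchResultType.LARGEST_LOWER_BOUND));  // 1
        System.out.println(slotFor(offsets, 25L, SearchResultType.SMALLEST_UPPER_BOUND)); // 2
        System.out.println(slotFor(offsets, 35L, SearchResultType.SMALLEST_UPPER_BOUND)); // -1
    }
}
```

Passing the bound type down costs one extra parameter per call, which is a reasonable trade on a lookup path that runs very frequently.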
[kafka] branch trunk updated: KAFKA-14475: Move TimeIndex/LazyIndex to storage module (#13010)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c4f10364cda KAFKA-14475: Move TimeIndex/LazyIndex to storage module (#13010) c4f10364cda is described below commit c4f10364cda6435f88688b1bfd56c5c52448f821 Author: Ismael Juma AuthorDate: Tue Dec 20 19:08:40 2022 -0800 KAFKA-14475: Move TimeIndex/LazyIndex to storage module (#13010) For broader context on this change, please check: * KAFKA-14470: Move log layer to storage module Reviewers: Jun Rao , Satish Duggana --- core/src/main/scala/kafka/log/LazyIndex.scala | 166 core/src/main/scala/kafka/log/LogSegment.scala | 13 +- core/src/main/scala/kafka/log/TimeIndex.scala | 228 .../scala/kafka/log/remote/RemoteIndexCache.scala | 12 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 4 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +- .../test/scala/unit/kafka/log/TimeIndexTest.scala | 6 +- .../kafka/log/remote/RemoteIndexCacheTest.scala| 8 +- .../kafka/log/remote/RemoteLogManagerTest.scala| 6 +- .../kafka/server/log/internals/LazyIndex.java | 245 ++ .../kafka/server/log/internals/TimeIndex.java | 286 + 11 files changed, 558 insertions(+), 422 deletions(-) diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala deleted file mode 100644 index 6725d034a1b..000 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.File -import java.nio.file.{Files, NoSuchFileException} -import java.util.concurrent.locks.ReentrantLock -import LazyIndex._ -import kafka.utils.CoreUtils.inLock -import kafka.utils.threadsafe -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex} - -/** - * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading - * (i.e. memory mapping) the underlying index until it is accessed for the first time via the - * `get` method. - * - * In addition, this class exposes a number of methods (e.g. updateParentDir, renameTo, close, - * etc.) that provide the desired behavior without causing the index to be loaded. If the index - * had previously been loaded, the methods in this class simply delegate to the relevant method in - * the index. - * - * This is an important optimization with regards to broker start-up and shutdown time if it has a - * large number of segments. - * - * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses - * documentation to establish their thread safety. - * - * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded - * `AbstractIndex` instance. 
- */ -@threadsafe -class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) { - - private val lock = new ReentrantLock() - - def file: File = indexWrapper.file - - def get: T = { -indexWrapper match { - case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T] - case _: IndexFile => -inLock(lock) { - indexWrapper match { -case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T] -case indexFile: IndexFile => - val indexValue = new IndexValue(loadIndex(indexFile.file)) - indexWrapper = indexValue - indexValue.index - } -} -} - } - - def updateParentDir(parentDir: File): Unit = { -inLock(lock) { - indexWrapper.updateParentDir(parentDir) -} - } - - def renameTo(f: File): Unit = { -inLock(lock) { - indexWrapper.renameTo(f) -} - } - - def deleteIfExists(): Boolean = { -inLock(lock) { - indexWrapper.deleteIfExists() -} - } - -
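The Scala `LazyIndex` removed above defers loading the index until `get` is first called, and re-checks the state under a lock so that only one thread performs the load. A minimal Java sketch of that pattern follows. `LazyLoadSketch`, `Lazy`, and the string-based loader are hypothetical stand-ins for illustration; they are not the classes this commit adds to the storage module.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class LazyLoadSketch {
    /** Hypothetical stand-in for LazyIndex: keeps only a file name until get() is first called. */
    static final class Lazy<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final String file;
        private final Function<String, T> loader;
        private volatile T value; // null until the first successful load

        Lazy(String file, Function<String, T> loader) {
            this.file = file;
            this.loader = loader;
        }

        T get() {
            T loaded = value;
            if (loaded != null) return loaded; // fast path: already loaded, no lock taken
            lock.lock();
            try {
                if (value == null) value = loader.apply(file); // re-check under the lock
                return value;
            } finally {
                lock.unlock();
            }
        }

        boolean isLoaded() {
            return value != null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        Lazy<String> index = new Lazy<>("00000000000000000000.index", f -> {
            loads.incrementAndGet();
            return "loaded:" + f;
        });
        System.out.println(index.isLoaded()); // nothing has been loaded yet
        System.out.println(index.get());      // first call triggers the loader
        System.out.println(index.get());      // second call reuses the loaded value
        System.out.println(loads.get());      // number of times the loader ran
    }
}
```

The volatile field plays the role of the `@volatile` `indexWrapper` in the Scala class: readers that see a loaded value skip the lock entirely, which is what keeps start-up cheap when most segments are never read.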
[kafka] branch trunk updated (82c9216c775 -> 7b634c7034c)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 82c9216c775 KAFKA-14474: Move OffsetIndex to storage module (#13009)
     add 7b634c7034c KAFKA-14521: Replace BrokerCompressionCodec with BrokerCompressionType (#13011)

No new revisions were added by this update.

Summary of changes:
 .../kafka/coordinator/group/GroupCoordinator.scala | 6 +-
 .../coordinator/group/GroupMetadataManager.scala | 2 +-
 .../kafka/coordinator/group/OffsetConfig.scala | 8 +-
 .../transaction/TransactionStateManager.scala | 5 +-
 core/src/main/scala/kafka/log/LogConfig.scala | 6 +-
 core/src/main/scala/kafka/log/LogValidator.scala | 40 +++--
 core/src/main/scala/kafka/log/UnifiedLog.scala | 32 ++--
 .../scala/kafka/message/CompressionCodec.scala | 108 -
 core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 16 +-
 .../main/scala/kafka/tools/ConsoleProducer.scala | 9 +-
 .../scala/kafka/utils/VerifiableProperties.scala | 19 ---
 .../server/DynamicBrokerReconfigurationTest.scala | 4 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala | 6 +-
 .../group/GroupMetadataManagerTest.scala | 2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala | 21 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala | 173 ++---
 .../kafka/server/AbstractFetcherThreadTest.scala | 5 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala | 12 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala | 9 +-
 .../unit/kafka/server/ProduceRequestTest.scala | 5 +-
 .../CompressedRecordBatchValidationBenchmark.java | 5 +-
 .../kafka/server/record/BrokerCompressionType.java | 85 ++
 .../server/record/BrokerCompressionTypeTest.java | 31 ++--
 24 files changed, 277 insertions(+), 336 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/message/CompressionCodec.scala
 create mode 100644 server-common/src/main/java/org/apache/kafka/server/record/BrokerCompressionType.java
 copy clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java => server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java (50%)
[kafka] branch trunk updated (44b3177a087 -> 82c9216c775)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 44b3177a087 KAFKA-14457; Controller metrics should only expose committed data (#12994)
     add 82c9216c775 KAFKA-14474: Move OffsetIndex to storage module (#13009)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/log/LazyIndex.scala | 2 +-
 core/src/main/scala/kafka/log/LocalLog.scala | 2 +-
 core/src/main/scala/kafka/log/LogSegment.scala | 5 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 207 -
 .../scala/kafka/log/remote/RemoteIndexCache.scala | 2 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala | 6 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +-
 .../scala/unit/kafka/log/OffsetIndexTest.scala | 34 +--
 .../kafka/log/remote/RemoteIndexCacheTest.scala | 6 +-
 .../kafka/log/remote/RemoteLogManagerTest.scala | 7 +-
 .../kafka/server/log/internals/OffsetIndex.java | 246 +
 11 files changed, 280 insertions(+), 239 deletions(-)
 delete mode 100755 core/src/main/scala/kafka/log/OffsetIndex.scala
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/OffsetIndex.java
[kafka] branch trunk updated (26fcf73feb1 -> d521f8110ec)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 26fcf73feb1 MINOR: Use `withClassLoader` in a couple more places (#13018)
     add d521f8110ec KAFKA-14473: Move AbstractIndex to storage module (#13007)

No new revisions were added by this update.

Summary of changes:
 .../common/IndexOffsetOverflowException.scala | 25 -
 core/src/main/scala/kafka/log/AbstractIndex.scala | 440
 core/src/main/scala/kafka/log/LazyIndex.scala | 2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 35 +-
 core/src/main/scala/kafka/log/TimeIndex.scala | 39 +-
 gradle/spotbugs-exclude.xml | 7 +
 .../kafka/server/log/internals/AbstractIndex.java | 552 +
 .../internals/IndexOffsetOverflowException.java | 12 +-
 .../{IndexEntry.java => IndexSearchType.java} | 5 +-
 9 files changed, 603 insertions(+), 514 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
 delete mode 100644 core/src/main/scala/kafka/log/AbstractIndex.scala
 create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
 copy clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java => storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java (72%)
 copy storage/src/main/java/org/apache/kafka/server/log/internals/{IndexEntry.java => IndexSearchType.java} (91%)
[kafka] branch trunk updated: KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e2678d57d09 KAFKA-14472: Move TransactionIndex and related to storage module (#12996) e2678d57d09 is described below commit e2678d57d0919aaa97effe2ee7591cd6b85b5303 Author: Ismael Juma AuthorDate: Mon Dec 19 11:31:37 2022 -0800 KAFKA-14472: Move TransactionIndex and related to storage module (#12996) For broader context on this change, please check: * KAFKA-14470: Move log layer to storage module Reviewers: Jun Rao , Satish Duggana --- core/src/main/scala/kafka/log/LocalLog.scala | 10 +- core/src/main/scala/kafka/log/LogCleaner.scala | 3 +- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 3 +- .../main/scala/kafka/log/TransactionIndex.scala| 264 - core/src/main/scala/kafka/log/UnifiedLog.scala | 21 +- .../scala/kafka/log/remote/RemoteIndexCache.scala | 2 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 3 +- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 5 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 1 + .../test/scala/unit/kafka/log/LogSegmentTest.scala | 4 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +- .../unit/kafka/log/ProducerStateManagerTest.scala | 9 +- .../unit/kafka/log/TransactionIndexTest.scala | 80 --- .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 1 + .../kafka/server/log/internals/AbortedTxn.java | 117 + .../kafka/server/log/internals/CompletedTxn.java | 75 ++ .../server/log/internals/TransactionIndex.java | 264 + .../server/log/internals/TxnIndexSearchResult.java | 30 +++ 19 files changed, 556 insertions(+), 344 deletions(-) diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 5617251..68bb9d9f8b0 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ 
b/core/src/main/scala/kafka/log/LocalLog.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.server.log.internals.OffsetPosition +import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition} import scala.jdk.CollectionConverters._ import scala.collection.{Seq, immutable} @@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File, } val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction] -def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) +def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator) FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, @@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File, private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, startingSegment: LogSegment, - accumulator: List[AbortedTxn] => Unit): Unit = { + accumulator: Seq[AbortedTxn] => Unit): Unit = { val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator var segmentEntryOpt = Option(startingSegment) while (segmentEntryOpt.isDefined) { val segment = segmentEntryOpt.get val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset) - accumulator(searchResult.abortedTransactions) + accumulator(searchResult.abortedTransactions.asScala) if (searchResult.isComplete) return segmentEntryOpt = nextOption(higherSegments) @@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File, private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] 
= { val segmentEntry = segments.floorSegment(baseOffset) val allAbortedTxns = ListBuffer.empty[AbortedTxn] -def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns +def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator)) allAbortedTxns.toList } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogClea
[kafka] branch trunk updated: KAFKA-14466: Move ClassloaderAwareRemoteStorageManager to storage module (#13013)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e3cb2deff6e KAFKA-14466: Move ClassloaderAwareRemoteStorageManager to storage module (#13013) e3cb2deff6e is described below commit e3cb2deff6e983aeb1744d4403af4b3c9aa85e57 Author: Satish Duggana AuthorDate: Mon Dec 19 20:07:33 2022 +0530 KAFKA-14466: Move ClassloaderAwareRemoteStorageManager to storage module (#13013) Reviewers: Ismael Juma --- .../ClassLoaderAwareRemoteStorageManager.scala | 76 --- .../scala/kafka/log/remote/RemoteLogManager.scala | 2 +- .../ClassLoaderAwareRemoteStorageManagerTest.scala | 2 +- .../kafka/log/remote/RemoteLogManagerTest.scala| 10 +- .../kafka/server/log/internals/StorageAction.java | 28 ++ .../ClassLoaderAwareRemoteLogMetadataManager.java | 28 ++ .../ClassLoaderAwareRemoteStorageManager.java | 103 + 7 files changed, 145 insertions(+), 104 deletions(-) diff --git a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala deleted file mode 100644 index d35c70ed85e..000 --- a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log.remote - -import org.apache.kafka.server.log.remote.storage.{LogSegmentData, RemoteLogSegmentMetadata, RemoteStorageManager} - -import java.io.InputStream -import java.util - -/** - * A wrapper class of RemoteStorageManager that sets the context class loader when calling RSM methods. - */ -class ClassLoaderAwareRemoteStorageManager(val rsm: RemoteStorageManager, - val rsmClassLoader: ClassLoader) extends RemoteStorageManager { - - def withClassLoader[T](fun: => T): T = { -val originalClassLoader = Thread.currentThread.getContextClassLoader -Thread.currentThread.setContextClassLoader(rsmClassLoader) -try { - fun -} finally { - Thread.currentThread.setContextClassLoader(originalClassLoader) -} - } - - def delegate(): RemoteStorageManager = { -rsm - } - - override def close(): Unit = withClassLoader { -rsm.close() - } - - override def configure(configs: util.Map[String, _]): Unit = withClassLoader { -rsm.configure(configs) - } - - override def copyLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - logSegmentData: LogSegmentData): Unit = withClassLoader { -rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData) - } - - override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - startPosition: Int): InputStream = withClassLoader { -rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition) - } - - override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - startPosition: Int, - endPosition: Int): InputStream = withClassLoader { 
-rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition) - } - - override def fetchIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - indexType: RemoteStorageManager.IndexType): InputStream = withClassLoader { -rsm.fetchIndex(remoteLogSegmentMetadata, indexType) - } - - override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = withClassLoader { -rsm.deleteLogSegmentData(remoteLogSegmentMetadata) - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala index 6558094842a..8324028c5ed 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala @
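The removed Scala wrapper runs every `RemoteStorageManager` call inside `withClassLoader`, which swaps the thread's context class loader and restores it in a `finally` block. A minimal Java sketch of that swap-and-restore idiom is below. `ClassLoaderSwapSketch` and the use of `java.util.function.Supplier` are assumptions made for illustration; the Java classes added by this commit may use a different functional interface (the diffstat lists a `StorageAction.java`, whose signature is not shown here).

```java
import java.util.function.Supplier;

class ClassLoaderSwapSketch {
    /**
     * Runs the action with the given loader installed as the thread's context
     * class loader, then restores the previous loader even if the action throws.
     */
    static <T> T withClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Hypothetical plugin loader: an empty anonymous subclass is enough to get a distinct instance.
        ClassLoader pluginLoader = new ClassLoader(before) { };

        ClassLoader seenInside = withClassLoader(pluginLoader,
                () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seenInside == pluginLoader);                                // loader was swapped inside the call
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // and restored afterwards
    }
}
```

Restoring in `finally` matters because plugin code may throw; without it, a failed call would leave the broker thread running with the plugin's class loader.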
[kafka] branch trunk updated (95dc9d9eede -> c0b28fde66f)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 95dc9d9eede Move IndexEntry and related to storage module (#12993)
     add c0b28fde66f MINOR: Use INFO logging for tools and trogdor tests (#13006)

No new revisions were added by this update.

Summary of changes:
 tools/src/test/resources/log4j.properties | 2 +-
 trogdor/src/test/resources/log4j.properties | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
[kafka] branch trunk updated: Move IndexEntry and related to storage module (#12993)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 95dc9d9eede Move IndexEntry and related to storage module (#12993) 95dc9d9eede is described below commit 95dc9d9eede40deb303c9c2b3365bfb0abdd3330 Author: Ismael Juma AuthorDate: Sat Dec 17 10:07:11 2022 -0800 Move IndexEntry and related to storage module (#12993) For broader context on this change, please check: * KAFKA-14470: Move log layer to storage module Reviewers: dengziming --- core/src/main/scala/kafka/log/AbstractIndex.scala | 2 +- core/src/main/scala/kafka/log/IndexEntry.scala | 52 core/src/main/scala/kafka/log/LocalLog.scala | 3 +- core/src/main/scala/kafka/log/LogLoader.scala | 1 + core/src/main/scala/kafka/log/LogSegment.scala | 15 ++--- core/src/main/scala/kafka/log/OffsetIndex.scala| 8 +-- core/src/main/scala/kafka/log/TimeIndex.scala | 10 ++-- .../main/scala/kafka/log/TransactionIndex.scala| 1 + .../scala/unit/kafka/log/OffsetIndexTest.scala | 40 ++--- .../test/scala/unit/kafka/log/TimeIndexTest.scala | 18 +++--- .../unit/kafka/log/TransactionIndexTest.scala | 1 + .../log/internals/CorruptIndexException.java | 15 +++-- .../kafka/server/log/internals/IndexEntry.java | 14 +++-- .../kafka/server/log/internals/OffsetPosition.java | 70 ++ .../server/log/internals/TimestampOffset.java | 70 ++ 15 files changed, 209 insertions(+), 111 deletions(-) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 37cd4b9f55c..1c8032115d8 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -22,11 +22,11 @@ import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.concurrent.locks.{Lock, ReentrantLock} - import 
kafka.common.IndexOffsetOverflowException import kafka.utils.CoreUtils.inLock import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils} +import org.apache.kafka.server.log.internals.IndexEntry /** * The abstract index class which holds entry format agnostic methods. diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala deleted file mode 100644 index 705366e32f5..000 --- a/core/src/main/scala/kafka/log/IndexEntry.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import org.apache.kafka.common.requests.ListOffsetsResponse - -sealed trait IndexEntry { - // We always use Long for both key and value to avoid boxing. - def indexKey: Long - def indexValue: Long -} - -/** - * The mapping between a logical log offset and the physical position - * in some log file of the beginning of the message set entry with the - * given offset. 
- */ -case class OffsetPosition(offset: Long, position: Int) extends IndexEntry { - override def indexKey = offset - override def indexValue = position.toLong -} - - -/** - * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater - * than that timestamp must be at or after that offset. - * @param timestamp The max timestamp before the given offset. - * @param offset The message offset. - */ -case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry { - override def indexKey = timestamp - override def indexValue = offset -} - -object TimestampOffset { - val Unknown = TimestampOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, ListOffsetsResponse.UNKNOWN_OFFSET) -} diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index b0e7b0e446e..5617251 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -
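The deleted Scala file defines `IndexEntry` with two implementations, `OffsetPosition` and `TimestampOffset`, and always exposes key and value as `Long` to avoid boxing. A Java sketch of the same shape is below. It is written from the Scala definitions shown in the diff, not from the Java files the commit adds, so the wrapper class `IndexEntrySketch` and the exact member layout are assumptions.

```java
class IndexEntrySketch {
    /** Key and value are primitive longs, so reading an entry never boxes. */
    interface IndexEntry {
        long indexKey();
        long indexValue();
    }

    /** Maps a logical log offset to the physical position of that entry in a log file. */
    static final class OffsetPosition implements IndexEntry {
        final long offset;
        final int position;

        OffsetPosition(long offset, int position) {
            this.offset = offset;
            this.position = position;
        }

        @Override public long indexKey() { return offset; }
        @Override public long indexValue() { return position; } // widened from int to long
    }

    /** Maps a timestamp to a message offset. */
    static final class TimestampOffset implements IndexEntry {
        final long timestamp;
        final long offset;

        TimestampOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }

        @Override public long indexKey() { return timestamp; }
        @Override public long indexValue() { return offset; }
    }

    public static void main(String[] args) {
        IndexEntry[] entries = {
            new OffsetPosition(42L, 4096),
            new TimestampOffset(1671300431000L, 42L)
        };
        for (IndexEntry e : entries) {
            System.out.println(e.indexKey() + " -> " + e.indexValue());
        }
    }
}
```

Sharing one interface lets index code that searches by key work over either entry type without knowing whether the key is an offset or a timestamp.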
[kafka] branch trunk updated (f9a09fdd294 -> 88725669e7c)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from f9a09fdd294 MINOR: Small refactor in DescribeGroupsResponse (#12970)
     add 88725669e7c MINOR: Move MetadataQuorumCommand from `core` to `tools` (#12951)

No new revisions were added by this update.

Summary of changes:
 bin/kafka-metadata-quorum.sh | 2 +-
 bin/windows/kafka-metatada-quorum.bat | 2 +-
 build.gradle | 7 +-
 checkstyle/import-control.xml | 1 +
 .../java/org/apache/kafka/common/utils/Utils.java | 14 +-
 .../org/apache/kafka/common/utils/UtilsTest.java | 7 +
 .../scala/kafka/admin/MetadataQuorumCommand.scala | 172 --
 .../kafka/admin/MetadataQuorumCommandTest.scala | 192
 .../org/apache/kafka/server/util/ToolsUtils.java | 19 +-
 .../apache/kafka/tools/MetadataQuorumCommand.java | 195 +
 .../org/apache/kafka/tools/TerseException.java | 15 +-
 .../apache/kafka/tools/TransactionsCommand.java | 42 ++---
 .../tools/MetadataQuorumCommandErrorTest.java | 48 +
 .../kafka/tools/MetadataQuorumCommandTest.java | 161 +
 .../org/apache/kafka/tools/ToolsTestUtils.java | 51 ++
 .../kafka/tools/TransactionsCommandTest.java | 12 +-
 16 files changed, 530 insertions(+), 410 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
 delete mode 100644 core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
 create mode 100644 tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
 copy clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java => tools/src/main/java/org/apache/kafka/tools/TerseException.java (67%)
 create mode 100644 tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java
 create mode 100644 tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
 create mode 100644 tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
[kafka] branch trunk updated (fc7fe8f7654 -> 13c9c78a1f4)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from fc7fe8f7654 MINOR: Remove KafkaTimer (#12950)
     add 13c9c78a1f4 MINOR: Remove unnecessary scalac warnings suppression (#12953)

No new revisions were added by this update.

Summary of changes:
 build.gradle | 3 ---
 core/src/main/scala/kafka/log/LogConfig.scala | 4 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 1 -
 3 files changed, 2 insertions(+), 6 deletions(-)
[kafka] branch trunk updated (8bb897655f1 -> fc7fe8f7654)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 8bb897655f1 MINOR: Optimize metric recording when quota check not required (#12933)
     add fc7fe8f7654 MINOR: Remove KafkaTimer (#12950)

No new revisions were added by this update.

Summary of changes:
 .../kafka/controller/ControllerEventManager.scala | 9 ++--
 .../scala/kafka/controller/KafkaController.scala | 8 +--
 core/src/main/scala/kafka/log/LogSegment.scala | 8 +--
 core/src/main/scala/kafka/metrics/KafkaTimer.scala | 35 -
 .../scala/unit/kafka/metrics/KafkaTimerTest.scala | 59 --
 5 files changed, 15 insertions(+), 104 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/metrics/KafkaTimer.scala
 delete mode 100644 core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
[kafka] branch trunk updated (93df7f5485e -> 6ae08c4ee82)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 93df7f5485e MINOR: Remove unused variables and other minor clean-ups (#12952)
     add 6ae08c4ee82 KAFKA-14256: Upgrade from Scala 2.13.8 to 2.13.10 (#12675)

No new revisions were added by this update.

Summary of changes:
 LICENSE-binary | 4 ++--
 bin/kafka-run-class.sh | 2 +-
 bin/windows/kafka-run-class.bat | 2 +-
 build.gradle | 3 +++
 core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala | 6 +++---
 gradle.properties | 2 +-
 gradle/dependencies.gradle | 4 ++--
 7 files changed, 13 insertions(+), 10 deletions(-)
[kafka] branch trunk updated: MINOR: Remove unused variables and other minor clean-ups (#12952)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 93df7f5485e MINOR: Remove unused variables and other minor clean-ups (#12952) 93df7f5485e is described below commit 93df7f5485e8118a32341c722542fed54ce141fa Author: Ismael Juma AuthorDate: Sun Dec 4 20:52:33 2022 -0800 MINOR: Remove unused variables and other minor clean-ups (#12952) * Remove whitespace before package declaration * Avoid unnecessary postfix language usage Reviewers: Luke Chen --- .../kafka/admin/BrokerApiVersionsCommand.scala | 2 -- .../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +- .../main/scala/kafka/admin/FeatureCommand.scala| 29 +- .../main/scala/kafka/network/RequestChannel.scala | 2 +- .../main/scala/kafka/utils/CommandLineUtils.scala | 2 +- .../scala/kafka/raft/KafkaMetadataLogTest.scala| 2 +- .../kafka/server/ServerGenerateClusterIdTest.scala | 4 +-- 7 files changed, 18 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 957cb2ce8bb..ea6ffeea65b 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -203,8 +203,6 @@ object BrokerApiVersionsCommand { private object AdminClient { val DefaultConnectionMaxIdleMs = 9 * 60 * 1000 val DefaultRequestTimeoutMs = 5000 -val DefaultSocketConnectionSetupMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG -val DefaultSocketConnectionSetupMaxMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG val DefaultMaxInFlightRequestsPerConnection = 100 val DefaultReconnectBackoffMs = 50 val DefaultReconnectBackoffMax = 50 diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index f3b67a030f9..b428c447d77 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -214,7 +214,7 @@ object ConsumerGroupCommand extends Logging { private def printGroupStates(groupsAndStates: List[(String, String)]): Unit = { // find proper columns width var maxGroupLen = 15 - for ((groupId, state) <- groupsAndStates) { + for ((groupId, _) <- groupsAndStates) { maxGroupLen = Math.max(maxGroupLen, groupId.length) } println(s"%${-maxGroupLen}s %s".format("GROUP", "STATE")) diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala index 3ed99022ed1..26228c652c8 100644 --- a/core/src/main/scala/kafka/admin/FeatureCommand.scala +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -155,15 +155,11 @@ object FeatureCommand { feature: String, level: Short ): String = { -if (feature.equals(MetadataVersion.FEATURE_NAME)) { - try { -MetadataVersion.fromFeatureLevel(level).version() - } catch { -case e: Throwable => s"UNKNOWN [${level}]" - } -} else { +if (feature.equals(MetadataVersion.FEATURE_NAME)) + try MetadataVersion.fromFeatureLevel(level).version + catch { case _: Throwable => s"UNKNOWN [$level]"} +else level.toString -} } def handleDescribe( @@ -172,8 +168,7 @@ object FeatureCommand { ): Unit = { val featureMetadata = admin.describeFeatures().featureMetadata().get() val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet()) - featureList.forEach { - case feature => + featureList.forEach { feature => val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match { case None => 0.toShort case Some(v) => v.maxVersionLevel() @@ -218,12 +213,12 @@ object FeatureCommand { } val name = input.substring(0, equalsIndex).trim val levelString = input.substring(equalsIndex + 1).trim -val level = try { - 
levelString.toShort -} catch { - case e: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " + -s"unable to parse ${levelString} as a short.") -} +val level = + try levelString.toShort + catch { +case _: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " + + s"unable to parse ${levelString} as a short.") + } (name, level) } @@ -239,7 +234,7 @@ object FeatureCommand { val version = try { MetadataVersio
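The `FeatureCommand` hunk above splits a `feature=level` argument at the equals sign, trims both halves, and reports a terse error when the level is not a valid short. A Java sketch of that parsing step follows. The class name `FeatureLevelParseSketch`, the use of `IllegalArgumentException` in place of `TerseFailure`, and the handling of a missing `=` (that part of the diff is elided) are all assumptions for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

class FeatureLevelParseSketch {
    /** Parses "name=level" into a (name, level) pair; whitespace around either half is ignored. */
    static Map.Entry<String, Short> parseNameAndLevel(String input) {
        int equalsIndex = input.indexOf('=');
        if (equalsIndex < 0) {
            // Assumption: the elided part of the diff rejects input without '='.
            throw new IllegalArgumentException(
                "Can't parse feature=level string " + input + ": equals sign not found.");
        }
        String name = input.substring(0, equalsIndex).trim();
        String levelString = input.substring(equalsIndex + 1).trim();
        try {
            return new SimpleImmutableEntry<>(name, Short.valueOf(levelString));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Can't parse feature=level string " + input + ": unable to parse "
                    + levelString + " as a short.", e);
        }
    }

    public static void main(String[] args) {
        Map.Entry<String, Short> parsed = parseNameAndLevel(" metadata.version = 7 ");
        System.out.println(parsed.getKey() + " at level " + parsed.getValue());
    }
}
```

The sketch catches only `NumberFormatException`, whereas the Scala code in the diff catches `Throwable`; the narrower catch is a choice made here so that unrelated failures are not reported as parse errors.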
[kafka] branch trunk updated (7b7e40a536a -> 3929c2651da)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 7b7e40a536a KAFKA-14304 Add RPC changes, records, and config from KIP-866 (#12928)
     add 3929c2651da MINOR: Remove unused `ApiUtils` (#12949)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/api/ApiUtils.scala | 78 --
 .../test/scala/unit/kafka/api/ApiUtilsTest.scala | 71
 2 files changed, 149 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/api/ApiUtils.scala
 delete mode 100644 core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
[kafka] branch trunk updated (b2b9ecdd614 -> 52bb677bbe7)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from b2b9ecdd614 MINOR: try-finally around super call in http.py (#12924)
     add 52bb677bbe7 MINOR: Reuse gradle daemon for scala compilation by default (#12280)

No new revisions were added by this update.

Summary of changes:
 Jenkinsfile | 4 ++--
 README.md | 4
 build.gradle | 11 ++-
 3 files changed, 16 insertions(+), 3 deletions(-)
[kafka] branch trunk updated (8d65271a0b7 -> 528a4ba1a75)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 8d65271a0b7 MINOR: Update Gradle to 7.6 (#12918) add 528a4ba1a75 MINOR: Remove config/kraft/README.md from rat exclusion list (#12923) No new revisions were added by this update. Summary of changes: build.gradle | 1 - 1 file changed, 1 deletion(-)
[kafka] branch 12349-test-cleanup created (now a087351c213)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch 12349-test-cleanup in repository https://gitbox.apache.org/repos/asf/kafka.git at a087351c213 Reduce timer to 0 No new revisions were added by this update.
[kafka] branch trunk updated (b2d8354e10b -> 8d65271a0b7)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from b2d8354e10b KAFKA-14414: Fix request/response header size calculation (#12917) add 8d65271a0b7 MINOR: Update Gradle to 7.6 (#12918) No new revisions were added by this update. Summary of changes: gradle/dependencies.gradle | 2 +- gradle/wrapper/gradle-wrapper.properties | 4 ++-- gradlew | 2 +- retry_zinc | 7 +++ 4 files changed, 7 insertions(+), 8 deletions(-)
[kafka] branch gradle-7.6 updated (44fffdfd26f -> 449725a2041)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch gradle-7.6 in repository https://gitbox.apache.org/repos/asf/kafka.git from 44fffdfd26f MINOR: Update Gradle to 7.6 add 449725a2041 Use logs for buildoutput.log No new revisions were added by this update. Summary of changes: retry_zinc | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-)
[kafka] branch gradle-7.6 updated (5c98725deee -> 44fffdfd26f)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch gradle-7.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

    omit 5c98725deee wip
    omit 3bd8a58aa06 Remove explicit `buildoutput.log` delete

This update removed existing revisions from the reference, leaving the
reference pointing at a previous point in the repository history.

 * -- * -- N   refs/heads/gradle-7.6 (44fffdfd26f)
            \
             O -- O -- O   (5c98725deee)

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 retry_zinc | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
[kafka] branch gradle-7.6 updated (864dc5f9826 -> 5c98725deee)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch gradle-7.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

    omit 864dc5f9826 wip
     add 5c98725deee wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (864dc5f9826)
            \
             N -- N -- N   refs/heads/gradle-7.6 (5c98725deee)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 retry_zinc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
[kafka] branch remove-kraft-readme-rat-exclusion created (now 1e5cc880721)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch remove-kraft-readme-rat-exclusion in repository https://gitbox.apache.org/repos/asf/kafka.git at 1e5cc880721 MINOR: Remove config/kraft/README.md from rat exclusion list No new revisions were added by this update.
[kafka] branch gradle-7.6 updated (599855f0d4a -> 864dc5f9826)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch gradle-7.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

    omit 599855f0d4a wip
     add 864dc5f9826 wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (599855f0d4a)
            \
             N -- N -- N   refs/heads/gradle-7.6 (864dc5f9826)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 retry_zinc | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
[kafka] branch gradle-7.6 updated (271581091cb -> 599855f0d4a)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch gradle-7.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

    omit 271581091cb wip
     add 599855f0d4a wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (271581091cb)
            \
             N -- N -- N   refs/heads/gradle-7.6 (599855f0d4a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 retry_zinc | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
[kafka] branch gradle-7.6 updated (3bd8a58aa06 -> 271581091cb)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch gradle-7.6 in repository https://gitbox.apache.org/repos/asf/kafka.git from 3bd8a58aa06 Remove explicit `buildoutput.log` delete add 271581091cb wip No new revisions were added by this update. Summary of changes: retry_zinc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
[kafka] branch gradle-7.6 updated (44fffdfd26f -> 3bd8a58aa06)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch gradle-7.6 in repository https://gitbox.apache.org/repos/asf/kafka.git from 44fffdfd26f MINOR: Update Gradle to 7.6 add 3bd8a58aa06 Remove explicit `buildoutput.log` delete No new revisions were added by this update. Summary of changes: retry_zinc | 1 - 1 file changed, 1 deletion(-)
[kafka] branch gradle-7.6 updated (1674a98120a -> 44fffdfd26f)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch gradle-7.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

    omit 1674a98120a MINOR: Update Gradle to 7.6
     add d3ee9341cc8 KAFKA-12476: Prevent herder tick thread from sleeping excessively after slow operations (#12876)
     add 471e029f0a6 MINOR: buildoutput.log should be under the `build` directory (#12919)
     add 44fffdfd26f MINOR: Update Gradle to 7.6

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (1674a98120a)
            \
             N -- N -- N   refs/heads/gradle-7.6 (44fffdfd26f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 .../runtime/distributed/DistributedHerder.java     |  17 +-
 .../runtime/distributed/DistributedHerderTest.java | 235 -
 retry_zinc                                         |   8 +-
 3 files changed, 152 insertions(+), 108 deletions(-)
[kafka] branch trunk updated (d3ee9341cc8 -> 471e029f0a6)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from d3ee9341cc8 KAFKA-12476: Prevent herder tick thread from sleeping excessively after slow operations (#12876) add 471e029f0a6 MINOR: buildoutput.log should be under the `build` directory (#12919) No new revisions were added by this update. Summary of changes: retry_zinc | 8 1 file changed, 4 insertions(+), 4 deletions(-)
[kafka] branch gradle-7.6 created (now 1674a98120a)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch gradle-7.6 in repository https://gitbox.apache.org/repos/asf/kafka.git at 1674a98120a MINOR: Update Gradle to 7.6 No new revisions were added by this update.
[kafka] branch trunk updated (cc582897bfb -> 2e0f9f42ddc)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from cc582897bfb KAFKA-14299: Fix incorrect pauses in separate state restoration (#12743) add 2e0f9f42ddc MINOR: Inline "Running a Kafka broker in KRaft mode" (#12750) No new revisions were added by this update. Summary of changes: README.md | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-)
[kafka] branch trunk updated: MINOR: Include TLS version in transport layer debug log (#12751)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new de8c9ea04c4 MINOR: Include TLS version in transport layer debug log (#12751)
de8c9ea04c4 is described below

commit de8c9ea04c4ab672fe94e332e98efa4d08295b82
Author: Ismael Juma
AuthorDate: Mon Oct 17 08:26:23 2022 -0700

    MINOR: Include TLS version in transport layer debug log (#12751)

    This was helpful when debugging an issue recently.

    Reviewers: Manikumar Reddy, Divij Vaidya
---
 .../java/org/apache/kafka/common/network/SslTransportLayer.java | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 844c2bd2c17..904c5216a40 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -451,11 +451,11 @@ public class SslTransportLayer implements TransportLayer {
             if (netWriteBuffer.hasRemaining())
                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
             else {
-                state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                 SSLSession session = sslEngine.getSession();
-                log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
-                        session.getPeerHost(), session.getPeerPort(), peerPrincipal(), session.getCipherSuite());
+                state = session.getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' protocol '{}' cipherSuite '{}'",
+                        session.getPeerHost(), session.getPeerPort(), peerPrincipal(), session.getProtocol(), session.getCipherSuite());
                 metadataRegistry.registerCipherInformation(
                     new CipherInformation(session.getCipherSuite(), session.getProtocol()));
             }
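
The pattern in the TLS logging change above (read the `SSLSession` once, then reuse it for both the state decision and the log line) can be sketched outside Kafka as follows. This is a minimal illustration, not the Kafka implementation: the class `HandshakeLogSketch`, its `State` enum, and the `describe` helper are hypothetical names, the value `"TLSv1.3"` for the `TLS13` constant is an assumption about the protocol name JSSE reports, and `peerPrincipal` is left out because it depends on a completed handshake.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;

// Hypothetical sketch class; not part of the Kafka code base.
final class HandshakeLogSketch {
    // Assumption: the protocol name JSSE reports for TLS 1.3 sessions.
    static final String TLS13 = "TLSv1.3";

    // Hypothetical stand-in for the transport layer's internal state enum.
    enum State { POST_HANDSHAKE, READY }

    // Mirrors the ternary in the diff: TLS 1.3 goes through a
    // post-handshake state, every other protocol is ready immediately.
    static State nextState(String protocol) {
        return TLS13.equals(protocol) ? State.POST_HANDSHAKE : State.READY;
    }

    // Builds a message shaped like the new debug log, including the protocol.
    static String describe(String peerHost, int peerPort, String protocol, String cipherSuite) {
        return String.format(
            "SSL handshake completed successfully with peerHost '%s' peerPort %d protocol '%s' cipherSuite '%s'",
            peerHost, peerPort, protocol, cipherSuite);
    }

    public static void main(String[] args) throws Exception {
        SSLEngine engine = SSLContext.getDefault().createSSLEngine();
        // Fetch the session once and reuse it, as the diff does.
        SSLSession session = engine.getSession();
        System.out.println(nextState(session.getProtocol()));
        System.out.println(describe(session.getPeerHost(), session.getPeerPort(),
            session.getProtocol(), session.getCipherSuite()));
    }
}
```

No handshake is performed in `main`, so the session it prints is only a placeholder; with a real connection the same calls would report the negotiated protocol and cipher suite.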
[kafka] 01/01: MINOR: Inline "Running a Kafka broker in KRaft mode"
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch running-kraft-mode-readme
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d66cdade89e26165e7505ae65ad4e0df86822598
Author: Ismael Juma
AuthorDate: Fri Oct 14 06:48:03 2022 -0700

    MINOR: Inline "Running a Kafka broker in KRaft mode"

    Also moved KRaft mode above zk mode.
---
 README.md | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 2aa509e0810..36f71d122c8 100644
--- a/README.md
+++ b/README.md
@@ -83,15 +83,17 @@ fail due to code changes. You can just run:
 
     ./gradlew processMessages processTestMessages
 
+### Running a Kafka broker in KRaft mode
+
+    KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
+    ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
+    ./bin/kafka-server-start.sh config/kraft/server.properties
+
 ### Running a Kafka broker in ZooKeeper mode
 
     ./bin/zookeeper-server-start.sh config/zookeeper.properties
     ./bin/kafka-server-start.sh config/server.properties
 
-### Running a Kafka broker in KRaft (Kafka Raft metadata) mode
-
-See [config/kraft/README.md](https://github.com/apache/kafka/blob/trunk/config/kraft/README.md).
-
 ### Cleaning the build ###
     ./gradlew clean
[kafka] branch running-kraft-mode-readme created (now d66cdade89e)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch running-kraft-mode-readme in repository https://gitbox.apache.org/repos/asf/kafka.git at d66cdade89e MINOR: Inline "Running a Kafka broker in KRaft mode" This branch includes the following new commits: new d66cdade89e MINOR: Inline "Running a Kafka broker in KRaft mode" The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
svn commit: r57130 - /release/kafka/3.3.0/
Author: ijuma Date: Sun Oct 2 23:08:54 2022 New Revision: 57130 Log: Delete 3.3.0 Removed: release/kafka/3.3.0/
svn commit: r57129 - /release/kafka/KEYS
Author: ijuma Date: Sun Oct 2 23:08:25 2022 New Revision: 57129 Log: Add Jose to KEYS Modified: release/kafka/KEYS Modified: release/kafka/KEYS == --- release/kafka/KEYS (original) +++ release/kafka/KEYS Sun Oct 2 23:08:25 2022 @@ -1771,3 +1771,62 @@ HCVj5EdrMtvz7PlGDwQpbNzSv78Bq8/xEA3l/yzu wATI4tNqbCl0E3n+CC3Jmurt1hplq7QKiAGbrBAn/SM= =ViGk -END PGP PUBLIC KEY BLOCK- +pub rsa4096 2022-08-15 [SC] + 21D66C1494D481B5A9407EC9CF2A47452FC9C6FD +uid [ultimate] Jose Armando Garcia Sancio (CODE SIGNING KEY) +sig 3CF2A47452FC9C6FD 2022-08-15 Jose Armando Garcia Sancio (CODE SIGNING KEY) +sub rsa4096 2022-08-15 [E] +sig CF2A47452FC9C6FD 2022-08-15 Jose Armando Garcia Sancio (CODE SIGNING KEY) + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBGL6YIwBEACZXdg9P1ZsKTZH+cFHTxjVPuBEGEpXwcns50JmeKrwd9b7IBtQ +M2/53HOn77tr33yG9r6NWTqTfUCSmX5XMHJKzlc/FRDXjP9reZCqOYFLXjugtuU/ +tAsSzGSMCeDXRC9YeZTbpOxaeGIh7qM47o1rDUX9Rd/ITNDlgZC7JzqWO11wuYoX +7hHFRpWFanB9Cv2RlHRzOzmk1mvkVOA4V0SHX89VgnoDeDXtz/AR66YxFW2U1PoF +x4bjjRRv433tr9fUeI2yI4/KLpZXfYyhzyDg3ZhrCDcDxm7H2mBV+U/DXPyl5+Fl +X719MgusegtMYnQLFZc7f4ZXLrPKYj+r/8ehbZkSb/UXW2fiWUA5YZHhb43l6+Ne +jXxXcj8J/jbwiWY1xmrQ5L/yfG6o+YtaxY9bpPjvZzLuBC8IaliqWtsOe8Pyf6FR +kWlxmjVtfyOTqBxw1q1YWf+pjnTDs2TwDrQ7HRMacItXTvQzqDfpkwgsKhpsTgnI +WvLvWo5s4v+Wee9Sj78HTa+jBih4+tua1CArUTHgcMj1ujBGWdTUmAjon4E0Q9jk +X4rsPse8KxDSWxuKUx/In4vzqoty70lk7fRRm9IDOuulIoXMghubcMxXCQJpxLJK +mN4yJkDjhco3k7ZY3pNtCITz5IGiX/ZuzSvg49Q5vQx0HS9fXgpQUPBe3QARAQAB +tEJKb3NlIEFybWFuZG8gR2FyY2lhIFNhbmNpbyAoQ09ERSBTSUdOSU5HIEtFWSkg +PGpzYW5jaW9AYXBhY2hlLm9yZz6JAk4EEwEKADgWIQQh1mwUlNSBtalAfsnPKkdF +L8nG/QUCYvpgjAIbAwULCQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRDPKkdFL8nG +/eXMD/wI/cJjUhADkYQKixips5g7/kE0jottDEcc2n6DIua76DbKXX/n9v6LPvtp +zmLxamu2SeZMyCWJ+IWr/3GLEb9LDmf8Rcw4SnqnuHhFoa+UIwagD+abgBRrByrs +W9yEY2g2e835RhYP4JOJSymbtgkuzgqDamVplJR6b1PQ0oG6kXPOkXxj7n+fhyY2 +lvt/PqwC8P7/o6pxL8JfjFwjMkaJWkbVVhE8lm3QA5GCOzrklCcog51/xIEOXOTP +F7oxWzIMplBSdAdV+EtAqIbWi0ZdeKfvistWU3c10hD31QRFoMmtZ8Kej0KJUGPs 
+I65uOCYdU7JbjXAdJ41AwGtZyBie25HQX27I6DIBgFIjoijqENq7QVJ3rjQ+AZWq +eFWbaTQVn4hIIzJBDVPQrpS4fNBgqeamx90aH0i1zitYHOvxJcMEP3VGLqytyGGv +cXL2dml6kDKDL0PlyW9wNS1IWorbL349VMoh8s9zOzMTagBxzMPpNYXBflYGrAj0 +fr+vGxMd7sesyMnwCydDlajLwUwE+sFyYinoE91ZHEo94mR4THJT70fA9ZApGFsT +ocoB029NBpC1uxvycK9y/iPCsHOHc7uatdBDQCG4/qhhlatr4mFvBouLKjiDqI5q +q977OCxgb9yr5OAjZTcJ3fXtpI0xwSdr31GqLQpnarsc8TJdwLkCDQRi+mCMARAA +qesM701S4qLL1/K8Zbh0JkiIfFEbIWpQ36UHfIbEdNp4IMW7ql36rTqwwxRpn1Ks +K4t+dQhTjp3hNjxYNoPcCRiULTHK2kij32uEeRgnprIXLw6lm4GXbtK6bJ2kA/oA +yLVCrxJ84+DVW0sfBGyZXtZrSFAD3Ax4ZBRFdlmsL/gS692Gv9T976h6dl9y8NYA +r1BPdvgvhzbnt+DkUUruorIGwMI3Q/gkYhr1Y8iaF+LEGIEks1PaChYJx9bzIBGi +6mGGeKUhjH9Fwq9EngeApgVRnkMquii2WUFqh5IfzdtoZb898jEjSbcsCiO/6i+k +YgM22Xq5OMJXOFvvNiFF99q5sKMNMvTSXuCKNnBQrFagQF6EMmU4XPBsOMsZxNkU +zLqOXNTpioCW28CJNqWIoVE23bbT+iKShTsHKU4J7DFSwdoqVX80eh5xdm1M/C2N +N9TD5GY/k7rjSltxOISjDgspyoV+C8VjtOoAvriY3jP/u0sAKIUIFQ6BtZVRcxaG +w/hOUCNmmw+1z1TcURAaV+tBBvvZRlSL72dMhBRW7aDKmF779WrsIpbocbVMJeZd +OUz+A2aZXJWaYF8EupSjFamIg63W8A9qm7PomE3bhn41FFjAUkGBm53wzcHmPnKY +HFxkA4L2EqjJKx111Clkh2nTpVyAr9nMdwp7mkBv5tkAEQEAAYkCNgQYAQoAIBYh +BCHWbBSU1IG1qUB+yc8qR0Uvycb9BQJi+mCMAhsMAAoJEM8qR0Uvycb98t4QAIdG +3jNi23p9sSSe1eVQPhnFOwqiPuW6vuNtB6Veb6rz+6v5hWFgUttSxGf/TOm+3Av9 +YiAEC8n8lK93Qs7JK5Tb0jl3sd3sUDCmWvnYtwQ5KauB8dHVg6T9kpgdPv/wBe18 +e8hbsvyaF+6e+e2EmJsp2NLwmZ6qruZDSCiFCFn4TE3jKrRiKtGbuxEtc+gyvmxH +JI3/dMGqgIiCeYn409JbjPcg2JgIHk/gNFmV+BI7dYq17MIAtEEpf5layX1DHdMS +P4f/O3BQu0P88vvxe34Uu1a8c9TL/maKGAFBg6H/b7kSfZvR2iuSrgvFZNT7gAhy +xxoZ6G6vxMcOrsmvyHUfGm1xqc5qp9QclC+pZe2LD/+HEV2ICMrf/UgF2OQXq1td +sjovHCRnwYZBQizQDRF54Np+7Cc30itYlYjOGPSk8FPrb12781Uxe/ITz9xD2kE5 +LnZPBQJARdp3Y96Vj/jadADJkutERSbUYQYXsuBpHj5xYVVq+fiNU1dFoc2tus3n +etkXPEsc7JKmG/SBTy3gI0AX45xFtNo/wJi3wAF+l4IHwddY081knKQit4Qn/fkV +lz5IFcS6DFwkN3XOYRQK2DqcH+Fbgkw6EvqCQTMbDI1QNSrweSpT2FbR7C1NWSaZ +yVLj58++qhfmK1CG8EkEB01bBD1MA9SrPE34w+e9 +=UkT2 +-END PGP PUBLIC KEY BLOCK-