This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 634e99e9ab1 MINOR: Cleanup metadata module (#20115)
634e99e9ab1 is described below
commit 634e99e9ab16db61990f4922f0a30b6a93f27785
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Jul 21 01:51:09 2025 +0500
MINOR: Cleanup metadata module (#20115)
- Removed unused methods and arguments;
- Used enhanced switch and functional style expression for Optional;
- Fixed IDEA code inspection warnings.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../errors/InvalidReplicaDirectoriesException.java | 5 ++--
.../kafka/controller/BrokerHeartbeatTracker.java | 2 +-
.../controller/ClientQuotaControlManager.java | 32 ++++++++++-----------
.../kafka/controller/ClusterControlManager.java | 4 +--
.../controller/ConfigurationControlManager.java | 4 +--
.../kafka/controller/EventPerformanceMonitor.java | 4 +--
.../kafka/controller/KRaftVersionAccessor.java | 1 -
.../apache/kafka/controller/QuorumController.java | 5 ----
.../controller/ReplicationControlManager.java | 2 --
.../errors/EventHandlerExceptionInfo.java | 6 ++--
.../org/apache/kafka/image/ClientQuotaImage.java | 4 +--
.../org/apache/kafka/image/ClientQuotasImage.java | 5 ++--
.../java/org/apache/kafka/image/ClusterDelta.java | 8 ++----
.../org/apache/kafka/image/ConfigurationImage.java | 4 +--
.../apache/kafka/image/ConfigurationsImage.java | 5 ++--
.../java/org/apache/kafka/image/MetadataImage.java | 6 ++--
.../org/apache/kafka/image/ProducerIdsImage.java | 3 +-
.../java/org/apache/kafka/image/ScramImage.java | 6 ++--
.../java/org/apache/kafka/image/TopicDelta.java | 11 +++-----
.../java/org/apache/kafka/image/TopicsDelta.java | 14 ++++-----
.../kafka/image/node/ClientQuotasImageNode.java | 14 ++++-----
.../apache/kafka/metadata/KafkaConfigSchema.java | 33 ++++++++--------------
.../kafka/metadata/PartitionRegistration.java | 7 ++---
.../metadata/authorizer/StandardAuthorizer.java | 5 ----
.../authorizer/StandardAuthorizerData.java | 4 +--
.../metadata/bootstrap/BootstrapMetadata.java | 2 +-
.../metadata/placement/PartitionAssignment.java | 2 +-
.../kafka/metadata/properties/MetaProperties.java | 16 +++--------
.../metadata/properties/MetaPropertiesVersion.java | 10 +++----
.../apache/kafka/metadata/storage/Formatter.java | 17 ++++-------
.../controller/PartitionChangeBuilderTest.java | 16 ++++-------
.../controller/ReplicationControlManagerTest.java | 4 +--
.../metrics/ControllerMetricsTestUtils.java | 19 ++++---------
.../apache/kafka/image/ClientQuotasImageTest.java | 4 +--
.../kafka/image/ConfigurationsImageTest.java | 4 +--
.../apache/kafka/image/ProducerIdsImageTest.java | 4 +--
.../kafka/image/writer/ImageWriterOptionsTest.java | 10 ++++---
.../metadata/bootstrap/BootstrapMetadataTest.java | 3 +-
.../metadata/properties/MetaPropertiesTest.java | 12 ++------
.../org/apache/kafka/metalog/LocalLogManager.java | 4 +--
40 files changed, 117 insertions(+), 204 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
b/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
index c83a8b5bd4a..417521bfb48 100644
---
a/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
+++
b/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
@@ -23,14 +23,15 @@ import org.apache.kafka.common.metadata.PartitionRecord;
* A record was encountered where the number of directories does not match the
number of replicas.
*/
public class InvalidReplicaDirectoriesException extends
InvalidMetadataException {
+ private static final String ERR_MSG = "The lengths for replicas and
directories do not match: ";
private static final long serialVersionUID = 1L;
public InvalidReplicaDirectoriesException(PartitionRecord record) {
- super("The lengths for replicas and directories do not match: " +
record);
+ super(ERR_MSG + record);
}
public InvalidReplicaDirectoriesException(PartitionChangeRecord record) {
- super("The lengths for replicas and directories do not match: " +
record);
+ super(ERR_MSG + record);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
index 722030f07ae..7698f44bbf3 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
@@ -26,7 +26,7 @@ import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The BrokerheartbeatTracker stores the last time each broker sent a
heartbeat to us.
+ * The BrokerHeartbeatTracker stores the last time each broker sent a
heartbeat to us.
* This class will be present only on the active controller.
*
* UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM
MULTIPLE THREADS.
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 3f9b3f61f10..ca6ec897330 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -261,32 +261,32 @@ public class ClientQuotaControlManager {
}
// Ensure the quota value is valid
- switch (configKey.type()) {
- case DOUBLE:
- return ApiError.NONE;
- case SHORT:
+ return switch (configKey.type()) {
+ case DOUBLE -> ApiError.NONE;
+ case SHORT -> {
if (value > Short.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a
SHORT.");
}
- return getErrorForIntegralQuotaValue(value, key);
- case INT:
+ yield getErrorForIntegralQuotaValue(value, key);
+ }
+ case INT -> {
if (value > Integer.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for an
INT.");
}
- return getErrorForIntegralQuotaValue(value, key);
- case LONG: {
+ yield getErrorForIntegralQuotaValue(value, key);
+ }
+ case LONG -> {
if (value > Long.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a
LONG.");
}
- return getErrorForIntegralQuotaValue(value, key);
+ yield getErrorForIntegralQuotaValue(value, key);
}
- default:
- return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
- "Unexpected config type " + configKey.type() + "
should be Long or Double");
- }
+ default -> new ApiError(Errors.UNKNOWN_SERVER_ERROR,
+ "Unexpected config type " + configKey.type() + " should be
Long or Double");
+ };
}
static ApiError getErrorForIntegralQuotaValue(double value, String key) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 14fce5ca0f3..2ce4b2f420f 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -823,7 +823,7 @@ public class ClusterControlManager {
}
Iterator<Entry<Integer, Map<String, VersionRange>>>
brokerSupportedFeatures() {
- return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+ return new Iterator<>() {
private final Iterator<BrokerRegistration> iter =
brokerRegistrations.values().iterator();
@Override
@@ -845,7 +845,7 @@ public class ClusterControlManager {
throw new UnsupportedVersionException("The current MetadataVersion
is too old to " +
"support controller registrations.");
}
- return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+ return new Iterator<>() {
private final Iterator<ControllerRegistration> iter =
controllerRegistrations.values().iterator();
@Override
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index b367fda8114..d0e48f18142 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -364,9 +364,7 @@ public class ConfigurationControlManager {
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
- if (alterConfigPolicy.isPresent()) {
- alterConfigPolicy.get().validate(new
RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
- }
+ alterConfigPolicy.ifPresent(policy -> policy.validate(new
RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck)));
} catch (ConfigException e) {
return new ApiError(INVALID_CONFIG, e.getMessage());
} catch (Throwable e) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
index fbe8b1c3cbb..55ff819ff7b 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
@@ -74,12 +74,12 @@ class EventPerformanceMonitor {
/**
* The period in nanoseconds.
*/
- private long periodNs;
+ private final long periodNs;
/**
* The always-log threshold in nanoseconds.
*/
- private long alwaysLogThresholdNs;
+ private final long alwaysLogThresholdNs;
/**
* The name of the slowest event we've seen so far, or null if none has
been seen.
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
b/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
index b07d2c6398c..0eb6f96b858 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
@@ -36,7 +36,6 @@ interface KRaftVersionAccessor {
* @param epoch the current epoch
* @param newVersion the new kraft version to upgrade to
* @param validateOnly whether to just validate the change and not persist
it
- * @throws ApiException when the upgrade fails to validate
*/
void upgradeKRaftVersion(int epoch, KRaftVersion newVersion, boolean
validateOnly);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7dee1e3cd3d..5f4d6142434 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -676,11 +676,6 @@ public final class QuorumController implements Controller {
return clusterControl;
}
- // Visible for testing
- FeatureControlManager featureControl() {
- return featureControl;
- }
-
// Visible for testing
ConfigurationControlManager configurationControl() {
return configurationControl;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 9ad4bba3424..fe561244dab 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1109,7 +1109,6 @@ public class ReplicationControlManager {
topic,
partitionId,
partition,
- context.requestHeader().requestApiVersion(),
partitionData);
if (validationError != Errors.NONE) {
@@ -1239,7 +1238,6 @@ public class ReplicationControlManager {
TopicControlInfo topic,
int partitionId,
PartitionRegistration partition,
- short requestApiVersion,
AlterPartitionRequestData.PartitionData partitionData
) {
if (partition == null) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
index fdd428495c1..c914e63cdaa 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
@@ -167,10 +167,8 @@ public final class EventHandlerExceptionInfo {
bld.append("event unable to start processing because of ");
}
bld.append(internalException.getClass().getSimpleName());
- if (externalException.isPresent()) {
- bld.append(" (treated as ").
-
append(externalException.get().getClass().getSimpleName()).append(")");
- }
+ externalException.ifPresent(e -> bld.append(" (treated as ")
+ .append(e.getClass().getSimpleName()).append(")"));
if (causesFailover()) {
bld.append(" at epoch ").append(epoch);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
index 2161ae2621f..eab0d102acf 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -23,7 +23,6 @@ import
org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.image.node.ClientQuotaImageNode;
import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,8 +46,7 @@ public record ClientQuotaImage(Map<String, Double> quotas) {
public void write(
ClientQuotaEntity entity,
- ImageWriter writer,
- ImageWriterOptions options
+ ImageWriter writer
) {
for (Entry<String, Double> entry : quotas.entrySet()) {
writer.write(0, new ClientQuotaRecord().
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index 77096e5d009..6cf23a2b688 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -26,7 +26,6 @@ import
org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryDat
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.image.node.ClientQuotasImageNode;
import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.HashMap;
@@ -67,11 +66,11 @@ public final class ClientQuotasImage {
return entities;
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
for (Entry<ClientQuotaEntity, ClientQuotaImage> entry :
entities.entrySet()) {
ClientQuotaEntity entity = entry.getKey();
ClientQuotaImage clientQuotaImage = entry.getValue();
- clientQuotaImage.write(entity, writer, options);
+ clientQuotaImage.write(entity, writer);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index caf09243ff4..8f0556959b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -165,9 +165,7 @@ public final class ClusterDelta {
int nodeId = entry.getKey();
Optional<BrokerRegistration> brokerRegistration = entry.getValue();
if (!newBrokers.containsKey(nodeId)) {
- if (brokerRegistration.isPresent()) {
- newBrokers.put(nodeId, brokerRegistration.get());
- }
+ brokerRegistration.ifPresent(registration ->
newBrokers.put(nodeId, registration));
}
}
Map<Integer, ControllerRegistration> newControllers = new
HashMap<>(image.controllers().size());
@@ -184,9 +182,7 @@ public final class ClusterDelta {
int nodeId = entry.getKey();
Optional<ControllerRegistration> controllerRegistration =
entry.getValue();
if (!newControllers.containsKey(nodeId)) {
- if (controllerRegistration.isPresent()) {
- newControllers.put(nodeId, controllerRegistration.get());
- }
+ controllerRegistration.ifPresent(registration ->
newControllers.put(nodeId, registration));
}
}
return new ClusterImage(newBrokers, newControllers);
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index b4227b3df65..c0757d351a7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.image.node.ConfigurationImageNode;
import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.Map;
@@ -71,8 +70,7 @@ public final class ConfigurationImage {
public void write(
ConfigResource configResource,
- ImageWriter writer,
- ImageWriterOptions options
+ ImageWriter writer
) {
for (Map.Entry<String, String> entry : data.entrySet()) {
writer.write(0, new ConfigRecord().
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index 4df6c363840..9d1670e6e15 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.image.node.ConfigurationsImageNode;
import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.Map;
@@ -74,11 +73,11 @@ public final class ConfigurationsImage {
}
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
for (Entry<ConfigResource, ConfigurationImage> entry :
data.entrySet()) {
ConfigResource configResource = entry.getKey();
ConfigurationImage configImage = entry.getValue();
- configImage.write(configResource, writer, options);
+ configImage.write(configResource, writer);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index fc14eb1c5b1..da245f00519 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -153,9 +153,9 @@ public final class MetadataImage {
features.write(writer, options);
cluster.write(writer, options);
topics.write(writer, options);
- configs.write(writer, options);
- clientQuotas.write(writer, options);
- producerIds.write(writer, options);
+ configs.write(writer);
+ clientQuotas.write(writer);
+ producerIds.write(writer);
acls.write(writer);
scram.write(writer, options);
delegationTokens.write(writer, options);
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index 8b7402ed98e..ea3f76fdfa8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.image.node.ProducerIdsImageNode;
import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Objects;
@@ -46,7 +45,7 @@ public final class ProducerIdsImage {
return nextProducerId;
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
if (nextProducerId >= 0) {
writer.write(0, new ProducerIdsRecord().
setBrokerId(-1).
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index b6dedc84625..c0c17a2a482 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -109,17 +109,17 @@ public final class ScramImage {
DescribeUserScramCredentialsResult result = new
DescribeUserScramCredentialsResult().setUser(user.getKey());
if (!user.getValue()) {
- boolean datafound = false;
+ boolean dataFound = false;
List<CredentialInfo> credentialInfos = new ArrayList<>();
for (Map.Entry<ScramMechanism, Map<String,
ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
Map<String, ScramCredentialData> credentialDataSet =
mechanismsEntry.getValue();
if (credentialDataSet.containsKey(user.getKey())) {
credentialInfos.add(new
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
- datafound = true;
+ dataFound = true;
}
}
- if (datafound) {
+ if (dataFound) {
result.setCredentialInfos(credentialInfos);
} else {
result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 8fc84c5e006..b5ae7068988 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.metadata.PartitionRegistration;
@@ -40,8 +39,8 @@ import java.util.stream.Collectors;
public final class TopicDelta {
private final TopicImage image;
private final Map<Integer, PartitionRegistration> partitionChanges = new
HashMap<>();
- private Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new
HashMap<>();
- private Map<Integer, Integer> partitionToElrElectionCount = new
HashMap<>();
+ private final Map<Integer, Integer> partitionToUncleanLeaderElectionCount
= new HashMap<>();
+ private final Map<Integer, Integer> partitionToElrElectionCount = new
HashMap<>();
public TopicDelta(TopicImage image) {
this.image = image;
@@ -113,11 +112,9 @@ public final class TopicDelta {
}
}
- public void replay(ClearElrRecord record) {
+ public void replay() {
// Some partitions are not added to the image yet, let's check the
partitionChanges first.
- partitionChanges.forEach((partitionId, partition) -> {
- maybeClearElr(partitionId, partition);
- });
+ partitionChanges.forEach(this::maybeClearElr);
image.partitions().forEach((partitionId, partition) -> {
if (!partitionChanges.containsKey(partitionId)) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index e25bf71f05a..63ae2d06026 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -95,11 +95,11 @@ public final class TopicsDelta {
topicDelta.replay(record);
}
- private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord
record) {
+ private void maybeReplayClearElrRecord(Uuid topicId) {
// Only apply the record if the topic is not deleted.
if (!deletedTopicIds.contains(topicId)) {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
- topicDelta.replay(record);
+ topicDelta.replay();
}
}
@@ -123,15 +123,11 @@ public final class TopicsDelta {
record.topicName() + ": no such topic found.");
}
- maybeReplayClearElrRecord(topicId, record);
+ maybeReplayClearElrRecord(topicId);
} else {
// Update all the existing topics
- image.topicsById().forEach((topicId, image) -> {
- maybeReplayClearElrRecord(topicId, record);
- });
- createdTopicIds().forEach((topicId -> {
- maybeReplayClearElrRecord(topicId, record);
- }));
+ image.topicsById().forEach((topicId, image) ->
maybeReplayClearElrRecord(topicId));
+ createdTopicIds().forEach((this::maybeReplayClearElrRecord));
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
index f3cd15dfb83..36fb9732c34 100644
---
a/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
+++
b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
@@ -63,14 +63,11 @@ public class ClientQuotasImageNode implements MetadataNode {
String ip = null;
String user = null;
for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
- if (entry.getKey().equals(CLIENT_ID)) {
- clientId = entry.getValue();
- } else if (entry.getKey().equals(IP)) {
- ip = entry.getValue();
- } else if (entry.getKey().equals(USER)) {
- user = entry.getValue();
- } else {
- throw new RuntimeException("Invalid entity type " +
entry.getKey());
+ switch (entry.getKey()) {
+ case CLIENT_ID -> clientId = entry.getValue();
+ case IP -> ip = entry.getValue();
+ case USER -> user = entry.getValue();
+ default -> throw new RuntimeException("Invalid entity type " +
entry.getKey());
}
}
StringBuilder bld = new StringBuilder();
@@ -85,7 +82,6 @@ public class ClientQuotasImageNode implements MetadataNode {
}
if (user != null) {
bld.append(prefix).append("user(").append(escape(user)).append(")");
- prefix = "_";
}
return bld.toString();
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
index a8cf8a47bb1..7eff72cca8a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
@@ -55,28 +55,17 @@ public class KafkaConfigSchema {
* makes sense to put it here.
*/
public static ConfigEntry.ConfigType translateConfigType(ConfigDef.Type
type) {
- switch (type) {
- case BOOLEAN:
- return ConfigEntry.ConfigType.BOOLEAN;
- case STRING:
- return ConfigEntry.ConfigType.STRING;
- case INT:
- return ConfigEntry.ConfigType.INT;
- case SHORT:
- return ConfigEntry.ConfigType.SHORT;
- case LONG:
- return ConfigEntry.ConfigType.LONG;
- case DOUBLE:
- return ConfigEntry.ConfigType.DOUBLE;
- case LIST:
- return ConfigEntry.ConfigType.LIST;
- case CLASS:
- return ConfigEntry.ConfigType.CLASS;
- case PASSWORD:
- return ConfigEntry.ConfigType.PASSWORD;
- default:
- return ConfigEntry.ConfigType.UNKNOWN;
- }
+ return switch (type) {
+ case BOOLEAN -> ConfigEntry.ConfigType.BOOLEAN;
+ case STRING -> ConfigEntry.ConfigType.STRING;
+ case INT -> ConfigEntry.ConfigType.INT;
+ case SHORT -> ConfigEntry.ConfigType.SHORT;
+ case LONG -> ConfigEntry.ConfigType.LONG;
+ case DOUBLE -> ConfigEntry.ConfigType.DOUBLE;
+ case LIST -> ConfigEntry.ConfigType.LIST;
+ case CLASS -> ConfigEntry.ConfigType.CLASS;
+ case PASSWORD -> ConfigEntry.ConfigType.PASSWORD;
+ };
}
private static final Map<ConfigEntry.ConfigSource,
DescribeConfigsResponse.ConfigSource> TRANSLATE_CONFIG_SOURCE_MAP;
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 1c837aa4e1d..1d97db0b82e 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -216,8 +216,8 @@ public class PartitionRegistration {
}
private PartitionRegistration(int[] replicas, Uuid[] directories, int[]
isr, int[] removingReplicas,
- int[] addingReplicas, int leader,
LeaderRecoveryState leaderRecoveryState,
- int leaderEpoch, int partitionEpoch, int[]
elr, int[] lastKnownElr) {
+ int[] addingReplicas, int leader,
LeaderRecoveryState leaderRecoveryState,
+ int leaderEpoch, int partitionEpoch, int[]
elr, int[] lastKnownElr) {
Objects.requireNonNull(directories);
if (directories.length > 0 && directories.length != replicas.length) {
throw new IllegalArgumentException("The lengths for replicas and
directories do not match.");
@@ -410,8 +410,7 @@ public class PartitionRegistration {
return new ApiMessageAndVersion(record,
options.metadataVersion().partitionRecordVersion());
}
- public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
-
boolean isNew) {
+ public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
boolean isNew) {
return new PartitionState().
setTopicName(tp.topic()).
setPartitionIndex(tp.partition()).
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 2d97683e14e..5a0749648f9 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -94,11 +94,6 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer, Monitorabl
initialLoadFuture.complete(null);
}
- // Visible for testing
- public CompletableFuture<Void> initialLoadFuture() {
- return initialLoadFuture;
- }
-
@Override
public void completeInitialLoad(Exception e) {
if (!initialLoadFuture.isDone()) {
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 6b139b5d910..be50a0c895b 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -106,8 +106,6 @@ public class StandardAuthorizerData {
*/
private AclCache aclCache;
-
-
private static Logger createLogger(int nodeId) {
return new LogContext("[StandardAuthorizer " + nodeId + "]
").logger(StandardAuthorizerData.class);
}
@@ -172,7 +170,7 @@ public class StandardAuthorizerData {
}
StandardAuthorizerData copyWithNewAcls(AclCache aclCache) {
- StandardAuthorizerData newData = new StandardAuthorizerData(
+ StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
loadingComplete,
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 786f4c1e31e..9438a76693e 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -177,7 +177,7 @@ public class BootstrapMetadata {
@Override
public String toString() {
- return "BootstrapMetadata(records=" + records.toString() +
+ return "BootstrapMetadata(records=" + records +
", metadataVersionLevel=" + metadataVersionLevel +
", source=" + source +
")";
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
index cefba273b25..9ac230be79e 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
@@ -37,7 +37,7 @@ public class PartitionAssignment {
public PartitionAssignment(List<Integer> replicas, DefaultDirProvider
defaultDirProvider) {
this.replicas = List.copyOf(replicas);
- this.directories = replicas.stream().map(replica ->
defaultDirProvider.defaultDir(replica)).toList();
+ this.directories =
replicas.stream().map(defaultDirProvider::defaultDir).toList();
}
/**
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
index 229219efcc2..bd02cb0fff6 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
@@ -238,15 +238,11 @@ public final class MetaProperties {
StringBuilder bld = new StringBuilder();
bld.append("MetaProperties");
bld.append("(version=").append(version.number());
- if (clusterId.isPresent()) {
- bld.append(", clusterId=").append(clusterId.get());
- }
+ clusterId.ifPresent(id -> bld.append(", clusterId=").append(id));
if (nodeId.isPresent()) {
bld.append(", nodeId=").append(nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- bld.append(", directoryId=").append(directoryId.get());
- }
+ directoryId.ifPresent(id -> bld.append(", directoryId=").append(id));
bld.append(")");
return bld.toString();
}
@@ -254,9 +250,7 @@ public final class MetaProperties {
public Properties toProperties() {
Properties props = new Properties();
props.setProperty(VERSION_PROP, version.numberString());
- if (clusterId.isPresent()) {
- props.setProperty(CLUSTER_ID_PROP, clusterId.get());
- }
+ clusterId.ifPresent(id -> props.setProperty(CLUSTER_ID_PROP, id));
if (version.hasBrokerId()) {
if (nodeId.isPresent()) {
props.setProperty(BROKER_ID_PROP, "" + nodeId.getAsInt());
@@ -264,9 +258,7 @@ public final class MetaProperties {
} else {
props.setProperty(NODE_ID_PROP, "" + nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- props.setProperty(DIRECTORY_ID_PROP, directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty(DIRECTORY_ID_PROP,
id.toString()));
return props;
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
index abe5d3d1d11..0cceb7852a4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
@@ -47,11 +47,11 @@ public enum MetaPropertiesVersion {
}
public static MetaPropertiesVersion fromNumber(int number) {
- switch (number) {
- case 0: return V0;
- case 1: return V1;
- default: throw new RuntimeException("Unknown meta.properties
version number " + number);
- }
+ return switch (number) {
+ case 0 -> V0;
+ case 1 -> V1;
+ default -> throw new RuntimeException("Unknown meta.properties
version number " + number);
+ };
}
MetaPropertiesVersion(int number) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 04b52c9e665..38219c2069a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -461,17 +461,12 @@ public class Formatter {
DYNAMIC_METADATA_VOTER_DIRECTORY;
String description() {
- switch (this) {
- case LOG_DIRECTORY:
- return "data directory";
- case STATIC_METADATA_DIRECTORY:
- return "metadata directory";
- case DYNAMIC_METADATA_NON_VOTER_DIRECTORY:
- return "dynamic metadata directory";
- case DYNAMIC_METADATA_VOTER_DIRECTORY:
- return "dynamic metadata voter directory";
- }
- throw new RuntimeException("invalid enum type " + this);
+ return switch (this) {
+ case LOG_DIRECTORY -> "data directory";
+ case STATIC_METADATA_DIRECTORY -> "metadata directory";
+ case DYNAMIC_METADATA_NON_VOTER_DIRECTORY -> "dynamic metadata
directory";
+ case DYNAMIC_METADATA_VOTER_DIRECTORY -> "dynamic metadata
voter directory";
+ };
}
boolean isDynamicMetadataDirectory() {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index f836be9e93f..bb5f5b9216d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -119,16 +119,12 @@ public class PartitionChangeBuilderTest {
private static final Uuid FOO_ID =
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static MetadataVersion
metadataVersionForPartitionChangeRecordVersion(short version) {
- switch (version) {
- case (short) 0:
- return MetadataVersion.IBP_3_7_IV0;
- case (short) 1:
- return MetadataVersion.IBP_3_7_IV2;
- case (short) 2:
- return MetadataVersion.IBP_4_0_IV1;
- default:
- throw new RuntimeException("Unknown PartitionChangeRecord
version " + version);
- }
+ return switch (version) {
+ case (short) 0 -> MetadataVersion.IBP_3_7_IV0;
+ case (short) 1 -> MetadataVersion.IBP_3_7_IV2;
+ case (short) 2 -> MetadataVersion.IBP_4_0_IV1;
+ default -> throw new RuntimeException("Unknown
PartitionChangeRecord version " + version);
+ };
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion
metadataVersion) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index ff18775e79c..7024d0de3a2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -935,7 +935,7 @@ public class ReplicationControlManagerTest {
}
@Override
- public void close() throws Exception { /* Nothing to do */ }
+ public void close() { /* Nothing to do */ }
@Override
public void configure(Map<String, ?> configs) { /* Nothing to do
*/ }
@@ -2990,7 +2990,7 @@ public class ReplicationControlManagerTest {
new int[]{2, 3, 4}, new int[]{3, 4, 2}}).topicId();
KRaftClusterDescriber describer = replication.clusterDescriber;
HashSet<UsableBroker> brokers = new HashSet<>();
- describer.usableBrokers().forEachRemaining(broker ->
brokers.add(broker));
+ describer.usableBrokers().forEachRemaining(brokers::add);
assertEquals(Set.of(
new UsableBroker(0, Optional.empty(), true),
new UsableBroker(1, Optional.empty(), true),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index e3d82d8e59c..2e7fb0cde48 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -62,20 +62,13 @@ public class ControllerMetricsTestUtils {
}
public static PartitionRegistration fakePartitionRegistration(
- FakePartitionRegistrationType type
+ FakePartitionRegistrationType type
) {
- int leader = 0;
- switch (type) {
- case NORMAL:
- leader = 0;
- break;
- case NON_PREFERRED_LEADER:
- leader = 1;
- break;
- case OFFLINE:
- leader = -1;
- break;
- }
+ int leader = switch (type) {
+ case NORMAL -> 0;
+ case NON_PREFERRED_LEADER -> 1;
+ case OFFLINE -> -1;
+ };
return new PartitionRegistration.Builder().
setReplicas(new int[] {0, 1, 2}).
setDirectories(DirectoryId.migratingArray(3)).
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
index 76a878e1d48..46f8e908fe2 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
@@ -20,11 +20,9 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.quota.ClientQuotaEntity;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.QuotaConfig;
import org.junit.jupiter.api.Test;
@@ -136,7 +134,7 @@ public class ClientQuotasImageTest {
private static List<ApiMessageAndVersion>
getImageRecords(ClientQuotasImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 6e01c4dbf03..6300e1293e8 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -19,11 +19,9 @@ package org.apache.kafka.image;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -136,7 +134,7 @@ public class ConfigurationsImageTest {
private static List<ApiMessageAndVersion>
getImageRecords(ConfigurationsImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index e34b1f3d1a6..1d4d07b58f8 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -18,11 +18,9 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
-import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -113,7 +111,7 @@ public class ProducerIdsImageTest {
private static List<ApiMessageAndVersion> getImageRecords(ProducerIdsImage
image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
index 0070d41bdf9..433177b5c80 100644
--- a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
@@ -41,7 +41,9 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -75,10 +77,10 @@ public class ImageWriterOptionsTest {
MetadataVersion version = MetadataVersion.MINIMUM_VERSION;
ImageWriterOptions options = new ImageWriterOptions.Builder(version).
setEligibleLeaderReplicasEnabled(true).build();
- assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+ assertTrue(options.isEligibleLeaderReplicasEnabled());
options = new ImageWriterOptions.Builder(version).build();
- assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+ assertFalse(options.isEligibleLeaderReplicasEnabled());
}
@ParameterizedTest
@@ -111,9 +113,9 @@ public class ImageWriterOptionsTest {
ImageWriterOptions options = new
ImageWriterOptions.Builder(metadataImage).build();
assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion());
if (isElrEnabled) {
- assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+ assertTrue(options.isEligibleLeaderReplicasEnabled());
} else {
- assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+ assertFalse(options.isEligibleLeaderReplicasEnabled());
}
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 858e823b7e0..0ad45185eda 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -131,7 +131,6 @@ public class BootstrapMetadataTest {
BootstrapMetadata bootstrapMetadata =
BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux");
assertEquals("No MetadataVersion with feature level 1. Valid feature
levels are from " + MetadataVersion.MINIMUM_VERSION.featureLevel()
+ " to " + MetadataVersion.latestTesting().featureLevel() + ".",
- assertThrows(RuntimeException.class,
- () -> bootstrapMetadata.metadataVersion()).getMessage());
+ assertThrows(RuntimeException.class,
bootstrapMetadata::metadataVersion).getMessage());
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
index 05a96d9dd2d..640e46544fc 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
@@ -88,15 +88,11 @@ public final class MetaPropertiesTest {
assertEquals(directoryId, metaProperties.directoryId());
Properties props = new Properties();
props.setProperty("version", "0");
- if (clusterId.isPresent()) {
- props.setProperty("cluster.id", clusterId.get());
- }
+ clusterId.ifPresent(id -> props.setProperty("cluster.id", id));
if (nodeId.isPresent()) {
props.setProperty("broker.id", "" + nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- props.setProperty("directory.id", directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty("directory.id",
id.toString()));
Properties props2 = metaProperties.toProperties();
assertEquals(props, props2);
MetaProperties metaProperties2 = new
MetaProperties.Builder(props2).build();
@@ -151,9 +147,7 @@ public final class MetaPropertiesTest {
props.setProperty("version", "1");
props.setProperty("cluster.id", clusterId);
props.setProperty("node.id", "" + nodeId);
- if (directoryId.isPresent()) {
- props.setProperty("directory.id", directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty("directory.id",
id.toString()));
Properties props2 = metaProperties.toProperties();
assertEquals(props, props2);
MetaProperties metaProperties2 = new
MetaProperties.Builder(props2).build();
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c13f90f57fa..013d5945d2e 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -186,7 +186,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
/**
* The initial max read offset which LocalLog instances will be
configured with.
*/
- private long initialMaxReadOffset = Long.MAX_VALUE;
+ private final long initialMaxReadOffset = Long.MAX_VALUE;
public SharedLogData(Optional<RawSnapshotReader> snapshot) {
if (snapshot.isPresent()) {
@@ -544,7 +544,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
numEntriesFound++;
}
}
- log.trace("Completed log check for node " + nodeId);
+ log.trace("Completed log check for node {}", nodeId);
} catch (Exception e) {
log.error("Exception while handling log check", e);
}