This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 634e99e9ab1 MINOR: Cleanup metadata module (#20115)
634e99e9ab1 is described below

commit 634e99e9ab16db61990f4922f0a30b6a93f27785
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Jul 21 01:51:09 2025 +0500

    MINOR: Cleanup metadata module (#20115)
    
    - Removed unused methods and arguments;
    - Used enhanced switch and functional style expression for Optional;
    - Fixed IDEA code inspection warnings.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../errors/InvalidReplicaDirectoriesException.java |  5 ++--
 .../kafka/controller/BrokerHeartbeatTracker.java   |  2 +-
 .../controller/ClientQuotaControlManager.java      | 32 ++++++++++-----------
 .../kafka/controller/ClusterControlManager.java    |  4 +--
 .../controller/ConfigurationControlManager.java    |  4 +--
 .../kafka/controller/EventPerformanceMonitor.java  |  4 +--
 .../kafka/controller/KRaftVersionAccessor.java     |  1 -
 .../apache/kafka/controller/QuorumController.java  |  5 ----
 .../controller/ReplicationControlManager.java      |  2 --
 .../errors/EventHandlerExceptionInfo.java          |  6 ++--
 .../org/apache/kafka/image/ClientQuotaImage.java   |  4 +--
 .../org/apache/kafka/image/ClientQuotasImage.java  |  5 ++--
 .../java/org/apache/kafka/image/ClusterDelta.java  |  8 ++----
 .../org/apache/kafka/image/ConfigurationImage.java |  4 +--
 .../apache/kafka/image/ConfigurationsImage.java    |  5 ++--
 .../java/org/apache/kafka/image/MetadataImage.java |  6 ++--
 .../org/apache/kafka/image/ProducerIdsImage.java   |  3 +-
 .../java/org/apache/kafka/image/ScramImage.java    |  6 ++--
 .../java/org/apache/kafka/image/TopicDelta.java    | 11 +++-----
 .../java/org/apache/kafka/image/TopicsDelta.java   | 14 ++++-----
 .../kafka/image/node/ClientQuotasImageNode.java    | 14 ++++-----
 .../apache/kafka/metadata/KafkaConfigSchema.java   | 33 ++++++++--------------
 .../kafka/metadata/PartitionRegistration.java      |  7 ++---
 .../metadata/authorizer/StandardAuthorizer.java    |  5 ----
 .../authorizer/StandardAuthorizerData.java         |  4 +--
 .../metadata/bootstrap/BootstrapMetadata.java      |  2 +-
 .../metadata/placement/PartitionAssignment.java    |  2 +-
 .../kafka/metadata/properties/MetaProperties.java  | 16 +++--------
 .../metadata/properties/MetaPropertiesVersion.java | 10 +++----
 .../apache/kafka/metadata/storage/Formatter.java   | 17 ++++-------
 .../controller/PartitionChangeBuilderTest.java     | 16 ++++-------
 .../controller/ReplicationControlManagerTest.java  |  4 +--
 .../metrics/ControllerMetricsTestUtils.java        | 19 ++++---------
 .../apache/kafka/image/ClientQuotasImageTest.java  |  4 +--
 .../kafka/image/ConfigurationsImageTest.java       |  4 +--
 .../apache/kafka/image/ProducerIdsImageTest.java   |  4 +--
 .../kafka/image/writer/ImageWriterOptionsTest.java | 10 ++++---
 .../metadata/bootstrap/BootstrapMetadataTest.java  |  3 +-
 .../metadata/properties/MetaPropertiesTest.java    | 12 ++------
 .../org/apache/kafka/metalog/LocalLogManager.java  |  4 +--
 40 files changed, 117 insertions(+), 204 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
 
b/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
index c83a8b5bd4a..417521bfb48 100644
--- 
a/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
+++ 
b/metadata/src/main/java/org/apache/kafka/common/errors/InvalidReplicaDirectoriesException.java
@@ -23,14 +23,15 @@ import org.apache.kafka.common.metadata.PartitionRecord;
  * A record was encountered where the number of directories does not match the 
number of replicas.
  */
 public class InvalidReplicaDirectoriesException extends 
InvalidMetadataException {
+    private static final String ERR_MSG = "The lengths for replicas and 
directories do not match: ";
 
     private static final long serialVersionUID = 1L;
 
     public InvalidReplicaDirectoriesException(PartitionRecord record) {
-        super("The lengths for replicas and directories do not match: " + 
record);
+        super(ERR_MSG + record);
     }
 
     public InvalidReplicaDirectoriesException(PartitionChangeRecord record) {
-        super("The lengths for replicas and directories do not match: " + 
record);
+        super(ERR_MSG + record);
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
index 722030f07ae..7698f44bbf3 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
@@ -26,7 +26,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The BrokerheartbeatTracker stores the last time each broker sent a 
heartbeat to us.
+ * The BrokerHeartbeatTracker stores the last time each broker sent a 
heartbeat to us.
  * This class will be present only on the active controller.
  *
  * UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM 
MULTIPLE THREADS.
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 3f9b3f61f10..ca6ec897330 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -261,32 +261,32 @@ public class ClientQuotaControlManager {
         }
 
         // Ensure the quota value is valid
-        switch (configKey.type()) {
-            case DOUBLE:
-                return ApiError.NONE;
-            case SHORT:
+        return switch (configKey.type()) {
+            case DOUBLE -> ApiError.NONE;
+            case SHORT -> {
                 if (value > Short.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for a 
SHORT.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
-            case INT:
+                yield getErrorForIntegralQuotaValue(value, key);
+            }
+            case INT -> {
                 if (value > Integer.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for an 
INT.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
-            case LONG: {
+                yield getErrorForIntegralQuotaValue(value, key);
+            }
+            case LONG -> {
                 if (value > Long.MAX_VALUE) {
-                    return new ApiError(Errors.INVALID_REQUEST,
+                    yield new ApiError(Errors.INVALID_REQUEST,
                         "Proposed value for " + key + " is too large for a 
LONG.");
                 }
-                return getErrorForIntegralQuotaValue(value, key);
+                yield getErrorForIntegralQuotaValue(value, key);
             }
-            default:
-                return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
-                        "Unexpected config type " + configKey.type() + " 
should be Long or Double");
-        }
+            default -> new ApiError(Errors.UNKNOWN_SERVER_ERROR,
+                    "Unexpected config type " + configKey.type() + " should be 
Long or Double");
+        };
     }
 
     static ApiError getErrorForIntegralQuotaValue(double value, String key) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 14fce5ca0f3..2ce4b2f420f 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -823,7 +823,7 @@ public class ClusterControlManager {
     }
 
     Iterator<Entry<Integer, Map<String, VersionRange>>> 
brokerSupportedFeatures() {
-        return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+        return new Iterator<>() {
             private final Iterator<BrokerRegistration> iter = 
brokerRegistrations.values().iterator();
 
             @Override
@@ -845,7 +845,7 @@ public class ClusterControlManager {
             throw new UnsupportedVersionException("The current MetadataVersion 
is too old to " +
                     "support controller registrations.");
         }
-        return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+        return new Iterator<>() {
             private final Iterator<ControllerRegistration> iter = 
controllerRegistrations.values().iterator();
 
             @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index b367fda8114..d0e48f18142 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -364,9 +364,7 @@ public class ConfigurationControlManager {
             if (!newlyCreatedResource) {
                 existenceChecker.accept(configResource);
             }
-            if (alterConfigPolicy.isPresent()) {
-                alterConfigPolicy.get().validate(new 
RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
-            }
+            alterConfigPolicy.ifPresent(policy -> policy.validate(new 
RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck)));
         } catch (ConfigException e) {
             return new ApiError(INVALID_CONFIG, e.getMessage());
         } catch (Throwable e) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
 
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
index fbe8b1c3cbb..55ff819ff7b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
@@ -74,12 +74,12 @@ class EventPerformanceMonitor {
     /**
      * The period in nanoseconds.
      */
-    private long periodNs;
+    private final long periodNs;
 
     /**
      * The always-log threshold in nanoseconds.
      */
-    private long alwaysLogThresholdNs;
+    private final long alwaysLogThresholdNs;
 
     /**
      * The name of the slowest event we've seen so far, or null if none has 
been seen.
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java 
b/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
index b07d2c6398c..0eb6f96b858 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
@@ -36,7 +36,6 @@ interface KRaftVersionAccessor {
      * @param epoch the current epoch
      * @param newVersion the new kraft version to upgrade to
      * @param validateOnly whether to just validate the change and not persist 
it
-     * @throws ApiException when the upgrade fails to validate
      */
     void upgradeKRaftVersion(int epoch, KRaftVersion newVersion, boolean 
validateOnly);
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7dee1e3cd3d..5f4d6142434 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -676,11 +676,6 @@ public final class QuorumController implements Controller {
         return clusterControl;
     }
 
-    // Visible for testing
-    FeatureControlManager featureControl() {
-        return featureControl;
-    }
-
     // Visible for testing
     ConfigurationControlManager configurationControl() {
         return configurationControl;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 9ad4bba3424..fe561244dab 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1109,7 +1109,6 @@ public class ReplicationControlManager {
                     topic,
                     partitionId,
                     partition,
-                    context.requestHeader().requestApiVersion(),
                     partitionData);
 
                 if (validationError != Errors.NONE) {
@@ -1239,7 +1238,6 @@ public class ReplicationControlManager {
         TopicControlInfo topic,
         int partitionId,
         PartitionRegistration partition,
-        short requestApiVersion,
         AlterPartitionRequestData.PartitionData partitionData
     ) {
         if (partition == null) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
 
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
index fdd428495c1..c914e63cdaa 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
@@ -167,10 +167,8 @@ public final class EventHandlerExceptionInfo {
             bld.append("event unable to start processing because of ");
         }
         bld.append(internalException.getClass().getSimpleName());
-        if (externalException.isPresent()) {
-            bld.append(" (treated as ").
-                
append(externalException.get().getClass().getSimpleName()).append(")");
-        }
+        externalException.ifPresent(e -> bld.append(" (treated as ")
+                .append(e.getClass().getSimpleName()).append(")"));
         if (causesFailover()) {
             bld.append(" at epoch ").append(epoch);
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
index 2161ae2621f..eab0d102acf 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.image.node.ClientQuotaImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,8 +46,7 @@ public record ClientQuotaImage(Map<String, Double> quotas) {
 
     public void write(
         ClientQuotaEntity entity,
-        ImageWriter writer,
-        ImageWriterOptions options
+        ImageWriter writer
     ) {
         for (Entry<String, Double> entry : quotas.entrySet()) {
             writer.write(0, new ClientQuotaRecord().
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index 77096e5d009..6cf23a2b688 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -26,7 +26,6 @@ import 
org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryDat
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.image.node.ClientQuotasImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,11 +66,11 @@ public final class ClientQuotasImage {
         return entities;
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : 
entities.entrySet()) {
             ClientQuotaEntity entity = entry.getKey();
             ClientQuotaImage clientQuotaImage = entry.getValue();
-            clientQuotaImage.write(entity, writer, options);
+            clientQuotaImage.write(entity, writer);
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index caf09243ff4..8f0556959b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -165,9 +165,7 @@ public final class ClusterDelta {
             int nodeId = entry.getKey();
             Optional<BrokerRegistration> brokerRegistration = entry.getValue();
             if (!newBrokers.containsKey(nodeId)) {
-                if (brokerRegistration.isPresent()) {
-                    newBrokers.put(nodeId, brokerRegistration.get());
-                }
+                brokerRegistration.ifPresent(registration -> 
newBrokers.put(nodeId, registration));
             }
         }
         Map<Integer, ControllerRegistration> newControllers = new 
HashMap<>(image.controllers().size());
@@ -184,9 +182,7 @@ public final class ClusterDelta {
             int nodeId = entry.getKey();
             Optional<ControllerRegistration> controllerRegistration = 
entry.getValue();
             if (!newControllers.containsKey(nodeId)) {
-                if (controllerRegistration.isPresent()) {
-                    newControllers.put(nodeId, controllerRegistration.get());
-                }
+                controllerRegistration.ifPresent(registration -> 
newControllers.put(nodeId, registration));
             }
         }
         return new ClusterImage(newBrokers, newControllers);
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index b4227b3df65..c0757d351a7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.image.node.ConfigurationImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.Map;
@@ -71,8 +70,7 @@ public final class ConfigurationImage {
 
     public void write(
         ConfigResource configResource,
-        ImageWriter writer,
-        ImageWriterOptions options
+        ImageWriter writer
     ) {
         for (Map.Entry<String, String> entry : data.entrySet()) {
             writer.write(0, new ConfigRecord().
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index 4df6c363840..9d1670e6e15 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.image.node.ConfigurationsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Collections;
 import java.util.Map;
@@ -74,11 +73,11 @@ public final class ConfigurationsImage {
         }
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         for (Entry<ConfigResource, ConfigurationImage> entry : 
data.entrySet()) {
             ConfigResource configResource = entry.getKey();
             ConfigurationImage configImage = entry.getValue();
-            configImage.write(configResource, writer, options);
+            configImage.write(configResource, writer);
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index fc14eb1c5b1..da245f00519 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -153,9 +153,9 @@ public final class MetadataImage {
         features.write(writer, options);
         cluster.write(writer, options);
         topics.write(writer, options);
-        configs.write(writer, options);
-        clientQuotas.write(writer, options);
-        producerIds.write(writer, options);
+        configs.write(writer);
+        clientQuotas.write(writer);
+        producerIds.write(writer);
         acls.write(writer);
         scram.write(writer, options);
         delegationTokens.write(writer, options);
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index 8b7402ed98e..ea3f76fdfa8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.image.node.ProducerIdsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 
 import java.util.Objects;
 
@@ -46,7 +45,7 @@ public final class ProducerIdsImage {
         return nextProducerId;
     }
 
-    public void write(ImageWriter writer, ImageWriterOptions options) {
+    public void write(ImageWriter writer) {
         if (nextProducerId >= 0) {
             writer.write(0, new ProducerIdsRecord().
                     setBrokerId(-1).
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index b6dedc84625..c0c17a2a482 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -109,17 +109,17 @@ public final class ScramImage {
             DescribeUserScramCredentialsResult result = new 
DescribeUserScramCredentialsResult().setUser(user.getKey());
 
             if (!user.getValue()) {
-                boolean datafound = false;
+                boolean dataFound = false;
                 List<CredentialInfo> credentialInfos = new ArrayList<>();
                 for (Map.Entry<ScramMechanism, Map<String, 
ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
                     Map<String, ScramCredentialData> credentialDataSet = 
mechanismsEntry.getValue();
                     if (credentialDataSet.containsKey(user.getKey())) {
                         credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
                                                                 
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
-                        datafound = true;
+                        dataFound = true;
                     }
                 }
-                if (datafound) {
+                if (dataFound) {
                     result.setCredentialInfos(credentialInfos);
                 } else {
                     result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 8fc84c5e006..b5ae7068988 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -40,8 +39,8 @@ import java.util.stream.Collectors;
 public final class TopicDelta {
     private final TopicImage image;
     private final Map<Integer, PartitionRegistration> partitionChanges = new 
HashMap<>();
-    private Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new 
HashMap<>();
-    private Map<Integer, Integer> partitionToElrElectionCount = new 
HashMap<>();
+    private final Map<Integer, Integer> partitionToUncleanLeaderElectionCount 
= new HashMap<>();
+    private final Map<Integer, Integer> partitionToElrElectionCount = new 
HashMap<>();
 
     public TopicDelta(TopicImage image) {
         this.image = image;
@@ -113,11 +112,9 @@ public final class TopicDelta {
         }
     }
 
-    public void replay(ClearElrRecord record) {
+    public void replay() {
         // Some partitions are not added to the image yet, let's check the 
partitionChanges first.
-        partitionChanges.forEach((partitionId, partition) -> {
-            maybeClearElr(partitionId, partition);
-        });
+        partitionChanges.forEach(this::maybeClearElr);
 
         image.partitions().forEach((partitionId, partition) -> {
             if (!partitionChanges.containsKey(partitionId)) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index e25bf71f05a..63ae2d06026 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -95,11 +95,11 @@ public final class TopicsDelta {
         topicDelta.replay(record);
     }
 
-    private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord 
record) {
+    private void maybeReplayClearElrRecord(Uuid topicId) {
         // Only apply the record if the topic is not deleted.
         if (!deletedTopicIds.contains(topicId)) {
             TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
-            topicDelta.replay(record);
+            topicDelta.replay();
         }
     }
 
@@ -123,15 +123,11 @@ public final class TopicsDelta {
                     record.topicName() + ": no such topic found.");
             }
 
-            maybeReplayClearElrRecord(topicId, record);
+            maybeReplayClearElrRecord(topicId);
         } else {
             // Update all the existing topics
-            image.topicsById().forEach((topicId, image) -> {
-                maybeReplayClearElrRecord(topicId, record);
-            });
-            createdTopicIds().forEach((topicId -> {
-                maybeReplayClearElrRecord(topicId, record);
-            }));
+            image.topicsById().forEach((topicId, image) -> 
maybeReplayClearElrRecord(topicId));
+            createdTopicIds().forEach((this::maybeReplayClearElrRecord));
         }
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java 
b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
index f3cd15dfb83..36fb9732c34 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
@@ -63,14 +63,11 @@ public class ClientQuotasImageNode implements MetadataNode {
         String ip = null;
         String user = null;
         for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
-            if (entry.getKey().equals(CLIENT_ID)) {
-                clientId = entry.getValue();
-            } else if (entry.getKey().equals(IP)) {
-                ip = entry.getValue();
-            } else if (entry.getKey().equals(USER)) {
-                user = entry.getValue();
-            } else {
-                throw new RuntimeException("Invalid entity type " + 
entry.getKey());
+            switch (entry.getKey()) {
+                case CLIENT_ID -> clientId = entry.getValue();
+                case IP -> ip = entry.getValue();
+                case USER -> user = entry.getValue();
+                default -> throw new RuntimeException("Invalid entity type " + 
entry.getKey());
             }
         }
         StringBuilder bld = new StringBuilder();
@@ -85,7 +82,6 @@ public class ClientQuotasImageNode implements MetadataNode {
         }
         if (user != null) {
             
bld.append(prefix).append("user(").append(escape(user)).append(")");
-            prefix = "_";
         }
         return bld.toString();
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java 
b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
index a8cf8a47bb1..7eff72cca8a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
@@ -55,28 +55,17 @@ public class KafkaConfigSchema {
      * makes sense to put it here.
      */
     public static ConfigEntry.ConfigType translateConfigType(ConfigDef.Type 
type) {
-        switch (type) {
-            case BOOLEAN:
-                return ConfigEntry.ConfigType.BOOLEAN;
-            case STRING:
-                return ConfigEntry.ConfigType.STRING;
-            case INT:
-                return ConfigEntry.ConfigType.INT;
-            case SHORT:
-                return ConfigEntry.ConfigType.SHORT;
-            case LONG:
-                return ConfigEntry.ConfigType.LONG;
-            case DOUBLE:
-                return ConfigEntry.ConfigType.DOUBLE;
-            case LIST:
-                return ConfigEntry.ConfigType.LIST;
-            case CLASS:
-                return ConfigEntry.ConfigType.CLASS;
-            case PASSWORD:
-                return ConfigEntry.ConfigType.PASSWORD;
-            default:
-                return ConfigEntry.ConfigType.UNKNOWN;
-        }
+        return switch (type) {
+            case BOOLEAN -> ConfigEntry.ConfigType.BOOLEAN;
+            case STRING -> ConfigEntry.ConfigType.STRING;
+            case INT -> ConfigEntry.ConfigType.INT;
+            case SHORT -> ConfigEntry.ConfigType.SHORT;
+            case LONG -> ConfigEntry.ConfigType.LONG;
+            case DOUBLE -> ConfigEntry.ConfigType.DOUBLE;
+            case LIST -> ConfigEntry.ConfigType.LIST;
+            case CLASS -> ConfigEntry.ConfigType.CLASS;
+            case PASSWORD -> ConfigEntry.ConfigType.PASSWORD;
+        };
     }
 
     private static final Map<ConfigEntry.ConfigSource, 
DescribeConfigsResponse.ConfigSource> TRANSLATE_CONFIG_SOURCE_MAP;
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 1c837aa4e1d..1d97db0b82e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -216,8 +216,8 @@ public class PartitionRegistration {
     }
 
     private PartitionRegistration(int[] replicas, Uuid[] directories, int[] 
isr, int[] removingReplicas,
-                                 int[] addingReplicas, int leader, 
LeaderRecoveryState leaderRecoveryState,
-                                 int leaderEpoch, int partitionEpoch, int[] 
elr, int[] lastKnownElr) {
+                                  int[] addingReplicas, int leader, 
LeaderRecoveryState leaderRecoveryState,
+                                  int leaderEpoch, int partitionEpoch, int[] 
elr, int[] lastKnownElr) {
         Objects.requireNonNull(directories);
         if (directories.length > 0 && directories.length != replicas.length) {
             throw new IllegalArgumentException("The lengths for replicas and 
directories do not match.");
@@ -410,8 +410,7 @@ public class PartitionRegistration {
         return new ApiMessageAndVersion(record, 
options.metadataVersion().partitionRecordVersion());
     }
 
-    public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
-                                                                           
boolean isNew) {
+    public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp, 
boolean isNew) {
         return new PartitionState().
             setTopicName(tp.topic()).
             setPartitionIndex(tp.partition()).
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 2d97683e14e..5a0749648f9 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -94,11 +94,6 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer, Monitorabl
         initialLoadFuture.complete(null);
     }
 
-    // Visible for testing
-    public CompletableFuture<Void> initialLoadFuture() {
-        return initialLoadFuture;
-    }
-
     @Override
     public void completeInitialLoad(Exception e) {
         if (!initialLoadFuture.isDone()) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 6b139b5d910..be50a0c895b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -106,8 +106,6 @@ public class StandardAuthorizerData {
      */
     private AclCache aclCache;
 
-
-
     private static Logger createLogger(int nodeId) {
         return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
     }
@@ -172,7 +170,7 @@ public class StandardAuthorizerData {
     }
 
     StandardAuthorizerData copyWithNewAcls(AclCache aclCache) {
-        StandardAuthorizerData newData =  new StandardAuthorizerData(
+        StandardAuthorizerData newData = new StandardAuthorizerData(
             log,
             aclMutator,
             loadingComplete,
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 786f4c1e31e..9438a76693e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -177,7 +177,7 @@ public class BootstrapMetadata {
 
     @Override
     public String toString() {
-        return "BootstrapMetadata(records=" + records.toString() +
+        return "BootstrapMetadata(records=" + records +
             ", metadataVersionLevel=" + metadataVersionLevel +
             ", source=" + source +
             ")";
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
index cefba273b25..9ac230be79e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
@@ -37,7 +37,7 @@ public class PartitionAssignment {
 
     public PartitionAssignment(List<Integer> replicas, DefaultDirProvider 
defaultDirProvider) {
         this.replicas = List.copyOf(replicas);
-        this.directories = replicas.stream().map(replica -> 
defaultDirProvider.defaultDir(replica)).toList();
+        this.directories = 
replicas.stream().map(defaultDirProvider::defaultDir).toList();
     }
 
     /**
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
index 229219efcc2..bd02cb0fff6 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java
@@ -238,15 +238,11 @@ public final class MetaProperties {
         StringBuilder bld = new StringBuilder();
         bld.append("MetaProperties");
         bld.append("(version=").append(version.number());
-        if (clusterId.isPresent()) {
-            bld.append(", clusterId=").append(clusterId.get());
-        }
+        clusterId.ifPresent(id -> bld.append(", clusterId=").append(id));
         if (nodeId.isPresent()) {
             bld.append(", nodeId=").append(nodeId.getAsInt());
         }
-        if (directoryId.isPresent()) {
-            bld.append(", directoryId=").append(directoryId.get());
-        }
+        directoryId.ifPresent(id -> bld.append(", directoryId=").append(id));
         bld.append(")");
         return bld.toString();
     }
@@ -254,9 +250,7 @@ public final class MetaProperties {
     public Properties toProperties() {
         Properties props = new Properties();
         props.setProperty(VERSION_PROP, version.numberString());
-        if (clusterId.isPresent()) {
-            props.setProperty(CLUSTER_ID_PROP, clusterId.get());
-        }
+        clusterId.ifPresent(id -> props.setProperty(CLUSTER_ID_PROP, id));
         if (version.hasBrokerId()) {
             if (nodeId.isPresent()) {
                 props.setProperty(BROKER_ID_PROP, "" + nodeId.getAsInt());
@@ -264,9 +258,7 @@ public final class MetaProperties {
         } else {
             props.setProperty(NODE_ID_PROP, "" + nodeId.getAsInt());
         }
-        if (directoryId.isPresent()) {
-            props.setProperty(DIRECTORY_ID_PROP, directoryId.get().toString());
-        }
+        directoryId.ifPresent(id -> props.setProperty(DIRECTORY_ID_PROP, 
id.toString()));
         return props;
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
index abe5d3d1d11..0cceb7852a4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java
@@ -47,11 +47,11 @@ public enum MetaPropertiesVersion {
     }
 
     public static MetaPropertiesVersion fromNumber(int number) {
-        switch (number) {
-            case 0: return V0;
-            case 1: return V1;
-            default: throw new RuntimeException("Unknown meta.properties 
version number " + number);
-        }
+        return switch (number) {
+            case 0 -> V0;
+            case 1 -> V1;
+            default -> throw new RuntimeException("Unknown meta.properties 
version number " + number);
+        };
     }
 
     MetaPropertiesVersion(int number) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 04b52c9e665..38219c2069a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -461,17 +461,12 @@ public class Formatter {
         DYNAMIC_METADATA_VOTER_DIRECTORY;
 
         String description() {
-            switch (this) {
-                case LOG_DIRECTORY:
-                    return "data directory";
-                case STATIC_METADATA_DIRECTORY:
-                    return "metadata directory";
-                case DYNAMIC_METADATA_NON_VOTER_DIRECTORY:
-                    return "dynamic metadata directory";
-                case DYNAMIC_METADATA_VOTER_DIRECTORY:
-                    return "dynamic metadata voter directory";
-            }
-            throw new RuntimeException("invalid enum type " + this);
+            return switch (this) {
+                case LOG_DIRECTORY -> "data directory";
+                case STATIC_METADATA_DIRECTORY -> "metadata directory";
+                case DYNAMIC_METADATA_NON_VOTER_DIRECTORY -> "dynamic metadata 
directory";
+                case DYNAMIC_METADATA_VOTER_DIRECTORY -> "dynamic metadata 
voter directory";
+            };
         }
 
         boolean isDynamicMetadataDirectory() {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index f836be9e93f..bb5f5b9216d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -119,16 +119,12 @@ public class PartitionChangeBuilderTest {
     private static final Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
 
     private static MetadataVersion 
metadataVersionForPartitionChangeRecordVersion(short version) {
-        switch (version) {
-            case (short) 0:
-                return MetadataVersion.IBP_3_7_IV0;
-            case (short) 1:
-                return MetadataVersion.IBP_3_7_IV2;
-            case (short) 2:
-                return MetadataVersion.IBP_4_0_IV1;
-            default:
-                throw new RuntimeException("Unknown PartitionChangeRecord 
version " + version);
-        }
+        return switch (version) {
+            case (short) 0 -> MetadataVersion.IBP_3_7_IV0;
+            case (short) 1 -> MetadataVersion.IBP_3_7_IV2;
+            case (short) 2 -> MetadataVersion.IBP_4_0_IV1;
+            default -> throw new RuntimeException("Unknown 
PartitionChangeRecord version " + version);
+        };
     }
 
     private static PartitionChangeBuilder createFooBuilder(MetadataVersion 
metadataVersion) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index ff18775e79c..7024d0de3a2 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -935,7 +935,7 @@ public class ReplicationControlManagerTest {
             }
 
             @Override
-            public void close() throws Exception { /* Nothing to do */ }
+            public void close() { /* Nothing to do */ }
 
             @Override
             public void configure(Map<String, ?> configs) { /* Nothing to do 
*/ }
@@ -2990,7 +2990,7 @@ public class ReplicationControlManagerTest {
             new int[]{2, 3, 4}, new int[]{3, 4, 2}}).topicId();
         KRaftClusterDescriber describer = replication.clusterDescriber;
         HashSet<UsableBroker> brokers = new HashSet<>();
-        describer.usableBrokers().forEachRemaining(broker -> 
brokers.add(broker));
+        describer.usableBrokers().forEachRemaining(brokers::add);
         assertEquals(Set.of(
             new UsableBroker(0, Optional.empty(), true),
             new UsableBroker(1, Optional.empty(), true),
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index e3d82d8e59c..2e7fb0cde48 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -62,20 +62,13 @@ public class ControllerMetricsTestUtils {
     }
 
     public static PartitionRegistration fakePartitionRegistration(
-        FakePartitionRegistrationType  type
+        FakePartitionRegistrationType type
     ) {
-        int leader = 0;
-        switch (type) {
-            case NORMAL:
-                leader = 0;
-                break;
-            case NON_PREFERRED_LEADER:
-                leader = 1;
-                break;
-            case OFFLINE:
-                leader = -1;
-                break;
-        }
+        int leader = switch (type) {
+            case NORMAL -> 0;
+            case NON_PREFERRED_LEADER -> 1;
+            case OFFLINE -> -1;
+        };
         return new PartitionRegistration.Builder().
             setReplicas(new int[] {0, 1, 2}).
             setDirectories(DirectoryId.migratingArray(3)).
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
index 76a878e1d48..46f8e908fe2 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
@@ -20,11 +20,9 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.QuotaConfig;
 
 import org.junit.jupiter.api.Test;
@@ -136,7 +134,7 @@ public class ClientQuotasImageTest {
 
     private static List<ApiMessageAndVersion> 
getImageRecords(ClientQuotasImage image) {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new 
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+        image.write(writer);
         return writer.records();
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 6e01c4dbf03..6300e1293e8 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -19,11 +19,9 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -136,7 +134,7 @@ public class ConfigurationsImageTest {
 
     private static List<ApiMessageAndVersion> 
getImageRecords(ConfigurationsImage image) {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new 
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+        image.write(writer);
         return writer.records();
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index e34b1f3d1a6..1d4d07b58f8 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -18,11 +18,9 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
-import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -113,7 +111,7 @@ public class ProducerIdsImageTest {
 
     private static List<ApiMessageAndVersion> getImageRecords(ProducerIdsImage 
image) {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new 
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+        image.write(writer);
         return writer.records();
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
index 0070d41bdf9..433177b5c80 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
@@ -41,7 +41,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -75,10 +77,10 @@ public class ImageWriterOptionsTest {
         MetadataVersion version = MetadataVersion.MINIMUM_VERSION;
         ImageWriterOptions options = new ImageWriterOptions.Builder(version).
             setEligibleLeaderReplicasEnabled(true).build();
-        assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+        assertTrue(options.isEligibleLeaderReplicasEnabled());
 
         options = new ImageWriterOptions.Builder(version).build();
-        assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+        assertFalse(options.isEligibleLeaderReplicasEnabled());
     }
 
     @ParameterizedTest
@@ -111,9 +113,9 @@ public class ImageWriterOptionsTest {
         ImageWriterOptions options = new 
ImageWriterOptions.Builder(metadataImage).build();
         assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion());
         if (isElrEnabled) {
-            assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+            assertTrue(options.isEligibleLeaderReplicasEnabled());
         } else {
-            assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+            assertFalse(options.isEligibleLeaderReplicasEnabled());
         }
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 858e823b7e0..0ad45185eda 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -131,7 +131,6 @@ public class BootstrapMetadataTest {
         BootstrapMetadata bootstrapMetadata = 
BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux");
         assertEquals("No MetadataVersion with feature level 1. Valid feature 
levels are from " + MetadataVersion.MINIMUM_VERSION.featureLevel()
             + " to " + MetadataVersion.latestTesting().featureLevel() + ".",
-                assertThrows(RuntimeException.class,
-                    () -> bootstrapMetadata.metadataVersion()).getMessage());
+                assertThrows(RuntimeException.class, 
bootstrapMetadata::metadataVersion).getMessage());
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
index 05a96d9dd2d..640e46544fc 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesTest.java
@@ -88,15 +88,11 @@ public final class MetaPropertiesTest {
         assertEquals(directoryId, metaProperties.directoryId());
         Properties props = new Properties();
         props.setProperty("version", "0");
-        if (clusterId.isPresent()) {
-            props.setProperty("cluster.id", clusterId.get());
-        }
+        clusterId.ifPresent(id -> props.setProperty("cluster.id", id));
         if (nodeId.isPresent()) {
             props.setProperty("broker.id", "" + nodeId.getAsInt());
         }
-        if (directoryId.isPresent()) {
-            props.setProperty("directory.id", directoryId.get().toString());
-        }
+        directoryId.ifPresent(id -> props.setProperty("directory.id", 
id.toString()));
         Properties props2 = metaProperties.toProperties();
         assertEquals(props, props2);
         MetaProperties metaProperties2 = new 
MetaProperties.Builder(props2).build();
@@ -151,9 +147,7 @@ public final class MetaPropertiesTest {
         props.setProperty("version", "1");
         props.setProperty("cluster.id", clusterId);
         props.setProperty("node.id", "" + nodeId);
-        if (directoryId.isPresent()) {
-            props.setProperty("directory.id", directoryId.get().toString());
-        }
+        directoryId.ifPresent(id -> props.setProperty("directory.id", 
id.toString()));
         Properties props2 = metaProperties.toProperties();
         assertEquals(props, props2);
         MetaProperties metaProperties2 = new 
MetaProperties.Builder(props2).build();
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c13f90f57fa..013d5945d2e 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -186,7 +186,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         /**
          * The initial max read offset which LocalLog instances will be 
configured with.
          */
-        private long initialMaxReadOffset = Long.MAX_VALUE;
+        private final long initialMaxReadOffset = Long.MAX_VALUE;
 
         public SharedLogData(Optional<RawSnapshotReader> snapshot) {
             if (snapshot.isPresent()) {
@@ -544,7 +544,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                         numEntriesFound++;
                     }
                 }
-                log.trace("Completed log check for node " + nodeId);
+                log.trace("Completed log check for node {}", nodeId);
             } catch (Exception e) {
                 log.error("Exception while handling log check", e);
             }

Reply via email to