This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de92b200fcc KAFKA-18699 Cleanup Metadata Module (#20346)
de92b200fcc is described below

commit de92b200fcc4407a2ceb059a5861717ae4bf5575
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Fri Aug 22 22:27:44 2025 +0530

    KAFKA-18699 Cleanup Metadata Module (#20346)
    
    https://issues.apache.org/jira/browse/KAFKA-18699
    
    This PR aims at cleaning up the `metadata` module further by getting rid
    of some extra code which can be replaced by record
    
    Reviewers: Ken Huang <[email protected]>, Ming-Yen Chung
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/controller/BrokerControlStates.java      |  36 +-----
 .../apache/kafka/controller/BrokerIdAndEpoch.java  |  38 +------
 .../kafka/controller/PartitionChangeBuilder.java   |   4 +-
 .../controller/PartitionReassignmentReplicas.java  |   9 +-
 .../controller/ReplicationControlManager.java      |   9 +-
 .../java/org/apache/kafka/image/AclsImage.java     |  21 +---
 .../org/apache/kafka/image/ClientQuotasImage.java  |  30 +----
 .../java/org/apache/kafka/image/ClusterImage.java  |  34 +-----
 .../org/apache/kafka/image/ConfigurationImage.java |  23 +---
 .../apache/kafka/image/DelegationTokenImage.java   |  29 +----
 .../apache/kafka/image/LocalReplicaChanges.java    |  22 +---
 .../java/org/apache/kafka/image/MetadataImage.java | 124 +--------------------
 .../org/apache/kafka/image/MetadataProvenance.java |  62 +----------
 .../apache/kafka/image/MetadataVersionChange.java  |  45 +-------
 .../org/apache/kafka/image/ProducerIdsImage.java   |  39 ++-----
 .../java/org/apache/kafka/image/ScramImage.java    |  45 +++-----
 .../java/org/apache/kafka/image/TopicsImage.java   |  42 +------
 .../kafka/image/loader/SnapshotManifest.java       |  57 +---------
 .../apache/kafka/image/node/MetadataImageNode.java |  18 +--
 .../kafka/metadata/BrokerHeartbeatReply.java       |  79 ++-----------
 .../kafka/metadata/BrokerRegistrationReply.java    |  29 +----
 .../apache/kafka/metadata/DelegationTokenData.java |  28 +----
 .../metadata/FinalizedControllerFeatures.java      |  34 +-----
 .../apache/kafka/metadata/ScramCredentialData.java |  63 +++--------
 .../kafka/metadata/authorizer/StandardAcl.java     |  98 +---------------
 .../metadata/authorizer/StandardAclWithId.java     |  44 +-------
 .../authorizer/StandardAuthorizerData.java         |  26 +----
 .../kafka/metadata/placement/PlacementSpec.java    |  58 +---------
 .../kafka/metadata/placement/TopicAssignment.java  |  36 +-----
 .../kafka/metadata/placement/UsableBroker.java     |  49 +-------
 .../kafka/metadata/util/BatchFileReader.java       |  17 +--
 .../PartitionReassignmentReplicasTest.java         |   8 +-
 .../controller/PeriodicTaskControlManagerTest.java |  15 +--
 .../kafka/image/MetadataVersionChangeTest.java     |  19 ++++
 .../org/apache/kafka/metadata/RecordTestUtils.java |  17 +--
 .../authorizer/StandardAuthorizerTest.java         |   8 +-
 .../metadata/placement/TopicAssignmentTest.java    |   4 +-
 37 files changed, 141 insertions(+), 1178 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerControlStates.java 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerControlStates.java
index b3b4dc9414f..8114c9c6f21 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerControlStates.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerControlStates.java
@@ -17,39 +17,5 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Objects;
-
-
-class BrokerControlStates {
-    private final BrokerControlState current;
-    private final BrokerControlState next;
-
-    BrokerControlStates(BrokerControlState current, BrokerControlState next) {
-        this.current = current;
-        this.next = next;
-    }
-
-    BrokerControlState current() {
-        return current;
-    }
-
-    BrokerControlState next() {
-        return next;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(current, next);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof BrokerControlStates other)) return false;
-        return other.current == current && other.next == next;
-    }
-
-    @Override
-    public String toString() {
-        return "BrokerControlStates(current=" + current + ", next=" + next + 
")";
-    }
+record BrokerControlStates(BrokerControlState current, BrokerControlState 
next) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
index 7ef75c4e3b3..ddb944ee77c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
@@ -17,41 +17,5 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Objects;
-
-public class BrokerIdAndEpoch {
-    private final int id;
-    private final long epoch;
-
-    public BrokerIdAndEpoch(
-        int id,
-        long epoch
-    ) {
-        this.id = id;
-        this.epoch = epoch;
-    }
-
-    public int id() {
-        return id;
-    }
-
-    public long epoch() {
-        return epoch;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || (!(o instanceof BrokerIdAndEpoch other))) return 
false;
-        return id == other.id && epoch == other.epoch;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, epoch);
-    }
-
-    @Override
-    public String toString() {
-        return "BrokerIdAndEpoch(id=" + id + ", epoch=" + epoch + ")";
-    }
+public record BrokerIdAndEpoch(int id, long epoch) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index a1d35293bd1..8697ad00962 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -421,8 +421,8 @@ public class PartitionChangeBuilder {
 
         PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = completedReassignmentOpt.get();
 
-        targetIsr = completedReassignment.isr;
-        targetReplicas = completedReassignment.replicas;
+        targetIsr = completedReassignment.isr();
+        targetReplicas = completedReassignment.replicas();
         targetRemoving = List.of();
         targetAdding = List.of();
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 51fdf42bb67..5f85dd1a3d6 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -129,14 +129,7 @@ class PartitionReassignmentReplicas {
         );
     }
 
-    static class CompletedReassignment {
-        final List<Integer> replicas;
-        final List<Integer> isr;
-
-        public CompletedReassignment(List<Integer> replicas, List<Integer> 
isr) {
-            this.replicas = replicas;
-            this.isr = isr;
-        }
+    record CompletedReassignment(List<Integer> replicas, List<Integer> isr) {
     }
 
     List<Integer> originalReplicas() {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index fe561244dab..a1e93b3f10f 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -2441,14 +2441,7 @@ public class ReplicationControlManager {
         }
     }
 
-    private static final class IneligibleReplica {
-        private final int replicaId;
-        private final String reason;
-
-        private IneligibleReplica(int replicaId, String reason) {
-            this.replicaId = replicaId;
-            this.reason = reason;
-        }
+    private record IneligibleReplica(int replicaId, String reason) {
 
         @Override
         public String toString() {
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsImage.java 
b/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
index 12f778b841e..86112fa22fb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
@@ -30,14 +30,12 @@ import java.util.Map.Entry;
 
 /**
  * Represents the ACLs in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class AclsImage {
+public record AclsImage(Map<Uuid, StandardAcl> acls) {
     public static final AclsImage EMPTY = new AclsImage(Map.of());
 
-    private final Map<Uuid, StandardAcl> acls;
-
     public AclsImage(Map<Uuid, StandardAcl> acls) {
         this.acls = Collections.unmodifiableMap(acls);
     }
@@ -46,10 +44,6 @@ public final class AclsImage {
         return acls.isEmpty();
     }
 
-    public Map<Uuid, StandardAcl> acls() {
-        return acls;
-    }
-
     public void write(ImageWriter writer) {
         for (Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
             StandardAclWithId aclWithId = new 
StandardAclWithId(entry.getKey(), entry.getValue());
@@ -57,17 +51,6 @@ public final class AclsImage {
         }
     }
 
-    @Override
-    public int hashCode() {
-        return acls.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AclsImage other)) return false;
-        return acls.equals(other.acls);
-    }
-
     @Override
     public String toString() {
         return new AclsImageNode(this).stringify();
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index 6cf23a2b688..e5dfb0433d0 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -45,14 +45,12 @@ import static 
org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH
 
 /**
  * Represents the client quotas in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class ClientQuotasImage {
+public record ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> 
entities) {
     public static final ClientQuotasImage EMPTY = new 
ClientQuotasImage(Map.of());
 
-    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
-
     public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> 
entities) {
         this.entities = Collections.unmodifiableMap(entities);
     }
@@ -61,11 +59,6 @@ public final class ClientQuotasImage {
         return entities.isEmpty();
     }
 
-    // Visible for testing
-    public Map<ClientQuotaEntity, ClientQuotaImage> entities() {
-        return entities;
-    }
-
     public void write(ImageWriter writer) {
         for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : 
entities.entrySet()) {
             ClientQuotaEntity entity = entry.getKey();
@@ -82,14 +75,14 @@ public final class ClientQuotasImage {
             if (component.entityType().isEmpty()) {
                 throw new InvalidRequestException("Invalid empty entity 
type.");
             } else if (exactMatch.containsKey(component.entityType()) ||
-                    typeMatch.contains(component.entityType())) {
+                typeMatch.contains(component.entityType())) {
                 throw new InvalidRequestException("Entity type " + 
component.entityType() +
                     " cannot appear more than once in the filter.");
             }
             if (!(component.entityType().equals(IP) || 
component.entityType().equals(USER) ||
-                    component.entityType().equals(CLIENT_ID))) {
+                component.entityType().equals(CLIENT_ID))) {
                 throw new UnsupportedVersionException("Unsupported entity type 
" +
-                        component.entityType());
+                    component.entityType());
             }
             switch (component.matchType()) {
                 case MATCH_TYPE_EXACT:
@@ -119,7 +112,7 @@ public final class ClientQuotasImage {
         }
         if (exactMatch.containsKey(IP) || typeMatch.contains(IP)) {
             if ((exactMatch.containsKey(USER) || typeMatch.contains(USER)) ||
-                    (exactMatch.containsKey(CLIENT_ID) || 
typeMatch.contains(CLIENT_ID))) {
+                (exactMatch.containsKey(CLIENT_ID) || 
typeMatch.contains(CLIENT_ID))) {
                 throw new InvalidRequestException("Invalid entity filter 
component " +
                     "combination. IP filter component should not be used with 
" +
                     "user or clientId filter component.");
@@ -173,17 +166,6 @@ public final class ClientQuotasImage {
         return data;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ClientQuotasImage other)) return false;
-        return entities.equals(other.entities);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(entities);
-    }
-
     @Override
     public String toString() {
         return new ClientQuotasImageNode(this).stringify();
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 22166f1c2a3..1e4dbe20d7a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -25,22 +25,16 @@ import org.apache.kafka.metadata.ControllerRegistration;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.Objects;
-
 
 /**
  * Represents the cluster in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class ClusterImage {
+public record ClusterImage(Map<Integer, BrokerRegistration> brokers, 
Map<Integer, ControllerRegistration> controllers) {
     public static final ClusterImage EMPTY = new ClusterImage(
-            Map.of(),
-            Map.of());
-
-    private final Map<Integer, BrokerRegistration> brokers;
-
-    private final Map<Integer, ControllerRegistration> controllers;
+        Map.of(),
+        Map.of());
 
     public ClusterImage(
         Map<Integer, BrokerRegistration> brokers,
@@ -54,18 +48,10 @@ public final class ClusterImage {
         return brokers.isEmpty();
     }
 
-    public Map<Integer, BrokerRegistration> brokers() {
-        return brokers;
-    }
-
     public BrokerRegistration broker(int nodeId) {
         return brokers.get(nodeId);
     }
 
-    public Map<Integer, ControllerRegistration> controllers() {
-        return controllers;
-    }
-
     public long brokerEpoch(int brokerId) {
         BrokerRegistration brokerRegistration = broker(brokerId);
         if (brokerRegistration == null) {
@@ -89,18 +75,6 @@ public final class ClusterImage {
         }
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(brokers, controllers);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ClusterImage other)) return false;
-        return brokers.equals(other.brokers) &&
-            controllers.equals(other.controllers);
-    }
-
     @Override
     public String toString() {
         return new ClusterImageNode(this).stringify();
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index c0757d351a7..8147a4b58a8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -30,29 +30,10 @@ import java.util.Properties;
 
 /**
  * Represents the configuration of a resource.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class ConfigurationImage {
-    private final ConfigResource resource;
-
-    private final Map<String, String> data;
-
-    public ConfigurationImage(
-        ConfigResource resource,
-        Map<String, String> data
-    ) {
-        this.resource = resource;
-        this.data = data;
-    }
-
-    public ConfigResource resource() {
-        return resource;
-    }
-
-    public Map<String, String> data() {
-        return data;
-    }
+public record ConfigurationImage(ConfigResource resource, Map<String, String> 
data) {
 
     public boolean isEmpty() {
         return data.isEmpty();
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java 
b/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
index c2c90a4bbb9..41339fc667c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
@@ -31,16 +31,14 @@ import java.util.Map.Entry;
 
 /**
  * Represents the DelegationToken credentials in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
+ *
+ * @param tokens Map TokenID to TokenInformation. The TokenID is also 
contained in the TokenInformation inside the DelegationTokenData
  */
-public final class DelegationTokenImage {
+public record DelegationTokenImage(Map<String, DelegationTokenData> tokens) {
     public static final DelegationTokenImage EMPTY = new 
DelegationTokenImage(Map.of());
 
-    // Map TokenID to TokenInformation.
-    // The TokenID is also contained in the TokenInformation inside the 
DelegationTokenData
-    private final Map<String, DelegationTokenData> tokens;
-
     public DelegationTokenImage(Map<String, DelegationTokenData> tokens) {
         this.tokens = Collections.unmodifiableMap(tokens);
     }
@@ -55,31 +53,14 @@ public final class DelegationTokenImage {
                 List<String> tokenIds = new ArrayList<>(tokens.keySet());
                 String delegationTokenImageString = "DelegationTokenImage(" + 
String.join(", ", tokenIds) + ")";
                 options.handleLoss(delegationTokenImageString);
-            } 
+            }
         }
     }
 
-    public Map<String, DelegationTokenData> tokens() {
-        return tokens;
-    }
-
     public boolean isEmpty() {
         return tokens.isEmpty();
     }
 
-    @Override
-    public int hashCode() {
-        return tokens.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) return false;
-        if (!o.getClass().equals(DelegationTokenImage.class)) return false;
-        DelegationTokenImage other = (DelegationTokenImage) o;
-        return tokens.equals(other.tokens);
-    }
-
     @Override
     public String toString() {
         return new DelegationTokenImageNode(this).stringify();
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java 
b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
index e24b1455cb2..317641e4534 100644
--- a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
+++ b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
@@ -92,26 +92,6 @@ public final class LocalReplicaChanges {
         );
     }
 
-    public static final class PartitionInfo {
-        private final Uuid topicId;
-        private final PartitionRegistration partition;
-
-        public PartitionInfo(Uuid topicId, PartitionRegistration partition) {
-            this.topicId = topicId;
-            this.partition = partition;
-        }
-
-        public Uuid topicId() {
-            return topicId;
-        }
-
-        public PartitionRegistration partition() {
-            return partition;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("PartitionInfo(topicId = %s, partition = 
%s)", topicId, partition);
-        }
+    public record PartitionInfo(Uuid topicId, PartitionRegistration partition) 
{
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index da245f00519..99364c039e7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -22,15 +22,16 @@ import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 
-import java.util.Objects;
-
 
 /**
  * The broker metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class MetadataImage {
+public record MetadataImage(MetadataProvenance provenance, FeaturesImage 
features, ClusterImage cluster,
+                            TopicsImage topics, ConfigurationsImage configs, 
ClientQuotasImage clientQuotas,
+                            ProducerIdsImage producerIds, AclsImage acls, 
ScramImage scram,
+                            DelegationTokenImage delegationTokens) {
     public static final MetadataImage EMPTY = new MetadataImage(
         MetadataProvenance.EMPTY,
         FeaturesImage.EMPTY,
@@ -43,50 +44,6 @@ public final class MetadataImage {
         ScramImage.EMPTY,
         DelegationTokenImage.EMPTY);
 
-    private final MetadataProvenance provenance;
-
-    private final FeaturesImage features;
-
-    private final ClusterImage cluster;
-
-    private final TopicsImage topics;
-
-    private final ConfigurationsImage configs;
-
-    private final ClientQuotasImage clientQuotas;
-
-    private final ProducerIdsImage producerIds;
-
-    private final AclsImage acls;
-
-    private final ScramImage scram;
-
-    private final DelegationTokenImage delegationTokens;
-
-    public MetadataImage(
-        MetadataProvenance provenance,
-        FeaturesImage features,
-        ClusterImage cluster,
-        TopicsImage topics,
-        ConfigurationsImage configs,
-        ClientQuotasImage clientQuotas,
-        ProducerIdsImage producerIds,
-        AclsImage acls,
-        ScramImage scram,
-        DelegationTokenImage delegationTokens
-    ) {
-        this.provenance = provenance;
-        this.features = features;
-        this.cluster = cluster;
-        this.topics = topics;
-        this.configs = configs;
-        this.clientQuotas = clientQuotas;
-        this.producerIds = producerIds;
-        this.acls = acls;
-        this.scram = scram;
-        this.delegationTokens = delegationTokens;
-    }
-
     public boolean isEmpty() {
         return features.isEmpty() &&
             cluster.isEmpty() &&
@@ -99,10 +56,6 @@ public final class MetadataImage {
             delegationTokens.isEmpty();
     }
 
-    public MetadataProvenance provenance() {
-        return provenance;
-    }
-
     public OffsetAndEpoch highestOffsetAndEpoch() {
         return new OffsetAndEpoch(provenance.lastContainedOffset(), 
provenance.lastContainedEpoch());
     }
@@ -111,42 +64,6 @@ public final class MetadataImage {
         return provenance.lastContainedOffset();
     }
 
-    public FeaturesImage features() {
-        return features;
-    }
-
-    public ClusterImage cluster() {
-        return cluster;
-    }
-
-    public TopicsImage topics() {
-        return topics;
-    }
-
-    public ConfigurationsImage configs() {
-        return configs;
-    }
-
-    public ClientQuotasImage clientQuotas() {
-        return clientQuotas;
-    }
-
-    public ProducerIdsImage producerIds() {
-        return producerIds;
-    }
-
-    public AclsImage acls() {
-        return acls;
-    }
-
-    public ScramImage scram() {
-        return scram;
-    }
-
-    public DelegationTokenImage delegationTokens() {
-        return delegationTokens;
-    }
-
     public void write(ImageWriter writer, ImageWriterOptions options) {
         // Features should be written out first so we can include the 
metadata.version at the beginning of the
         // snapshot
@@ -162,37 +79,6 @@ public final class MetadataImage {
         writer.close(true);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(this.getClass())) return false;
-        MetadataImage other = (MetadataImage) o;
-        return provenance.equals(other.provenance) &&
-            features.equals(other.features) &&
-            cluster.equals(other.cluster) &&
-            topics.equals(other.topics) &&
-            configs.equals(other.configs) &&
-            clientQuotas.equals(other.clientQuotas) &&
-            producerIds.equals(other.producerIds) &&
-            acls.equals(other.acls) &&
-            scram.equals(other.scram) &&
-            delegationTokens.equals(other.delegationTokens);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-            provenance,
-            features,
-            cluster,
-            topics,
-            configs,
-            clientQuotas,
-            producerIds,
-            acls,
-            scram,
-            delegationTokens);
-    }
-
     @Override
     public String toString() {
         return new MetadataImageNode(this).stringify();
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
index e9cef3e70e6..fe6bf6f5333 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
@@ -20,48 +20,18 @@ package org.apache.kafka.image;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.Snapshots;
 
-import java.util.Objects;
-
 
 /**
  * Information about the source of a metadata image.
  */
-public final class MetadataProvenance {
+public record MetadataProvenance(long lastContainedOffset, int 
lastContainedEpoch, long lastContainedLogTimeMs,
+                                 boolean isOffsetBatchAligned) {
     public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 
-1, -1L, false);
 
-    private final long lastContainedOffset;
-    private final int lastContainedEpoch;
-    private final long lastContainedLogTimeMs;
-    private final boolean isOffsetBatchAligned;
-
-    public MetadataProvenance(
-        long lastContainedOffset,
-        int lastContainedEpoch,
-        long lastContainedLogTimeMs,
-        boolean isOffsetBatchAligned
-    ) {
-        this.lastContainedOffset = lastContainedOffset;
-        this.lastContainedEpoch = lastContainedEpoch;
-        this.lastContainedLogTimeMs = lastContainedLogTimeMs;
-        this.isOffsetBatchAligned = isOffsetBatchAligned;
-    }
-
     public OffsetAndEpoch snapshotId() {
         return new OffsetAndEpoch(lastContainedOffset + 1, lastContainedEpoch);
     }
 
-    public long lastContainedOffset() {
-        return lastContainedOffset;
-    }
-
-    public int lastContainedEpoch() {
-        return lastContainedEpoch;
-    }
-
-    public long lastContainedLogTimeMs() {
-        return lastContainedLogTimeMs;
-    }
-
     /**
      * Returns whether lastContainedOffset is the last offset in a record batch
      */
@@ -75,32 +45,4 @@ public final class MetadataProvenance {
     public String snapshotName() {
         return String.format("snapshot %s", 
Snapshots.filenameFromSnapshotId(snapshotId()));
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(this.getClass())) return false;
-        MetadataProvenance other = (MetadataProvenance) o;
-        return lastContainedOffset == other.lastContainedOffset &&
-            lastContainedEpoch == other.lastContainedEpoch &&
-            lastContainedLogTimeMs == other.lastContainedLogTimeMs &&
-            isOffsetBatchAligned == other.isOffsetBatchAligned;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(lastContainedOffset,
-            lastContainedEpoch,
-            lastContainedLogTimeMs,
-            isOffsetBatchAligned);
-    }
-
-    @Override
-    public String toString() {
-        return "MetadataProvenance(" +
-            "lastContainedOffset=" + lastContainedOffset +
-            ", lastContainedEpoch=" + lastContainedEpoch +
-            ", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
-            ", isOffsetBatchAligned=" + isOffsetBatchAligned +
-            ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java
index d9b43a306e5..f03e1719950 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java
@@ -21,28 +21,13 @@ import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Objects;
 
-
 /**
  * A change in the MetadataVersion.
  */
-public final class MetadataVersionChange {
-    private final MetadataVersion oldVersion;
-    private final MetadataVersion newVersion;
-
-    public MetadataVersionChange(
-            MetadataVersion oldVersion,
-            MetadataVersion newVersion
-    ) {
-        this.oldVersion = Objects.requireNonNull(oldVersion);
-        this.newVersion = Objects.requireNonNull(newVersion);
-    }
-
-    public MetadataVersion oldVersion() {
-        return oldVersion;
-    }
-
-    public MetadataVersion newVersion() {
-        return newVersion;
+public record MetadataVersionChange(MetadataVersion oldVersion, 
MetadataVersion newVersion) {
+    public MetadataVersionChange {
+        Objects.requireNonNull(oldVersion);
+        Objects.requireNonNull(newVersion);
     }
 
     public boolean isUpgrade() {
@@ -52,26 +37,4 @@ public final class MetadataVersionChange {
     public boolean isDowngrade() {
         return newVersion.isLessThan(oldVersion);
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(this.getClass())) return false;
-        MetadataVersionChange other = (MetadataVersionChange) o;
-        return oldVersion.equals(other.oldVersion) &&
-                newVersion.equals(other.newVersion);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(oldVersion,
-                newVersion);
-    }
-
-    @Override
-    public String toString() {
-        return "MetadataVersionChange(" +
-                "oldVersion=" + oldVersion +
-                ", newVersion=" + newVersion +
-                ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index ea3f76fdfa8..d1a0a4a7d12 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -21,36 +21,22 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.image.node.ProducerIdsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 
-import java.util.Objects;
-
-
 /**
  * Stores the highest seen producer ID in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
+ *
+ * @param nextProducerId The next producer ID, or -1 in the special case where 
no producer IDs have been issued.
  */
-public final class ProducerIdsImage {
+public record ProducerIdsImage(long nextProducerId) {
     public static final ProducerIdsImage EMPTY = new ProducerIdsImage(-1L);
 
-    /**
-     * The next producer ID, or -1 in the special case where no producer IDs 
have been issued.
-     */
-    private final long nextProducerId;
-
-    public ProducerIdsImage(long nextProducerId) {
-        this.nextProducerId = nextProducerId;
-    }
-
-    public long nextProducerId() {
-        return nextProducerId;
-    }
-
     public void write(ImageWriter writer) {
         if (nextProducerId >= 0) {
             writer.write(0, new ProducerIdsRecord().
-                    setBrokerId(-1).
-                    setBrokerEpoch(-1).
-                    setNextProducerId(nextProducerId));
+                setBrokerId(-1).
+                setBrokerEpoch(-1).
+                setNextProducerId(nextProducerId));
         }
     }
 
@@ -58,17 +44,6 @@ public final class ProducerIdsImage {
         return nextProducerId == EMPTY.nextProducerId;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ProducerIdsImage other)) return false;
-        return nextProducerId == other.nextProducerId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(nextProducerId);
-    }
-
     @Override
     public String toString() {
         return new ProducerIdsImageNode(this).stringify();
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index c0c17a2a482..60c28d8f72e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -39,29 +39,27 @@ import java.util.Map.Entry;
 
 /**
  * Represents the SCRAM credentials in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class ScramImage {
+public record ScramImage(Map<ScramMechanism, Map<String, ScramCredentialData>> 
mechanisms) {
     public static final ScramImage EMPTY = new ScramImage(Map.of());
 
-    private final Map<ScramMechanism, Map<String, ScramCredentialData>> 
mechanisms;
-
-    public ScramImage(Map<ScramMechanism, Map<String, ScramCredentialData>> 
mechanisms) {
-        this.mechanisms = Collections.unmodifiableMap(mechanisms);
+    public ScramImage {
+        mechanisms = Collections.unmodifiableMap(mechanisms);
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
         if (options.metadataVersion().isScramSupported()) {
-            for (Entry<ScramMechanism, Map<String, ScramCredentialData>> 
mechanismEntry : mechanisms.entrySet()) {
-                for (Entry<String, ScramCredentialData> userEntry : 
mechanismEntry.getValue().entrySet()) {
+            for (var mechanismEntry : mechanisms.entrySet()) {
+                for (var userEntry : mechanismEntry.getValue().entrySet()) {
                     writer.write(0, 
userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey()));
                 }
             }
         } else {
             boolean isEmpty = true;
             StringBuilder scramImageString = new StringBuilder("ScramImage({");
-            for (Entry<ScramMechanism, Map<String, ScramCredentialData>> 
mechanismEntry : mechanisms.entrySet()) {
+            for (var mechanismEntry : mechanisms.entrySet()) {
                 if (!mechanismEntry.getValue().isEmpty()) {
                     
scramImageString.append(mechanismEntry.getKey()).append(":");
                     List<String> users = new 
ArrayList<>(mechanismEntry.getValue().keySet());
@@ -87,7 +85,7 @@ public final class ScramImage {
 
         if ((users == null) || (users.isEmpty())) {
             // If there are no users listed then get all the users
-            for (Map<String, ScramCredentialData> scramCredentialDataSet : 
mechanisms.values()) {
+            for (var scramCredentialDataSet : mechanisms.values()) {
                 for (String user : scramCredentialDataSet.keySet()) {
                     uniqueUsers.put(user, false);
                 }
@@ -105,17 +103,17 @@ public final class ScramImage {
 
         DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
 
-        for (Map.Entry<String, Boolean> user : uniqueUsers.entrySet()) {
+        for (Entry<String, Boolean> user : uniqueUsers.entrySet()) {
             DescribeUserScramCredentialsResult result = new 
DescribeUserScramCredentialsResult().setUser(user.getKey());
 
             if (!user.getValue()) {
                 boolean dataFound = false;
                 List<CredentialInfo> credentialInfos = new ArrayList<>();
-                for (Map.Entry<ScramMechanism, Map<String, 
ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
+                for (var mechanismsEntry : mechanisms.entrySet()) {
                     Map<String, ScramCredentialData> credentialDataSet = 
mechanismsEntry.getValue();
                     if (credentialDataSet.containsKey(user.getKey())) {
                         credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
-                                                                
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+                            
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
                         dataFound = true;
                     }
                 }
@@ -123,38 +121,21 @@ public final class ScramImage {
                     result.setCredentialInfos(credentialInfos);
                 } else {
                     result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
-                          .setErrorMessage(DESCRIBE_USER_THAT_DOES_NOT_EXIST + 
user.getKey());
+                        .setErrorMessage(DESCRIBE_USER_THAT_DOES_NOT_EXIST + 
user.getKey());
                 }
             } else {
                 result.setErrorCode(Errors.DUPLICATE_RESOURCE.code())
-                      .setErrorMessage(DESCRIBE_DUPLICATE_USER + 
user.getKey());
+                    .setErrorMessage(DESCRIBE_DUPLICATE_USER + user.getKey());
             }
             retval.results().add(result);
         }
         return retval;
     }
 
-    public Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms() {
-        return mechanisms;
-    }
-
     public boolean isEmpty() {
         return mechanisms.isEmpty();
     }
 
-    @Override
-    public int hashCode() {
-        return mechanisms.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) return false;
-        if (!o.getClass().equals(ScramImage.class)) return false;
-        ScramImage other = (ScramImage) o;
-        return mechanisms.equals(other.mechanisms);
-    }
-
     @Override
     public String toString() {
         return new ScramImageNode(this).stringify();
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 21dba62576a..36e73921932 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -26,26 +26,14 @@ import org.apache.kafka.server.immutable.ImmutableMap;
 import org.apache.kafka.server.util.TranslatedValueMapView;
 
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * Represents the topics in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class TopicsImage {
-    public static final TopicsImage EMPTY =  new 
TopicsImage(ImmutableMap.empty(), ImmutableMap.empty());
-
-    private final ImmutableMap<Uuid, TopicImage> topicsById;
-    private final ImmutableMap<String, TopicImage> topicsByName;
-
-    public TopicsImage(
-        ImmutableMap<Uuid, TopicImage> topicsById,
-        ImmutableMap<String, TopicImage> topicsByName
-    ) {
-        this.topicsById = topicsById;
-        this.topicsByName = topicsByName;
-    }
+public record TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById, 
ImmutableMap<String, TopicImage> topicsByName) {
+    public static final TopicsImage EMPTY = new 
TopicsImage(ImmutableMap.empty(), ImmutableMap.empty());
 
     public TopicsImage including(TopicImage topic) {
         return new TopicsImage(
@@ -57,14 +45,6 @@ public final class TopicsImage {
         return topicsById.isEmpty() && topicsByName.isEmpty();
     }
 
-    public ImmutableMap<Uuid, TopicImage> topicsById() {
-        return topicsById;
-    }
-
-    public ImmutableMap<String, TopicImage> topicsByName() {
-        return topicsByName;
-    }
-
     public PartitionRegistration getPartition(Uuid id, int partitionId) {
         TopicImage topicImage = topicsById.get(id);
         if (topicImage == null) return null;
@@ -85,21 +65,9 @@ public final class TopicsImage {
         }
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof TopicsImage other)) return false;
-        return topicsById.equals(other.topicsById) &&
-            topicsByName.equals(other.topicsByName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(topicsById, topicsByName);
-    }
-
     /**
      * Expose a view of this TopicsImage as a map from topic names to IDs.
-     *
+     * <p>
      * Like TopicsImage itself, this map is immutable.
      */
     public Map<String, Uuid> topicNameToIdView() {
@@ -108,7 +76,7 @@ public final class TopicsImage {
 
     /**
      * Expose a view of this TopicsImage as a map from IDs to names.
-     *
+     * <p>
      * Like TopicsImage itself, this map is immutable.
      */
     public Map<Uuid, String> topicIdToNameView() {
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
index 5653a4689ea..cbd5e660c19 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
@@ -19,65 +19,16 @@ package org.apache.kafka.image.loader;
 
 import org.apache.kafka.image.MetadataProvenance;
 
-import java.util.Objects;
-
-
 /**
  * Contains information about a snapshot that was loaded.
+ *
+ * @param provenance The source of this snapshot.
+ * @param elapsedNs  The time in microseconds that it took to load the 
snapshot.
  */
-public class SnapshotManifest implements LoaderManifest {
-    /**
-     * The source of this snapshot.
-     */
-    private final MetadataProvenance provenance;
-
-    /**
-     * The time in microseconds that it took to load the snapshot.
-     */
-    private final long elapsedNs;
-
-    public SnapshotManifest(
-        MetadataProvenance provenance,
-        long elapsedNs
-    ) {
-        this.provenance = provenance;
-        this.elapsedNs = elapsedNs;
-    }
+public record SnapshotManifest(MetadataProvenance provenance, long elapsedNs) 
implements LoaderManifest {
 
     @Override
     public LoaderManifestType type() {
         return LoaderManifestType.SNAPSHOT;
     }
-
-    @Override
-    public MetadataProvenance provenance() {
-        return provenance;
-    }
-
-    public long elapsedNs() {
-        return elapsedNs;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                provenance,
-                elapsedNs);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(this.getClass())) return false;
-        SnapshotManifest other = (SnapshotManifest) o;
-        return provenance.equals(other.provenance) &&
-                elapsedNs == other.elapsedNs;
-    }
-
-    @Override
-    public String toString() {
-        return "SnapshotManifest(" +
-                "provenance=" + provenance +
-                ", elapsedNs=" + elapsedNs +
-                ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java 
b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
index d9b0ae880f8..0eebac81521 100644
--- a/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
@@ -24,17 +24,15 @@ import java.util.Map;
 import java.util.function.Function;
 
 
-public class MetadataImageNode implements MetadataNode {
+/**
+ * @param image The metadata image.
+ */
+public record MetadataImageNode(MetadataImage image) implements MetadataNode {
     /**
      * The name of this node.
      */
     public static final String NAME = "image";
 
-    /**
-     * The metadata image.
-     */
-    private final MetadataImage image;
-
     private static final Map<String, Function<MetadataImage, MetadataNode>> 
CHILDREN = Map.of(
         ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()),
         FeaturesImageNode.NAME, image -> new 
FeaturesImageNode(image.features()),
@@ -48,14 +46,6 @@ public class MetadataImageNode implements MetadataNode {
         DelegationTokenImageNode.NAME, image -> new 
DelegationTokenImageNode(image.delegationTokens())
     );
 
-    public MetadataImageNode(MetadataImage image) {
-        this.image = image;
-    }
-
-    public MetadataImage image() {
-        return image;
-    }
-
     @Override
     public Collection<String> childNames() {
         return CHILDREN.keySet();
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
index 9380bccbcfd..011c3c09952 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
@@ -17,76 +17,13 @@
 
 package org.apache.kafka.metadata;
 
-import java.util.Objects;
 
-
-public class BrokerHeartbeatReply {
-    /**
-     * True if the heartbeat reply should tell the broker that it has caught 
up.
-     */
-    private final boolean isCaughtUp;
-
-    /**
-     * True if the heartbeat reply should tell the broker that it is fenced.
-     */
-    private final boolean isFenced;
-
-    /**
-     * True if the broker is currently in a controlled shutdown state.
-     */
-    private final boolean inControlledShutdown;
-
-    /**
-     * True if the heartbeat reply should tell the broker that it should shut 
down.
-     */
-    private final boolean shouldShutDown;
-
-    public BrokerHeartbeatReply(boolean isCaughtUp,
-                                boolean isFenced,
-                                boolean inControlledShutdown,
-                                boolean shouldShutDown) {
-        this.isCaughtUp = isCaughtUp;
-        this.isFenced = isFenced;
-        this.inControlledShutdown = inControlledShutdown;
-        this.shouldShutDown = shouldShutDown;
-    }
-
-    public boolean isCaughtUp() {
-        return isCaughtUp;
-    }
-
-    public boolean isFenced() {
-        return isFenced;
-    }
-
-    public boolean inControlledShutdown() {
-        return inControlledShutdown;
-    }
-
-    public boolean shouldShutDown() {
-        return shouldShutDown;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(isCaughtUp, isFenced, inControlledShutdown, 
shouldShutDown);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof BrokerHeartbeatReply other)) return false;
-        return other.isCaughtUp == isCaughtUp &&
-            other.isFenced == isFenced &&
-            other.inControlledShutdown == inControlledShutdown &&
-            other.shouldShutDown == shouldShutDown;
-    }
-
-    @Override
-    public String toString() {
-        return "BrokerHeartbeatReply(isCaughtUp=" + isCaughtUp +
-            ", isFenced=" + isFenced +
-            ", inControlledShutdown=" + inControlledShutdown +
-            ", shouldShutDown = " + shouldShutDown +
-            ")";
-    }
+/**
+ * @param isCaughtUp           True if the heartbeat reply should tell the 
broker that it has caught up.
+ * @param isFenced             True if the heartbeat reply should tell the 
broker that it is fenced.
+ * @param inControlledShutdown True if the broker is currently in a controlled 
shutdown state.
+ * @param shouldShutDown       True if the heartbeat reply should tell the 
broker that it should shut down.
+ */
+public record BrokerHeartbeatReply(boolean isCaughtUp, boolean isFenced, 
boolean inControlledShutdown,
+                                   boolean shouldShutDown) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
index cfd86e67ab5..977ec2f9863 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
@@ -17,33 +17,6 @@
 
 package org.apache.kafka.metadata;
 
-import java.util.Objects;
 
-
-public class BrokerRegistrationReply {
-    private final long epoch;
-
-    public BrokerRegistrationReply(long epoch) {
-        this.epoch = epoch;
-    }
-
-    public long epoch() {
-        return epoch;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(epoch);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof BrokerRegistrationReply other)) return false;
-        return other.epoch == epoch;
-    }
-
-    @Override
-    public String toString() {
-        return "BrokerRegistrationReply(epoch=" + epoch + ")";
-    }
+public record BrokerRegistrationReply(long epoch) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java 
b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
index df853169e19..9ed69f01081 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
@@ -24,16 +24,13 @@ import org.apache.kafka.common.utils.SecurityUtils;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * Represents the Delegation Tokens in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class DelegationTokenData {
-
-    private final TokenInformation tokenInformation;
+public record DelegationTokenData(TokenInformation tokenInformation) {
 
     public static DelegationTokenData fromRecord(DelegationTokenRecord record) 
{
         List<KafkaPrincipal> renewers = new ArrayList<>();
@@ -50,14 +47,6 @@ public final class DelegationTokenData {
             record.expirationTimestamp()));
     }
 
-    public DelegationTokenData(TokenInformation tokenInformation) {
-        this.tokenInformation = tokenInformation;
-    }
-
-    public TokenInformation tokenInformation() {
-        return tokenInformation;
-    }
-
     public DelegationTokenRecord toRecord() {
         return new DelegationTokenRecord()
             .setOwner(tokenInformation.ownerAsString())
@@ -69,19 +58,6 @@ public final class DelegationTokenData {
             .setTokenId(tokenInformation.tokenId());
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(tokenInformation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) return false;
-        if (!o.getClass().equals(DelegationTokenData.class)) return false;
-        DelegationTokenData other = (DelegationTokenData) o;
-        return tokenInformation.equals(other.tokenInformation);
-    }
-
     /*
      * We explicitly hide tokenInformation when converting DelegationTokenData 
to string
      * For legacy reasons, we did not change TokenInformation to hide 
sensitive data.
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
index 0a5af620f2a..d66f6ebe02c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
@@ -19,18 +19,13 @@ package org.apache.kafka.metadata;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
-
 /**
  * A map of feature names to their supported versions.
  */
-public class FinalizedControllerFeatures {
-    private final Map<String, Short> featureMap;
-    private final long epoch;
-
+public record FinalizedControllerFeatures(Map<String, Short> featureMap, long 
epoch) {
     public FinalizedControllerFeatures(Map<String, Short> featureMap, long 
epoch) {
         this.featureMap = Collections.unmodifiableMap(featureMap);
         this.epoch = epoch;
@@ -47,31 +42,4 @@ public class FinalizedControllerFeatures {
     public Set<String> featureNames() {
         return featureMap.keySet();
     }
-
-    public Map<String, Short> featureMap() {
-        return featureMap;
-    }
-
-    public long epoch() {
-        return epoch;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(featureMap, epoch);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof FinalizedControllerFeatures other)) return false;
-        return featureMap.equals(other.featureMap) && epoch == other.epoch;
-    }
-
-    @Override
-    public String toString() {
-        return "FinalizedControllerFeatures(" +
-                "featureMap=" + featureMap.toString() +
-                ", epoch=" + epoch +
-                ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java 
b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
index 4339a572b71..ff35881baaa 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
@@ -26,51 +26,18 @@ import java.util.Objects;
 
 /**
  * Represents the ACLs in the metadata image.
- *
+ * <p>
  * This class is thread-safe.
  */
-public final class ScramCredentialData {
-    private final byte[] salt;
-    private final byte[] storedKey;
-    private final byte[] serverKey;
-    private final int iterations;
-
+public record ScramCredentialData(byte[] salt, byte[] storedKey, byte[] 
serverKey, int iterations) {
     public static ScramCredentialData fromRecord(
         UserScramCredentialRecord record
     ) {
         return new ScramCredentialData(
-                record.salt(),
-                record.storedKey(),
-                record.serverKey(),
-                record.iterations());
-    }
-
-    public ScramCredentialData(
-        byte[] salt,
-        byte[] storedKey,
-        byte[] serverKey,
-        int iterations
-    ) {
-        this.salt = salt;
-        this.storedKey = storedKey;
-        this.serverKey = serverKey;
-        this.iterations = iterations;
-    }
-
-    public byte[] salt() {
-        return salt;
-    }
-
-    public byte[] storedKey() {
-        return storedKey;
-    }
-
-    public byte[] serverKey() {
-        return serverKey;
-    }
-
-    public int iterations() {
-        return iterations;
+            record.salt(),
+            record.storedKey(),
+            record.serverKey(),
+            record.iterations());
     }
 
     public UserScramCredentialRecord toRecord(
@@ -78,12 +45,12 @@ public final class ScramCredentialData {
         ScramMechanism mechanism
     ) {
         return new UserScramCredentialRecord().
-                setName(userName).
-                setMechanism(mechanism.type()).
-                setSalt(salt).
-                setStoredKey(storedKey).
-                setServerKey(serverKey).
-                setIterations(iterations);
+            setName(userName).
+            setMechanism(mechanism.type()).
+            setSalt(salt).
+            setStoredKey(storedKey).
+            setServerKey(serverKey).
+            setIterations(iterations);
     }
 
     public ScramCredential toCredential() {
@@ -106,9 +73,9 @@ public final class ScramCredentialData {
         if (!o.getClass().equals(ScramCredentialData.class)) return false;
         ScramCredentialData other = (ScramCredentialData) o;
         return Arrays.equals(salt, other.salt) &&
-                Arrays.equals(storedKey, other.storedKey) &&
-                Arrays.equals(serverKey, other.serverKey) &&
-                iterations == other.iterations;
+            Arrays.equals(storedKey, other.storedKey) &&
+            Arrays.equals(serverKey, other.serverKey) &&
+            iterations == other.iterations;
     }
 
     @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
index 8817b93bae4..21a142d3b80 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAcl.java
@@ -27,13 +27,13 @@ import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
-import java.util.Objects;
-
 
 /**
  * A Kafka ACLs which is identified by a UUID and stored in the metadata log.
  */
-public final class StandardAcl implements Comparable<StandardAcl> {
+public record StandardAcl(ResourceType resourceType, String resourceName, 
PatternType patternType, String principal,
+                          String host, AclOperation operation,
+                          AclPermissionType permissionType) implements 
Comparable<StandardAcl> {
     public static StandardAcl fromRecord(AccessControlEntryRecord record) {
         return new StandardAcl(
             ResourceType.fromCode(record.resourceType()),
@@ -56,47 +56,6 @@ public final class StandardAcl implements 
Comparable<StandardAcl> {
             acl.entry().permissionType());
     }
 
-    private final ResourceType resourceType;
-    private final String resourceName;
-    private final PatternType patternType;
-    private final String principal;
-    private final String host;
-    private final AclOperation operation;
-    private final AclPermissionType permissionType;
-
-    public StandardAcl(
-                ResourceType resourceType,
-                String resourceName,
-                PatternType patternType,
-                String principal,
-                String host,
-                AclOperation operation,
-                AclPermissionType permissionType) {
-        this.resourceType = resourceType;
-        this.resourceName = resourceName;
-        this.patternType = patternType;
-        this.principal = principal;
-        this.host = host;
-        this.operation = operation;
-        this.permissionType = permissionType;
-    }
-
-    public ResourceType resourceType() {
-        return resourceType;
-    }
-
-    public String resourceName() {
-        return resourceName;
-    }
-
-    public PatternType patternType() {
-        return patternType;
-    }
-
-    public String principal() {
-        return principal;
-    }
-
     public KafkaPrincipal kafkaPrincipal() {
         int colonIndex = principal.indexOf(":");
         if (colonIndex == -1) {
@@ -108,18 +67,6 @@ public final class StandardAcl implements 
Comparable<StandardAcl> {
         return new KafkaPrincipal(principalType, principalName);
     }
 
-    public String host() {
-        return host;
-    }
-
-    public AclOperation operation() {
-        return operation;
-    }
-
-    public AclPermissionType permissionType() {
-        return permissionType;
-    }
-
     public AclBinding toBinding() {
         ResourcePattern resourcePattern =
             new ResourcePattern(resourceType, resourceName, patternType);
@@ -128,32 +75,6 @@ public final class StandardAcl implements 
Comparable<StandardAcl> {
         return new AclBinding(resourcePattern, accessControlEntry);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(StandardAcl.class)) return false;
-        if (o == this) return true;
-        StandardAcl other = (StandardAcl) o;
-        return resourceType.equals(other.resourceType) &&
-            resourceName.equals(other.resourceName) &&
-            patternType.equals(other.patternType) &&
-            principal.equals(other.principal) &&
-            host.equals(other.host) &&
-            operation.equals(other.operation) &&
-            permissionType.equals(other.permissionType);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-            resourceType,
-            resourceName,
-            patternType,
-            principal,
-            host,
-            operation,
-            permissionType);
-    }
-
     /**
      * Compare two StandardAcl objects. See {@link 
StandardAuthorizerData#authorize} for an
      * explanation of why we want this particular sort order.
@@ -176,17 +97,4 @@ public final class StandardAcl implements 
Comparable<StandardAcl> {
         result = permissionType.compareTo(other.permissionType);
         return result;
     }
-
-    @Override
-    public String toString() {
-        return "StandardAcl(" +
-            "resourceType=" + resourceType +
-            ", resourceName=" + resourceName +
-            ", patternType=" + patternType +
-            ", principal=" + principal +
-            ", host=" + host +
-            ", operation=" + operation +
-            ", permissionType=" + permissionType +
-            ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAclWithId.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAclWithId.java
index 822a7752e0e..83a9d3f2bc9 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAclWithId.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAclWithId.java
@@ -21,34 +21,14 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
 
-import java.util.Objects;
-
-
 /**
  * A tuple of (id, acl)
  */
-public final class StandardAclWithId {
+public record StandardAclWithId(Uuid id, StandardAcl acl) {
     public static StandardAclWithId fromRecord(AccessControlEntryRecord 
record) {
         return new StandardAclWithId(record.id(), 
StandardAcl.fromRecord(record));
     }
 
-    private final Uuid id;
-    private final StandardAcl acl;
-
-    public StandardAclWithId(Uuid id,
-                             StandardAcl acl) {
-        this.id = id;
-        this.acl = acl;
-    }
-
-    public Uuid id() {
-        return id;
-    }
-
-    public StandardAcl acl() {
-        return acl;
-    }
-
     public AccessControlEntryRecord toRecord() {
         return new AccessControlEntryRecord().
             setId(id).
@@ -64,26 +44,4 @@ public final class StandardAclWithId {
     public AclBinding toBinding() {
         return acl.toBinding();
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !o.getClass().equals(StandardAclWithId.class)) return 
false;
-        if (o == this) return true;
-        StandardAclWithId other = (StandardAclWithId) o;
-        return id.equals(other.id) &&
-            acl.equals(other.acl);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, acl);
-    }
-
-    @Override
-    public String toString() {
-        return "StandardAclWithId(" +
-            "id=" + id +
-            ", acl=" + acl +
-            ")";
-    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index be50a0c895b..54bb6ea4784 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -550,17 +550,7 @@ public class StandardAuthorizerData {
         }
     }
 
-    private static class DefaultRule implements MatchingRule {
-        private final AuthorizationResult result;
-
-        private DefaultRule(AuthorizationResult result) {
-            this.result = result;
-        }
-
-        @Override
-        public AuthorizationResult result() {
-            return result;
-        }
+    private record DefaultRule(AuthorizationResult result) implements 
MatchingRule {
 
         @Override
         public String toString() {
@@ -568,19 +558,7 @@ public class StandardAuthorizerData {
         }
     }
 
-    private static class MatchingAclRule implements MatchingRule {
-        private final StandardAcl acl;
-        private final AuthorizationResult result;
-
-        private MatchingAclRule(StandardAcl acl, AuthorizationResult result) {
-            this.acl = acl;
-            this.result = result;
-        }
-
-        @Override
-        public AuthorizationResult result() {
-            return result;
-        }
+    private record MatchingAclRule(StandardAcl acl, AuthorizationResult 
result) implements MatchingRule {
 
         @Override
         public String toString() {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java
index 85daaf59e5d..c3247f1981c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java
@@ -19,65 +19,9 @@ package org.apache.kafka.metadata.placement;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Objects;
-
-
 /**
  * Specifies a replica placement that we want to make.
  */
 @InterfaceStability.Unstable
-public class PlacementSpec {
-    private final int startPartition;
-
-    private final int numPartitions;
-
-    private final short numReplicas;
-
-    public PlacementSpec(
-        int startPartition,
-        int numPartitions,
-        short numReplicas
-    ) {
-        this.startPartition = startPartition;
-        this.numPartitions = numPartitions;
-        this.numReplicas = numReplicas;
-    }
-
-    public int startPartition() {
-        return startPartition;
-    }
-
-    public int numPartitions() {
-        return numPartitions;
-    }
-
-    public short numReplicas() {
-        return numReplicas;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) return false;
-        if (!(o.getClass().equals(this.getClass()))) return false;
-        PlacementSpec other = (PlacementSpec) o;
-        return startPartition == other.startPartition &&
-            numPartitions == other.numPartitions &&
-            numReplicas == other.numReplicas;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(startPartition,
-            numPartitions,
-            numReplicas);
-    }
-
-    @Override
-    public String toString() {
-        return "PlacementSpec" +
-            "(startPartition=" + startPartition +
-            ", numPartitions=" + numPartitions +
-            ", numReplicas=" + numReplicas +
-            ")";
-    }
+public record PlacementSpec(int startPartition, int numPartitions, short 
numReplicas) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
index 88bdc5df96c..e34ae025174 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/TopicAssignment.java
@@ -18,42 +18,14 @@
 package org.apache.kafka.metadata.placement;
 
 import java.util.List;
-import java.util.Objects;
 
 /**
  * The topic assignment.
- *
+ * <p>
  * This class is immutable. It's internal state does not change.
  */
-public class TopicAssignment {
-    private final List<PartitionAssignment> assignments;
-
-    public TopicAssignment(List<PartitionAssignment> assignments) {
-        this.assignments = List.copyOf(assignments);
-    }
-
-    /**
-     * @return The replica assignment for each partition, where the index in 
the list corresponds to different partition.
-     */
-    public List<PartitionAssignment> assignments() {
-        return assignments;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof TopicAssignment other)) return false;
-        return assignments.equals(other.assignments);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(assignments);
-    }
-
-    @Override
-    public String toString() {
-        return "TopicAssignment" +
-            "(assignments=" + assignments +
-            ")";
+public record TopicAssignment(List<PartitionAssignment> assignments) {
+    public TopicAssignment {
+        assignments = List.copyOf(assignments);
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
index 17f531d023f..2c24d937f9e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
@@ -19,58 +19,11 @@ package org.apache.kafka.metadata.placement;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Objects;
 import java.util.Optional;
 
-
 /**
  * A broker where a replica can be placed.
  */
 @InterfaceStability.Unstable
-public class UsableBroker {
-    private final int id;
-
-    private final Optional<String> rack;
-
-    private final boolean fenced;
-
-    public UsableBroker(int id, Optional<String> rack, boolean fenced) {
-        this.id = id;
-        this.rack = rack;
-        this.fenced = fenced;
-    }
-
-    public int id() {
-        return id;
-    }
-
-    public Optional<String> rack() {
-        return rack;
-    }
-
-    public boolean fenced() {
-        return fenced;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof UsableBroker other)) return false;
-        return other.id == id && other.rack.equals(rack) && other.fenced == 
fenced;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id,
-            rack,
-            fenced);
-    }
-
-    @Override
-    public String toString() {
-        return "UsableBroker" +
-            "(id=" + id +
-            ", rack=" + rack +
-            ", fenced=" + fenced +
-            ")";
-    }
+public record UsableBroker(int id, Optional<String> rack, boolean fenced) {
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index 11f6403a850..7ac61b9e502 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -69,22 +69,7 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
         }
     }
 
-    public static class BatchAndType {
-        private final Batch<ApiMessageAndVersion> batch;
-        private final boolean isControl;
-
-        public BatchAndType(Batch<ApiMessageAndVersion> batch, boolean 
isControl) {
-            this.batch = batch;
-            this.isControl = isControl;
-        }
-
-        public Batch<ApiMessageAndVersion> batch() {
-            return batch;
-        }
-
-        public boolean isControl() {
-            return isControl;
-        }
+    public record BatchAndType(Batch<ApiMessageAndVersion> batch, boolean 
isControl) {
     }
 
     private final FileRecords fileRecords;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index 1a9cd50bd1d..33c540b9d14 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -113,8 +113,8 @@ public class PartitionReassignmentReplicasTest {
             replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3, 4, 5));
         assertTrue(reassignmentOptional.isPresent());
         PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = reassignmentOptional.get();
-        assertEquals(List.of(3, 4, 5), completedReassignment.isr);
-        assertEquals(List.of(3, 4, 5), completedReassignment.replicas);
+        assertEquals(List.of(3, 4, 5), completedReassignment.isr());
+        assertEquals(List.of(3, 4, 5), completedReassignment.replicas());
     }
 
     @Test
@@ -126,8 +126,8 @@ public class PartitionReassignmentReplicasTest {
             replicas.maybeCompleteReassignment(List.of(0, 1, 2, 3));
         assertTrue(reassignmentOptional.isPresent());
         PartitionReassignmentReplicas.CompletedReassignment 
completedReassignment = reassignmentOptional.get();
-        assertEquals(List.of(0, 1, 3), completedReassignment.isr);
-        assertEquals(List.of(0, 1, 3), completedReassignment.replicas);
+        assertEquals(List.of(0, 1, 3), completedReassignment.isr());
+        assertEquals(List.of(0, 1, 3), completedReassignment.replicas());
     }
 
     @Test
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
index aebdb596c6c..2d62fdd02b1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
@@ -64,20 +64,7 @@ public class PeriodicTaskControlManagerTest {
         }
     }
 
-    static class TrackedTask {
-        final String tag;
-        final long deadlineNs;
-        final Supplier<ControllerResult<Void>> op;
-
-        TrackedTask(
-            String tag,
-            long deadlineNs,
-            Supplier<ControllerResult<Void>> op
-        ) {
-            this.tag = tag;
-            this.deadlineNs = deadlineNs;
-            this.op = op;
-        }
+    record TrackedTask(String tag, long deadlineNs, 
Supplier<ControllerResult<Void>> op) {
     }
 
     static class PeriodicTaskControlManagerTestEnv implements 
PeriodicTaskControlManager.QueueAccessor {
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java 
b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
index 323cc5abf9a..4cfd4334694 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Timeout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -57,4 +58,22 @@ public class MetadataVersionChangeTest {
             "is changing from " + MetadataVersion.latestProduction() + " to " 
+ MetadataVersion.MINIMUM_VERSION,
                 new 
MetadataVersionChangeException(CHANGE_LATEST_TO_MINIMUM).toString());
     }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenOldVersionIsNull() {
+        assertThrows(NullPointerException.class, () -> 
+            new MetadataVersionChange(null, MetadataVersion.MINIMUM_VERSION));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenNewVersionIsNull() {
+        assertThrows(NullPointerException.class, () -> 
+            new MetadataVersionChange(MetadataVersion.MINIMUM_VERSION, null));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenBothVersionsAreNull() {
+        assertThrows(NullPointerException.class, () -> 
+            new MetadataVersionChange(null, null));
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java 
b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index c022ae79893..bd15406cbb0 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -106,22 +106,7 @@ public class RecordTestUtils {
         }
     }
 
-    public static class ImageDeltaPair<I, D> {
-        private final Supplier<I> imageSupplier;
-        private final Function<I, D> deltaCreator;
-
-        public ImageDeltaPair(Supplier<I> imageSupplier, Function<I, D> 
deltaCreator) {
-            this.imageSupplier = imageSupplier;
-            this.deltaCreator = deltaCreator;
-        }
-
-        public Supplier<I> imageSupplier() {
-            return imageSupplier;
-        }
-
-        public Function<I, D> deltaCreator() {
-            return deltaCreator;
-        }
+    public record ImageDeltaPair<I, D>(Supplier<I> imageSupplier, Function<I, 
D> deltaCreator) {
     }
 
     public static class 
TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> {
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index b2f9fa68b3a..74d5a49d7ea 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -569,9 +569,9 @@ public class StandardAuthorizerTest {
 
             String expectedAuditLog = "Principal = User:bob is Denied 
operation = READ " +
                 "from host = 127.0.0.1 on resource = Topic:LITERAL:alpha for 
request = Fetch " +
-                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl[resourceType=TOPIC, " +
                 "resourceName=alp, patternType=PREFIXED, principal=User:bob, 
host=*, operation=READ, " +
-                "permissionType=DENY))";
+                "permissionType=DENY])";
 
             if (logIfDenied) {
                 Mockito.verify(auditLog).info(expectedAuditLog);
@@ -611,9 +611,9 @@ public class StandardAuthorizerTest {
 
             String expectedAuditLog = "Principal = User:bob is Allowed 
operation = READ " +
                 "from host = 127.0.0.1 on resource = Topic:LITERAL:green1 for 
request = Fetch " +
-                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl(resourceType=TOPIC, " +
+                "with resourceRefCount = 1 based on rule 
MatchingAcl(acl=StandardAcl[resourceType=TOPIC, " +
                 "resourceName=green, patternType=PREFIXED, principal=User:bob, 
host=*, operation=READ, " +
-                "permissionType=ALLOW))";
+                "permissionType=ALLOW])";
 
             if (logIfAllowed) {
                 Mockito.verify(auditLog).debug(expectedAuditLog);
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
index 3551e2ce7b0..81625151dba 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
@@ -85,7 +85,7 @@ public class TopicAssignmentTest {
             new PartitionAssignment(replicas, directories::get)
         );
         TopicAssignment topicAssignment = new 
TopicAssignment(partitionAssignments);
-        
assertEquals("TopicAssignment(assignments=[PartitionAssignment(replicas=[0, 1, 
2], " +
-                "directories=[v56qeYzNRrqNtXsxzcReog, MvUIAsOiRlSePeiBHdZrSQ, 
jUqCchHtTHqMxeVv4dw1RA])])", topicAssignment.toString());
+        
assertEquals("TopicAssignment[assignments=[PartitionAssignment(replicas=[0, 1, 
2], " +
+                "directories=[v56qeYzNRrqNtXsxzcReog, MvUIAsOiRlSePeiBHdZrSQ, 
jUqCchHtTHqMxeVv4dw1RA])]]", topicAssignment.toString());
     }
 }

Reply via email to