This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aac62a32d9f KAFKA-18698: Migrate suitable classes to records in server 
and server-common modules (#18783)
aac62a32d9f is described below

commit aac62a32d9f47d57f3aada9e8353053b7ee2bbd7
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 5 18:00:11 2025 +0800

    KAFKA-18698: Migrate suitable classes to records in server and 
server-common modules (#18783)
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai 
<[email protected]>, Christo Lolov <[email protected]>
---
 .../kafka/server/common/FinalizedFeatures.java     | 45 ++---------
 .../kafka/server/common/TopicIdPartition.java      | 42 ++---------
 .../kafka/server/util/InterBrokerSendThread.java   | 10 +--
 .../server/util/RequestAndCompletionHandler.java   | 35 ++-------
 .../server/util/InterBrokerSendThreadTest.java     | 34 ++++-----
 .../java/org/apache/kafka/server/Assignment.java   | 86 ++++------------------
 6 files changed, 54 insertions(+), 198 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 1eb39466409..1bf77a7c14c 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -21,11 +21,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-public final class FinalizedFeatures {
-    private final MetadataVersion metadataVersion;
-    private final Map<String, Short> finalizedFeatures;
-    private final long finalizedFeaturesEpoch;
-
+public record FinalizedFeatures(
+    MetadataVersion metadataVersion,
+    Map<String, Short> finalizedFeatures,
+    long finalizedFeaturesEpoch
+) {
     public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
         return new FinalizedFeatures(version, Collections.emptyMap(), -1);
     }
@@ -40,39 +40,4 @@ public final class FinalizedFeatures {
         this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
         this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.featureLevel());
     }
-
-    public MetadataVersion metadataVersion() {
-        return metadataVersion;
-    }
-
-    public Map<String, Short> finalizedFeatures() {
-        return finalizedFeatures;
-    }
-
-    public long finalizedFeaturesEpoch() {
-        return finalizedFeaturesEpoch;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) 
return false;
-        FinalizedFeatures other = (FinalizedFeatures) o;
-        return metadataVersion == other.metadataVersion &&
-            finalizedFeatures.equals(other.finalizedFeatures) &&
-                finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(metadataVersion, finalizedFeatures, 
finalizedFeaturesEpoch);
-    }
-
-    @Override
-    public String toString() {
-        return "Features" +
-                "(metadataVersion=" + metadataVersion +
-                ", finalizedFeatures=" + finalizedFeatures +
-                ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
-                ")";
-    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
index 74f62dd953c..3a8a75b25db 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
@@ -18,45 +18,15 @@ package org.apache.kafka.server.common;
 
 import org.apache.kafka.common.Uuid;
 
-import java.util.Objects;
-
 /**
  * Represents a partition using its unique topic Id and partition number.
+ * @param topicId Universally unique Id representing this topic partition.
+ * @param partitionId The partition Id.
  */
-public final class TopicIdPartition {
-    private final Uuid topicId;
-    private final int partitionId;
-
-    public TopicIdPartition(Uuid topicId, int partitionId) {
-        this.topicId = topicId;
-        this.partitionId = partitionId;
-    }
-
-    /**
-     * @return Universally unique Id representing this topic partition.
-     */
-    public Uuid topicId() {
-        return topicId;
-    }
-
-    /**
-     * @return The partition Id.
-     */
-    public int partitionId() {
-        return partitionId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof TopicIdPartition other)) return false;
-        return other.topicId.equals(topicId) && other.partitionId == 
partitionId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(topicId, partitionId);
-    }
-
+public record TopicIdPartition(
+    Uuid topicId,
+    int partitionId
+) {
     @Override
     public String toString() {
         return topicId + ":" + partitionId;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index 093946eb5f0..cdd7ae3e98e 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -89,14 +89,14 @@ public abstract class InterBrokerSendThread extends 
ShutdownableThread {
     private void drainGeneratedRequests() {
         generateRequests().forEach(request ->
             unsentRequests.put(
-                request.destination,
+                request.destination(),
                 networkClient.newClientRequest(
-                    request.destination.idString(),
-                    request.request,
-                    request.creationTimeMs,
+                    request.destination().idString(),
+                    request.request(),
+                    request.creationTimeMs(),
                     true,
                     requestTimeoutMs,
-                    request.handler
+                    request.handler()
                 )
             )
         );
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
index da14fb5e4a4..7e2fea6af9b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
@@ -20,32 +20,9 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.requests.AbstractRequest;
 
-public final class RequestAndCompletionHandler {
-
-    public final long creationTimeMs;
-    public final Node destination;
-    public final AbstractRequest.Builder<? extends AbstractRequest> request;
-    public final RequestCompletionHandler handler;
-
-    public RequestAndCompletionHandler(
-        long creationTimeMs,
-        Node destination,
-        AbstractRequest.Builder<? extends AbstractRequest> request,
-        RequestCompletionHandler handler
-    ) {
-        this.creationTimeMs = creationTimeMs;
-        this.destination = destination;
-        this.request = request;
-        this.handler = handler;
-    }
-
-    @Override
-    public String toString() {
-        return "RequestAndCompletionHandler(" +
-            "creationTimeMs=" + creationTimeMs +
-            ", destination=" + destination +
-            ", request=" + request +
-            ", handler=" + handler +
-            ')';
-    }
-}
+public record RequestAndCompletionHandler(
+    long creationTimeMs,
+    Node destination,
+    AbstractRequest.Builder<? extends AbstractRequest> request,
+    RequestCompletionHandler handler
+) { }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
index f0d029de8e2..1e89a2297eb 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
@@ -166,15 +166,15 @@ public class InterBrokerSendThreadTest {
         final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
 
         final ClientRequest clientRequest =
-            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler());
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request),
+            same(handler.request()),
             anyLong(),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler)
+            same(handler.handler())
         )).thenReturn(clientRequest);
 
         when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@@ -187,11 +187,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request),
+                same(handler.request()),
                 anyLong(),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler));
+                same(handler.handler()));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).send(same(clientRequest), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
@@ -209,15 +209,15 @@ public class InterBrokerSendThreadTest {
         final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
 
         final ClientRequest clientRequest =
-            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler());
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request),
+            same(handler.request()),
             anyLong(),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler)
+            same(handler.handler())
         )).thenReturn(clientRequest);
 
         when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@@ -236,11 +236,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request),
+                same(handler.request()),
                 anyLong(),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler));
+                same(handler.handler()));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).connectionDelay(any(), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
@@ -261,16 +261,16 @@ public class InterBrokerSendThreadTest {
 
         final ClientRequest clientRequest =
             new ClientRequest(
-                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler);
+                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler());
         time.sleep(1500L);
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request),
-            ArgumentMatchers.eq(handler.creationTimeMs),
+            same(handler.request()),
+            ArgumentMatchers.eq(handler.creationTimeMs()),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler)
+            same(handler.handler())
         )).thenReturn(clientRequest);
 
         // make the node unready so the request is not cleared
@@ -289,11 +289,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request),
-                ArgumentMatchers.eq(handler.creationTimeMs),
+                same(handler.request()),
+                ArgumentMatchers.eq(handler.creationTimeMs()),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler));
+                same(handler.handler()));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).connectionDelay(any(), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java 
b/server/src/main/java/org/apache/kafka/server/Assignment.java
index 393a0dae1dc..680a62e08a7 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -24,64 +24,25 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.TopicIdPartition;
 
-import java.util.Objects;
-
-final class Assignment {
-    /**
-     * The topic ID and partition index of the replica.
-     */
-    private final TopicIdPartition topicIdPartition;
-
-    /**
-     * The ID of the directory we are placing the replica into.
-     */
-    private final Uuid directoryId;
-
-    /**
-     * The time in monotonic nanosecond when this assignment was created.
-     */
-    private final long submissionTimeNs;
-
-    /**
-     * The callback to invoke on success.
-     */
-    private final Runnable successCallback;
-
-    Assignment(
-        TopicIdPartition topicIdPartition,
-        Uuid directoryId,
-        long submissionTimeNs,
-        Runnable successCallback
-    ) {
-        this.topicIdPartition = topicIdPartition;
-        this.directoryId = directoryId;
-        this.submissionTimeNs = submissionTimeNs;
-        this.successCallback = successCallback;
-    }
-
-    TopicIdPartition topicIdPartition() {
-        return topicIdPartition;
-    }
-
-    Uuid directoryId() {
-        return directoryId;
-    }
-
-    long submissionTimeNs() {
-        return submissionTimeNs;
-    }
-
-    Runnable successCallback() {
-        return successCallback;
-    }
+/**
+ * @param topicIdPartition The topic ID and partition index of the replica.
+ * @param directoryId      The ID of the directory we are placing the replica 
into.
+ * @param submissionTimeNs The time in monotonic nanosecond when this 
assignment was created.
+ * @param successCallback  The callback to invoke on success.
+ */
+record Assignment(
+    TopicIdPartition topicIdPartition,
+    Uuid directoryId,
+    long submissionTimeNs,
+    Runnable successCallback
+) {
 
     /**
      * Check if this Assignment is still valid to be sent.
      *
-     * @param nodeId    The broker ID.
-     * @param image     The metadata image.
-     *
-     * @return          True only if the Assignment is still valid.
+     * @param nodeId The broker ID.
+     * @param image  The metadata image.
+     * @return True only if the Assignment is still valid.
      */
     boolean valid(int nodeId, MetadataImage image) {
         TopicImage topicImage = 
image.topics().getTopic(topicIdPartition.topicId());
@@ -96,23 +57,6 @@ final class Assignment {
         return Replicas.contains(partition.replicas, nodeId);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || (!(o instanceof Assignment other))) return false;
-        return topicIdPartition.equals(other.topicIdPartition) &&
-            directoryId.equals(other.directoryId) &&
-            submissionTimeNs == other.submissionTimeNs &&
-            successCallback.equals(other.successCallback);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(topicIdPartition,
-            directoryId,
-            submissionTimeNs,
-            successCallback);
-    }
-
     @Override
     public String toString() {
         StringBuilder bld = new StringBuilder();

Reply via email to