This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aac62a32d9f KAFKA-18698: Migrate suitable classes to records in server
and server-common modules (#18783)
aac62a32d9f is described below
commit aac62a32d9f47d57f3aada9e8353053b7ee2bbd7
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 5 18:00:11 2025 +0800
KAFKA-18698: Migrate suitable classes to records in server and
server-common modules (#18783)
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>, Christo Lolov <[email protected]>
---
.../kafka/server/common/FinalizedFeatures.java | 45 ++---------
.../kafka/server/common/TopicIdPartition.java | 42 ++---------
.../kafka/server/util/InterBrokerSendThread.java | 10 +--
.../server/util/RequestAndCompletionHandler.java | 35 ++-------
.../server/util/InterBrokerSendThreadTest.java | 34 ++++-----
.../java/org/apache/kafka/server/Assignment.java | 86 ++++------------------
6 files changed, 54 insertions(+), 198 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 1eb39466409..1bf77a7c14c 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -21,11 +21,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-public final class FinalizedFeatures {
- private final MetadataVersion metadataVersion;
- private final Map<String, Short> finalizedFeatures;
- private final long finalizedFeaturesEpoch;
-
+public record FinalizedFeatures(
+ MetadataVersion metadataVersion,
+ Map<String, Short> finalizedFeatures,
+ long finalizedFeaturesEpoch
+) {
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1);
}
@@ -40,39 +40,4 @@ public final class FinalizedFeatures {
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel());
}
-
- public MetadataVersion metadataVersion() {
- return metadataVersion;
- }
-
- public Map<String, Short> finalizedFeatures() {
- return finalizedFeatures;
- }
-
- public long finalizedFeaturesEpoch() {
- return finalizedFeaturesEpoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) return false;
- FinalizedFeatures other = (FinalizedFeatures) o;
- return metadataVersion == other.metadataVersion &&
- finalizedFeatures.equals(other.finalizedFeatures) &&
- finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(metadataVersion, finalizedFeatures, finalizedFeaturesEpoch);
- }
-
- @Override
- public String toString() {
- return "Features" +
- "(metadataVersion=" + metadataVersion +
- ", finalizedFeatures=" + finalizedFeatures +
- ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
- ")";
- }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
b/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
index 74f62dd953c..3a8a75b25db 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/TopicIdPartition.java
@@ -18,45 +18,15 @@ package org.apache.kafka.server.common;
import org.apache.kafka.common.Uuid;
-import java.util.Objects;
-
/**
* Represents a partition using its unique topic Id and partition number.
+ * @param topicId Universally unique Id representing this topic partition.
+ * @param partitionId The partition Id.
*/
-public final class TopicIdPartition {
- private final Uuid topicId;
- private final int partitionId;
-
- public TopicIdPartition(Uuid topicId, int partitionId) {
- this.topicId = topicId;
- this.partitionId = partitionId;
- }
-
- /**
- * @return Universally unique Id representing this topic partition.
- */
- public Uuid topicId() {
- return topicId;
- }
-
- /**
- * @return The partition Id.
- */
- public int partitionId() {
- return partitionId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TopicIdPartition other)) return false;
- return other.topicId.equals(topicId) && other.partitionId == partitionId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topicId, partitionId);
- }
-
+public record TopicIdPartition(
+ Uuid topicId,
+ int partitionId
+) {
@Override
public String toString() {
return topicId + ":" + partitionId;
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index 093946eb5f0..cdd7ae3e98e 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -89,14 +89,14 @@ public abstract class InterBrokerSendThread extends ShutdownableThread {
private void drainGeneratedRequests() {
generateRequests().forEach(request ->
unsentRequests.put(
- request.destination,
+ request.destination(),
networkClient.newClientRequest(
- request.destination.idString(),
- request.request,
- request.creationTimeMs,
+ request.destination().idString(),
+ request.request(),
+ request.creationTimeMs(),
true,
requestTimeoutMs,
- request.handler
+ request.handler()
)
)
);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
index da14fb5e4a4..7e2fea6af9b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
@@ -20,32 +20,9 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
-public final class RequestAndCompletionHandler {
-
- public final long creationTimeMs;
- public final Node destination;
- public final AbstractRequest.Builder<? extends AbstractRequest> request;
- public final RequestCompletionHandler handler;
-
- public RequestAndCompletionHandler(
- long creationTimeMs,
- Node destination,
- AbstractRequest.Builder<? extends AbstractRequest> request,
- RequestCompletionHandler handler
- ) {
- this.creationTimeMs = creationTimeMs;
- this.destination = destination;
- this.request = request;
- this.handler = handler;
- }
-
- @Override
- public String toString() {
- return "RequestAndCompletionHandler(" +
- "creationTimeMs=" + creationTimeMs +
- ", destination=" + destination +
- ", request=" + request +
- ", handler=" + handler +
- ')';
- }
-}
+public record RequestAndCompletionHandler(
+ long creationTimeMs,
+ Node destination,
+ AbstractRequest.Builder<? extends AbstractRequest> request,
+ RequestCompletionHandler handler
+) { }
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
index f0d029de8e2..1e89a2297eb 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
@@ -166,15 +166,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new
TestInterBrokerSendThread();
final ClientRequest clientRequest =
- new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);
+ new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
+ same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler)
+ same(handler.handler())
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@@ -187,11 +187,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
+ same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler));
+ same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).send(same(clientRequest), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@@ -209,15 +209,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new
TestInterBrokerSendThread();
final ClientRequest clientRequest =
- new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);
+ new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
+ same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler)
+ same(handler.handler())
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@@ -236,11 +236,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
+ same(handler.request()),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler));
+ same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@@ -261,16 +261,16 @@ public class InterBrokerSendThreadTest {
final ClientRequest clientRequest =
new ClientRequest(
- "dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler);
+ "dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler());
time.sleep(1500L);
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
- ArgumentMatchers.eq(handler.creationTimeMs),
+ same(handler.request()),
+ ArgumentMatchers.eq(handler.creationTimeMs()),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler)
+ same(handler.handler())
)).thenReturn(clientRequest);
// make the node unready so the request is not cleared
@@ -289,11 +289,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request),
- ArgumentMatchers.eq(handler.creationTimeMs),
+ same(handler.request()),
+ ArgumentMatchers.eq(handler.creationTimeMs()),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler));
+ same(handler.handler()));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java
b/server/src/main/java/org/apache/kafka/server/Assignment.java
index 393a0dae1dc..680a62e08a7 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -24,64 +24,25 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.TopicIdPartition;
-import java.util.Objects;
-
-final class Assignment {
- /**
- * The topic ID and partition index of the replica.
- */
- private final TopicIdPartition topicIdPartition;
-
- /**
- * The ID of the directory we are placing the replica into.
- */
- private final Uuid directoryId;
-
- /**
- * The time in monotonic nanosecond when this assignment was created.
- */
- private final long submissionTimeNs;
-
- /**
- * The callback to invoke on success.
- */
- private final Runnable successCallback;
-
- Assignment(
- TopicIdPartition topicIdPartition,
- Uuid directoryId,
- long submissionTimeNs,
- Runnable successCallback
- ) {
- this.topicIdPartition = topicIdPartition;
- this.directoryId = directoryId;
- this.submissionTimeNs = submissionTimeNs;
- this.successCallback = successCallback;
- }
-
- TopicIdPartition topicIdPartition() {
- return topicIdPartition;
- }
-
- Uuid directoryId() {
- return directoryId;
- }
-
- long submissionTimeNs() {
- return submissionTimeNs;
- }
-
- Runnable successCallback() {
- return successCallback;
- }
+/**
+ * @param topicIdPartition The topic ID and partition index of the replica.
+ * @param directoryId The ID of the directory we are placing the replica into.
+ * @param submissionTimeNs The time in monotonic nanosecond when this assignment was created.
+ * @param successCallback The callback to invoke on success.
+ */
+record Assignment(
+ TopicIdPartition topicIdPartition,
+ Uuid directoryId,
+ long submissionTimeNs,
+ Runnable successCallback
+) {
/**
* Check if this Assignment is still valid to be sent.
*
- * @param nodeId The broker ID.
- * @param image The metadata image.
- *
- * @return True only if the Assignment is still valid.
+ * @param nodeId The broker ID.
+ * @param image The metadata image.
+ * @return True only if the Assignment is still valid.
*/
boolean valid(int nodeId, MetadataImage image) {
TopicImage topicImage =
image.topics().getTopic(topicIdPartition.topicId());
@@ -96,23 +57,6 @@ final class Assignment {
return Replicas.contains(partition.replicas, nodeId);
}
- @Override
- public boolean equals(Object o) {
- if (o == null || (!(o instanceof Assignment other))) return false;
- return topicIdPartition.equals(other.topicIdPartition) &&
- directoryId.equals(other.directoryId) &&
- submissionTimeNs == other.submissionTimeNs &&
- successCallback.equals(other.successCallback);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topicIdPartition,
- directoryId,
- submissionTimeNs,
- successCallback);
- }
-
@Override
public String toString() {
StringBuilder bld = new StringBuilder();