This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 707a44a6cb7 KAFKA-19068 Eliminate the duplicate type check in creating
ControlRecord (#19346)
707a44a6cb7 is described below
commit 707a44a6cb700e4c029a8a386dc1edde1c5c7253
Author: Nick Guo <[email protected]>
AuthorDate: Sun May 11 00:07:00 2025 +0800
KAFKA-19068 Eliminate the duplicate type check in creating ControlRecord
(#19346)
jira: https://issues.apache.org/jira/browse/KAFKA-19068
`RecordsIterator#decodeControlRecord` do the type check and then
`ControlRecord` constructor does that again.
we should add a static method to ControlRecord to create `ControlRecord`
with type check, and then `ControlRecord` constructor should be changed
to private to ensure all instance is created by the static method.
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/image/loader/MetadataLoaderTest.java | 7 +--
.../java/org/apache/kafka/raft/ControlRecord.java | 69 ++++++++++------------
.../kafka/raft/internals/RecordsIterator.java | 16 +----
.../org/apache/kafka/raft/ControlRecordTest.java | 20 ++-----
.../kafka/raft/KafkaRaftClientReconfigTest.java | 12 ++--
.../raft/internals/RecordsBatchReaderTest.java | 2 +-
.../kafka/raft/internals/RecordsIteratorTest.java | 2 +-
7 files changed, 48 insertions(+), 80 deletions(-)
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index beaeefdc38c..31960252bd8 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -253,7 +252,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
- List.of(new
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+ List.of(ControlRecord.of(new
SnapshotHeaderRecord()))
),
Batch.data(0, 0, 0, 0,
List.of(new ApiMessageAndVersion(new
FeatureLevelRecord().
@@ -386,7 +385,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
- List.of(new
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+ List.of(ControlRecord.of(new SnapshotHeaderRecord()))
)
)
);
@@ -485,7 +484,7 @@ public class MetadataLoaderTest {
100,
4000,
10,
- List.of(new
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+ List.of(ControlRecord.of(new SnapshotHeaderRecord()))
)
)
).setTime(time);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
index 9588f3c1138..16a0a95ffa7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
@@ -23,54 +23,49 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import java.nio.ByteBuffer;
import java.util.Objects;
+
public final class ControlRecord {
private final ControlRecordType recordType;
private final ApiMessage message;
- private static void throwIllegalArgument(ControlRecordType recordType,
ApiMessage message) {
- throw new IllegalArgumentException(
- String.format(
- "Record type %s doesn't match message class %s",
- recordType,
- message.getClass()
- )
- );
+ public static ControlRecord of(ByteBuffer key, ByteBuffer value) {
+ ControlRecordType recordType = ControlRecordType.parse(key);
+ final ApiMessage message = switch (recordType) {
+ case LEADER_CHANGE ->
ControlRecordUtils.deserializeLeaderChangeMessage(value);
+ case SNAPSHOT_HEADER ->
ControlRecordUtils.deserializeSnapshotHeaderRecord(value);
+ case SNAPSHOT_FOOTER ->
ControlRecordUtils.deserializeSnapshotFooterRecord(value);
+ case KRAFT_VERSION ->
ControlRecordUtils.deserializeKRaftVersionRecord(value);
+ case KRAFT_VOTERS ->
ControlRecordUtils.deserializeVotersRecord(value);
+ default -> throw new
IllegalArgumentException(String.format("Unknown control record type %s",
recordType));
+ };
+ return new ControlRecord(recordType, message);
}
- public ControlRecord(ControlRecordType recordType, ApiMessage message) {
- switch (recordType) {
- case LEADER_CHANGE:
- if (!(message instanceof LeaderChangeMessage)) {
- throwIllegalArgument(recordType, message);
- }
- break;
- case SNAPSHOT_HEADER:
- if (!(message instanceof SnapshotHeaderRecord)) {
- throwIllegalArgument(recordType, message);
- }
- break;
- case SNAPSHOT_FOOTER:
- if (!(message instanceof SnapshotFooterRecord)) {
- throwIllegalArgument(recordType, message);
- }
- break;
- case KRAFT_VERSION:
- if (!(message instanceof KRaftVersionRecord)) {
- throwIllegalArgument(recordType, message);
- }
- break;
- case KRAFT_VOTERS:
- if (!(message instanceof VotersRecord)) {
- throwIllegalArgument(recordType, message);
- }
- break;
- default:
- throw new IllegalArgumentException(String.format("Unknown
control record type %s", recordType));
+ //this is for test only
+ public static ControlRecord of(ApiMessage message) {
+ ControlRecordType recordType;
+ if (message instanceof LeaderChangeMessage) {
+ recordType = ControlRecordType.LEADER_CHANGE;
+ } else if (message instanceof SnapshotHeaderRecord) {
+ recordType = ControlRecordType.SNAPSHOT_HEADER;
+ } else if (message instanceof SnapshotFooterRecord) {
+ recordType = ControlRecordType.SNAPSHOT_FOOTER;
+ } else if (message instanceof KRaftVersionRecord) {
+ recordType = ControlRecordType.KRAFT_VERSION;
+ } else if (message instanceof VotersRecord) {
+ recordType = ControlRecordType.KRAFT_VOTERS;
+ } else {
+ throw new IllegalArgumentException(String.format("Unknown control
record type %s", message.getClass()));
}
+ return new ControlRecord(recordType, message);
+ }
+ private ControlRecord(ControlRecordType recordType, ApiMessage message) {
this.recordType = recordType;
this.message = message;
}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index b912d748bca..b1e711bf5e4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -16,10 +16,7 @@
*/
package org.apache.kafka.raft.internals;
-import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
@@ -370,17 +367,6 @@ public final class RecordsIterator<T> implements
Iterator<Batch<T>>, AutoCloseab
throw new IllegalArgumentException("Got an unexpected empty value
in the record");
}
- ControlRecordType type = ControlRecordType.parse(key.get());
-
- final ApiMessage message = switch (type) {
- case LEADER_CHANGE ->
ControlRecordUtils.deserializeLeaderChangeMessage(value.get());
- case SNAPSHOT_HEADER ->
ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get());
- case SNAPSHOT_FOOTER ->
ControlRecordUtils.deserializeSnapshotFooterRecord(value.get());
- case KRAFT_VERSION ->
ControlRecordUtils.deserializeKRaftVersionRecord(value.get());
- case KRAFT_VOTERS ->
ControlRecordUtils.deserializeVotersRecord(value.get());
- default -> throw new
IllegalArgumentException(String.format("Unknown control record type %s", type));
- };
-
- return new ControlRecord(type, message);
+ return ControlRecord.of(key.get(), value.get());
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
index fed7bf21664..b6416c69be7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
@@ -34,24 +34,16 @@ public final class ControlRecordTest {
@Test
void testCtr() {
// Valid constructions
- new ControlRecord(ControlRecordType.LEADER_CHANGE, new
LeaderChangeMessage());
- new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new
SnapshotHeaderRecord());
- new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new
SnapshotFooterRecord());
- new ControlRecord(ControlRecordType.KRAFT_VERSION, new
KRaftVersionRecord());
- new ControlRecord(ControlRecordType.KRAFT_VOTERS, new VotersRecord());
+ ControlRecord.of(new LeaderChangeMessage());
+ ControlRecord.of(new SnapshotHeaderRecord());
+ ControlRecord.of(new SnapshotFooterRecord());
+ ControlRecord.of(new KRaftVersionRecord());
+ ControlRecord.of(new VotersRecord());
// Invalid constructions
assertThrows(
IllegalArgumentException.class,
- () -> new ControlRecord(ControlRecordType.ABORT, new
SnapshotFooterRecord())
- );
- assertThrows(
- IllegalArgumentException.class,
- () -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new
SnapshotHeaderRecord())
- );
- assertThrows(
- IllegalArgumentException.class,
- () -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER,
Mockito.mock(ApiMessage.class))
+ () -> ControlRecord.of(Mockito.mock(ApiMessage.class))
);
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index cdc351d3181..b000fbbcd59 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -95,26 +95,22 @@ public class KafkaRaftClientReconfigTest {
List<List<ControlRecord>> expectedBootstrapRecords = List.of(
List.of(
- new ControlRecord(
- ControlRecordType.SNAPSHOT_HEADER,
+ ControlRecord.of(
new SnapshotHeaderRecord()
.setVersion((short) 0)
.setLastContainedLogTimestamp(0)
),
- new ControlRecord(
- ControlRecordType.KRAFT_VERSION,
+ ControlRecord.of(
new KRaftVersionRecord()
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
.setKRaftVersion((short) 1)
),
- new ControlRecord(
- ControlRecordType.KRAFT_VOTERS,
+ ControlRecord.of(
voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
)
),
List.of(
- new ControlRecord(
- ControlRecordType.SNAPSHOT_FOOTER,
+ ControlRecord.of(
new SnapshotFooterRecord()
.setVersion((short) 0)
)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index a32575e93b2..ee59d918dc3 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -80,7 +80,7 @@ class RecordsBatchReaderTest {
public void testLeaderChangeControlBatch() {
// Confirm that the RecordsBatchReader is able to iterate over control
batches
MemoryRecords records =
RecordsIteratorTest.buildControlRecords(ControlRecordType.LEADER_CHANGE);
- ControlRecord expectedRecord = new
ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
+ ControlRecord expectedRecord = ControlRecord.of(new
LeaderChangeMessage());
try (RecordsBatchReader<String> reader = RecordsBatchReader.of(
0,
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 8016a1e5381..0d0ce4f127c 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -278,7 +278,7 @@ public final class RecordsIteratorTest {
try (RecordsIterator<String> iterator = createIterator(records,
BufferSupplier.NO_CACHING, true)) {
assertTrue(iterator.hasNext());
assertEquals(
- List.of(new ControlRecord(type, expectedMessage)),
+ List.of(ControlRecord.of(expectedMessage)),
iterator.next().controlRecords()
);
assertFalse(iterator.hasNext());