This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 707a44a6cb7 KAFKA-19068 Eliminate the duplicate type check in creating 
ControlRecord (#19346)
707a44a6cb7 is described below

commit 707a44a6cb700e4c029a8a386dc1edde1c5c7253
Author: Nick Guo <[email protected]>
AuthorDate: Sun May 11 00:07:00 2025 +0800

    KAFKA-19068 Eliminate the duplicate type check in creating ControlRecord 
(#19346)
    
    jira: https://issues.apache.org/jira/browse/KAFKA-19068
    
    `RecordsIterator#decodeControlRecord` does the type check, and then the
    `ControlRecord` constructor does it again.
    
    We should add a static method to `ControlRecord` that creates a
    `ControlRecord` with the type check, and the `ControlRecord` constructor
    should then be made private to ensure that all instances are created by
    the static method.
    
    Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../kafka/image/loader/MetadataLoaderTest.java     |  7 +--
 .../java/org/apache/kafka/raft/ControlRecord.java  | 69 ++++++++++------------
 .../kafka/raft/internals/RecordsIterator.java      | 16 +----
 .../org/apache/kafka/raft/ControlRecordTest.java   | 20 ++-----
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 12 ++--
 .../raft/internals/RecordsBatchReaderTest.java     |  2 +-
 .../kafka/raft/internals/RecordsIteratorTest.java  |  2 +-
 7 files changed, 48 insertions(+), 80 deletions(-)

diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index beaeefdc38c..31960252bd8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -253,7 +252,7 @@ public class MetadataLoaderTest {
                             100,
                             4000,
                             10,
-                            List.of(new 
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                            List.of(ControlRecord.of(new 
SnapshotHeaderRecord()))
                         ),
                         Batch.data(0, 0, 0, 0,
                             List.of(new ApiMessageAndVersion(new 
FeatureLevelRecord().
@@ -386,7 +385,7 @@ public class MetadataLoaderTest {
                     100,
                     4000,
                     10,
-                    List.of(new 
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                    List.of(ControlRecord.of(new SnapshotHeaderRecord()))
                 )
             )
         );
@@ -485,7 +484,7 @@ public class MetadataLoaderTest {
                         100,
                         4000,
                         10,
-                        List.of(new 
ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                        List.of(ControlRecord.of(new SnapshotHeaderRecord()))
                     )
                 )
             ).setTime(time);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java 
b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
index 9588f3c1138..16a0a95ffa7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
@@ -23,54 +23,49 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
 
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
+
 public final class ControlRecord {
     private final ControlRecordType recordType;
     private final ApiMessage message;
 
-    private static void throwIllegalArgument(ControlRecordType recordType, 
ApiMessage message) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Record type %s doesn't match message class %s",
-                recordType,
-                message.getClass()
-            )
-        );
+    public static ControlRecord of(ByteBuffer key, ByteBuffer value) {
+        ControlRecordType recordType = ControlRecordType.parse(key);
+        final ApiMessage message = switch (recordType) {
+            case LEADER_CHANGE -> 
ControlRecordUtils.deserializeLeaderChangeMessage(value);
+            case SNAPSHOT_HEADER -> 
ControlRecordUtils.deserializeSnapshotHeaderRecord(value);
+            case SNAPSHOT_FOOTER -> 
ControlRecordUtils.deserializeSnapshotFooterRecord(value);
+            case KRAFT_VERSION -> 
ControlRecordUtils.deserializeKRaftVersionRecord(value);
+            case KRAFT_VOTERS -> 
ControlRecordUtils.deserializeVotersRecord(value);
+            default -> throw new 
IllegalArgumentException(String.format("Unknown control record type %s", 
recordType));
+        };
+        return new ControlRecord(recordType, message);
     }
 
-    public ControlRecord(ControlRecordType recordType, ApiMessage message) {
-        switch (recordType) {
-            case LEADER_CHANGE:
-                if (!(message instanceof LeaderChangeMessage)) {
-                    throwIllegalArgument(recordType, message);
-                }
-                break;
-            case SNAPSHOT_HEADER:
-                if (!(message instanceof SnapshotHeaderRecord)) {
-                    throwIllegalArgument(recordType, message);
-                }
-                break;
-            case SNAPSHOT_FOOTER:
-                if (!(message instanceof SnapshotFooterRecord)) {
-                    throwIllegalArgument(recordType, message);
-                }
-                break;
-            case KRAFT_VERSION:
-                if (!(message instanceof KRaftVersionRecord)) {
-                    throwIllegalArgument(recordType, message);
-                }
-                break;
-            case KRAFT_VOTERS:
-                if (!(message instanceof VotersRecord)) {
-                    throwIllegalArgument(recordType, message);
-                }
-                break;
-            default:
-                throw new IllegalArgumentException(String.format("Unknown 
control record type %s", recordType));
+    //this is for test only
+    public static ControlRecord of(ApiMessage message) {
+        ControlRecordType recordType;
+        if (message instanceof LeaderChangeMessage) {
+            recordType = ControlRecordType.LEADER_CHANGE;
+        } else if (message instanceof SnapshotHeaderRecord) {
+            recordType = ControlRecordType.SNAPSHOT_HEADER;
+        } else if (message instanceof SnapshotFooterRecord) {
+            recordType = ControlRecordType.SNAPSHOT_FOOTER;
+        } else if (message instanceof KRaftVersionRecord) {
+            recordType = ControlRecordType.KRAFT_VERSION;
+        } else if (message instanceof VotersRecord) {
+            recordType = ControlRecordType.KRAFT_VOTERS;
+        } else {
+            throw new IllegalArgumentException(String.format("Unknown control 
record type %s", message.getClass()));
         }
+        return new ControlRecord(recordType, message);
+    }
 
+    private ControlRecord(ControlRecordType recordType, ApiMessage message) {
         this.recordType = recordType;
         this.message = message;
     }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index b912d748bca..b1e711bf5e4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -16,10 +16,7 @@
  */
 package org.apache.kafka.raft.internals;
 
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -370,17 +367,6 @@ public final class RecordsIterator<T> implements 
Iterator<Batch<T>>, AutoCloseab
             throw new IllegalArgumentException("Got an unexpected empty value 
in the record");
         }
 
-        ControlRecordType type = ControlRecordType.parse(key.get());
-
-        final ApiMessage message = switch (type) {
-            case LEADER_CHANGE -> 
ControlRecordUtils.deserializeLeaderChangeMessage(value.get());
-            case SNAPSHOT_HEADER -> 
ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get());
-            case SNAPSHOT_FOOTER -> 
ControlRecordUtils.deserializeSnapshotFooterRecord(value.get());
-            case KRAFT_VERSION -> 
ControlRecordUtils.deserializeKRaftVersionRecord(value.get());
-            case KRAFT_VOTERS -> 
ControlRecordUtils.deserializeVotersRecord(value.get());
-            default -> throw new 
IllegalArgumentException(String.format("Unknown control record type %s", type));
-        };
-
-        return new ControlRecord(type, message);
+        return ControlRecord.of(key.get(), value.get());
     }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java 
b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
index fed7bf21664..b6416c69be7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ControlRecordTest.java
@@ -34,24 +34,16 @@ public final class ControlRecordTest {
     @Test
     void testCtr() {
         // Valid constructions
-        new ControlRecord(ControlRecordType.LEADER_CHANGE, new 
LeaderChangeMessage());
-        new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new 
SnapshotHeaderRecord());
-        new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new 
SnapshotFooterRecord());
-        new ControlRecord(ControlRecordType.KRAFT_VERSION, new 
KRaftVersionRecord());
-        new ControlRecord(ControlRecordType.KRAFT_VOTERS, new VotersRecord());
+        ControlRecord.of(new LeaderChangeMessage());
+        ControlRecord.of(new SnapshotHeaderRecord());
+        ControlRecord.of(new SnapshotFooterRecord());
+        ControlRecord.of(new KRaftVersionRecord());
+        ControlRecord.of(new VotersRecord());
 
         // Invalid constructions
         assertThrows(
             IllegalArgumentException.class,
-            () -> new ControlRecord(ControlRecordType.ABORT, new 
SnapshotFooterRecord())
-        );
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new 
SnapshotHeaderRecord())
-        );
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, 
Mockito.mock(ApiMessage.class))
+            () -> ControlRecord.of(Mockito.mock(ApiMessage.class))
         );
     }
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index cdc351d3181..b000fbbcd59 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -95,26 +95,22 @@ public class KafkaRaftClientReconfigTest {
 
         List<List<ControlRecord>> expectedBootstrapRecords = List.of(
             List.of(
-                new ControlRecord(
-                    ControlRecordType.SNAPSHOT_HEADER,
+                ControlRecord.of(
                     new SnapshotHeaderRecord()
                         .setVersion((short) 0)
                         .setLastContainedLogTimestamp(0)
                 ),
-                new ControlRecord(
-                    ControlRecordType.KRAFT_VERSION,
+                ControlRecord.of(
                     new KRaftVersionRecord()
                         
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
                         .setKRaftVersion((short) 1)
                 ),
-                new ControlRecord(
-                    ControlRecordType.KRAFT_VOTERS,
+                ControlRecord.of(
                     
voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
                 )
             ),
             List.of(
-                new ControlRecord(
-                    ControlRecordType.SNAPSHOT_FOOTER,
+                ControlRecord.of(
                     new SnapshotFooterRecord()
                         .setVersion((short) 0)
                 )
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index a32575e93b2..ee59d918dc3 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -80,7 +80,7 @@ class RecordsBatchReaderTest {
     public void testLeaderChangeControlBatch() {
         // Confirm that the RecordsBatchReader is able to iterate over control 
batches
         MemoryRecords records = 
RecordsIteratorTest.buildControlRecords(ControlRecordType.LEADER_CHANGE);
-        ControlRecord expectedRecord = new 
ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
+        ControlRecord expectedRecord = ControlRecord.of(new 
LeaderChangeMessage());
 
         try (RecordsBatchReader<String> reader = RecordsBatchReader.of(
                 0,
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 8016a1e5381..0d0ce4f127c 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -278,7 +278,7 @@ public final class RecordsIteratorTest {
         try (RecordsIterator<String> iterator = createIterator(records, 
BufferSupplier.NO_CACHING, true)) {
             assertTrue(iterator.hasNext());
             assertEquals(
-                List.of(new ControlRecord(type, expectedMessage)),
+                List.of(ControlRecord.of(expectedMessage)),
                 iterator.next().controlRecords()
             );
             assertFalse(iterator.hasNext());

Reply via email to