chia7712 commented on code in PR #16592:
URL: https://github.com/apache/kafka/pull/16592#discussion_r1686623025


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -646,71 +616,70 @@ public Optional<SnapshotFile> 
removeAndMarkSnapshotForDeletion(long snapshotOffs
     }
 
     public static List<ProducerStateEntry> readSnapshot(File file) throws 
IOException {
-        try {
-            byte[] buffer = Files.readAllBytes(file.toPath());
-            Struct struct = 
PID_SNAPSHOT_MAP_SCHEMA.read(ByteBuffer.wrap(buffer));
-
-            Short version = struct.getShort(VERSION_FIELD);
-            if (version != PRODUCER_SNAPSHOT_VERSION)
-                throw new CorruptSnapshotException("Snapshot contained an 
unknown file version " + version);
-
-            long crc = struct.getUnsignedInt(CRC_FIELD);
-            long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, 
buffer.length - PRODUCER_ENTRIES_OFFSET);
-            if (crc != computedCrc)
-                throw new CorruptSnapshotException("Snapshot is corrupt (CRC 
is no longer valid). Stored crc: " + crc
-                        + ". Computed crc: " + computedCrc);
-
-            Object[] producerEntryFields = 
struct.getArray(PRODUCER_ENTRIES_FIELD);
-            List<ProducerStateEntry> entries = new 
ArrayList<>(producerEntryFields.length);
-            for (Object producerEntryObj : producerEntryFields) {
-                Struct producerEntryStruct = (Struct) producerEntryObj;
-                long producerId = 
producerEntryStruct.getLong(PRODUCER_ID_FIELD);
-                short producerEpoch = 
producerEntryStruct.getShort(PRODUCER_EPOCH_FIELD);
-                int seq = producerEntryStruct.getInt(LAST_SEQUENCE_FIELD);
-                long offset = producerEntryStruct.getLong(LAST_OFFSET_FIELD);
-                long timestamp = producerEntryStruct.getLong(TIMESTAMP_FIELD);
-                int offsetDelta = 
producerEntryStruct.getInt(OFFSET_DELTA_FIELD);
-                int coordinatorEpoch = 
producerEntryStruct.getInt(COORDINATOR_EPOCH_FIELD);
-                long currentTxnFirstOffset = 
producerEntryStruct.getLong(CURRENT_TXN_FIRST_OFFSET_FIELD);
-
-                OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset 
>= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
-                Optional<BatchMetadata> batchMetadata =
-                        (offset >= 0) ? Optional.of(new BatchMetadata(seq, 
offset, offsetDelta, timestamp)) : Optional.empty();
-                entries.add(new ProducerStateEntry(producerId, producerEpoch, 
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
-            }
+        byte[] buffer = Files.readAllBytes(file.toPath());
 
-            return entries;
-        } catch (SchemaException e) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        short version;
+        ProducerSnapshot producerSnapshot;
+        try {
+            version = byteBuffer.getShort();
+            producerSnapshot = new ProducerSnapshot(new 
ByteBufferAccessor(byteBuffer), version);
+        } catch (Exception e) {
             throw new CorruptSnapshotException("Snapshot failed schema 
validation: " + e.getMessage());
         }
+
+        if (version < ProducerSnapshot.LOWEST_SUPPORTED_VERSION || version > 
ProducerSnapshot.HIGHEST_SUPPORTED_VERSION)

Review Comment:
   We should check the version before deserializing, because the `ProducerSnapshot` constructor is called with that version.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -646,71 +616,70 @@ public Optional<SnapshotFile> 
removeAndMarkSnapshotForDeletion(long snapshotOffs
     }
 
     public static List<ProducerStateEntry> readSnapshot(File file) throws 
IOException {
-        try {
-            byte[] buffer = Files.readAllBytes(file.toPath());
-            Struct struct = 
PID_SNAPSHOT_MAP_SCHEMA.read(ByteBuffer.wrap(buffer));
-
-            Short version = struct.getShort(VERSION_FIELD);
-            if (version != PRODUCER_SNAPSHOT_VERSION)
-                throw new CorruptSnapshotException("Snapshot contained an 
unknown file version " + version);
-
-            long crc = struct.getUnsignedInt(CRC_FIELD);
-            long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, 
buffer.length - PRODUCER_ENTRIES_OFFSET);
-            if (crc != computedCrc)
-                throw new CorruptSnapshotException("Snapshot is corrupt (CRC 
is no longer valid). Stored crc: " + crc
-                        + ". Computed crc: " + computedCrc);
-
-            Object[] producerEntryFields = 
struct.getArray(PRODUCER_ENTRIES_FIELD);
-            List<ProducerStateEntry> entries = new 
ArrayList<>(producerEntryFields.length);
-            for (Object producerEntryObj : producerEntryFields) {
-                Struct producerEntryStruct = (Struct) producerEntryObj;
-                long producerId = 
producerEntryStruct.getLong(PRODUCER_ID_FIELD);
-                short producerEpoch = 
producerEntryStruct.getShort(PRODUCER_EPOCH_FIELD);
-                int seq = producerEntryStruct.getInt(LAST_SEQUENCE_FIELD);
-                long offset = producerEntryStruct.getLong(LAST_OFFSET_FIELD);
-                long timestamp = producerEntryStruct.getLong(TIMESTAMP_FIELD);
-                int offsetDelta = 
producerEntryStruct.getInt(OFFSET_DELTA_FIELD);
-                int coordinatorEpoch = 
producerEntryStruct.getInt(COORDINATOR_EPOCH_FIELD);
-                long currentTxnFirstOffset = 
producerEntryStruct.getLong(CURRENT_TXN_FIRST_OFFSET_FIELD);
-
-                OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset 
>= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
-                Optional<BatchMetadata> batchMetadata =
-                        (offset >= 0) ? Optional.of(new BatchMetadata(seq, 
offset, offsetDelta, timestamp)) : Optional.empty();
-                entries.add(new ProducerStateEntry(producerId, producerEpoch, 
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
-            }
+        byte[] buffer = Files.readAllBytes(file.toPath());
 
-            return entries;
-        } catch (SchemaException e) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        short version;
+        ProducerSnapshot producerSnapshot;
+        try {
+            version = byteBuffer.getShort();
+            producerSnapshot = new ProducerSnapshot(new 
ByteBufferAccessor(byteBuffer), version);
+        } catch (Exception e) {
             throw new CorruptSnapshotException("Snapshot failed schema 
validation: " + e.getMessage());
         }
+
+        if (version < ProducerSnapshot.LOWEST_SUPPORTED_VERSION || version > 
ProducerSnapshot.HIGHEST_SUPPORTED_VERSION)
+            throw new CorruptSnapshotException("Snapshot contained an unknown 
file version " + version);
+
+        long crc = producerSnapshot.crc();
+        long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, 
buffer.length - PRODUCER_ENTRIES_OFFSET);
+        if (crc != computedCrc)
+            throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no 
longer valid). Stored crc: " + crc
+                    + ". Computed crc: " + computedCrc);
+
+        List<ProducerSnapshot.ProducerEntry> producerEntries = 
producerSnapshot.producerEntries();
+        List<ProducerStateEntry> entries = new 
ArrayList<>(producerEntries.size());
+        for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) {
+            long producerId = producerEntry.producerId();
+            short producerEpoch = producerEntry.epoch();
+            int lastSequence = producerEntry.lastSequence();
+            long lastOffset = producerEntry.lastOffset();
+            long timestamp = producerEntry.timestamp();
+            int offsetDelta = producerEntry.offsetDelta();
+            int coordinatorEpoch = producerEntry.coordinatorEpoch();
+            long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
+
+            OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 
? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
+            Optional<BatchMetadata> batchMetadata =
+                    (lastOffset >= 0) ? Optional.of(new 
BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) : 
Optional.empty();
+            entries.add(new ProducerStateEntry(producerId, producerEpoch, 
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
+        }
+
+        return entries;
     }
 
     // visible for testing
     public static void writeSnapshot(File file, Map<Long, ProducerStateEntry> 
entries, boolean sync) throws IOException {
-        Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
-        struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
-        struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
-        Struct[] structEntries = new Struct[entries.size()];
-        int i = 0;
+        ProducerSnapshot producerSnapshot = new ProducerSnapshot();
+        producerSnapshot.setCrc(0L); // we'll fill this after writing the 
entries
+        List<ProducerSnapshot.ProducerEntry> producerEntries = new 
ArrayList<>(entries.size());
         for (Map.Entry<Long, ProducerStateEntry> producerIdEntry : 
entries.entrySet()) {
             Long producerId = producerIdEntry.getKey();
             ProducerStateEntry entry = producerIdEntry.getValue();
-            Struct producerEntryStruct = 
struct.instance(PRODUCER_ENTRIES_FIELD);
-            producerEntryStruct.set(PRODUCER_ID_FIELD, producerId)
-                    .set(PRODUCER_EPOCH_FIELD, entry.producerEpoch())
-                    .set(LAST_SEQUENCE_FIELD, entry.lastSeq())
-                    .set(LAST_OFFSET_FIELD, entry.lastDataOffset())
-                    .set(OFFSET_DELTA_FIELD, entry.lastOffsetDelta())
-                    .set(TIMESTAMP_FIELD, entry.lastTimestamp())
-                    .set(COORDINATOR_EPOCH_FIELD, entry.coordinatorEpoch())
-                    .set(CURRENT_TXN_FIRST_OFFSET_FIELD, 
entry.currentTxnFirstOffset().orElse(-1L));
-            structEntries[i++] = producerEntryStruct;
+            ProducerSnapshot.ProducerEntry producerEntry = new 
ProducerSnapshot.ProducerEntry();
+            producerEntry.setProducerId(producerId);
+            producerEntry.setEpoch(entry.producerEpoch());
+            producerEntry.setLastSequence(entry.lastSeq());
+            producerEntry.setLastOffset(entry.lastDataOffset());
+            producerEntry.setOffsetDelta(entry.lastOffsetDelta());
+            producerEntry.setTimestamp(entry.lastTimestamp());
+            producerEntry.setCoordinatorEpoch(entry.coordinatorEpoch());
+            
producerEntry.setCurrentTxnFirstOffset(entry.currentTxnFirstOffset().orElse(-1L));
+            producerEntries.add(producerEntry);

Review Comment:
   Could you please use the fluent pattern (chained setter calls) here?
   ```java
               producerEntries.add(new ProducerSnapshot.ProducerEntry()
                   .setProducerId(producerIdEntry.getKey())
                   .setEpoch(entry.producerEpoch()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to