brandboat commented on code in PR #16592:
URL: https://github.com/apache/kafka/pull/16592#discussion_r1686801070


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -646,71 +616,70 @@ public Optional<SnapshotFile> removeAndMarkSnapshotForDeletion(long snapshotOffs
     }
 
     public static List<ProducerStateEntry> readSnapshot(File file) throws IOException {
-        try {
-            byte[] buffer = Files.readAllBytes(file.toPath());
-            Struct struct = PID_SNAPSHOT_MAP_SCHEMA.read(ByteBuffer.wrap(buffer));
-
-            Short version = struct.getShort(VERSION_FIELD);
-            if (version != PRODUCER_SNAPSHOT_VERSION)
-                throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version);
-
-            long crc = struct.getUnsignedInt(CRC_FIELD);
-            long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.length - PRODUCER_ENTRIES_OFFSET);
-            if (crc != computedCrc)
-                throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc
-                        + ". Computed crc: " + computedCrc);
-
-            Object[] producerEntryFields = struct.getArray(PRODUCER_ENTRIES_FIELD);
-            List<ProducerStateEntry> entries = new ArrayList<>(producerEntryFields.length);
-            for (Object producerEntryObj : producerEntryFields) {
-                Struct producerEntryStruct = (Struct) producerEntryObj;
-                long producerId = producerEntryStruct.getLong(PRODUCER_ID_FIELD);
-                short producerEpoch = producerEntryStruct.getShort(PRODUCER_EPOCH_FIELD);
-                int seq = producerEntryStruct.getInt(LAST_SEQUENCE_FIELD);
-                long offset = producerEntryStruct.getLong(LAST_OFFSET_FIELD);
-                long timestamp = producerEntryStruct.getLong(TIMESTAMP_FIELD);
-                int offsetDelta = producerEntryStruct.getInt(OFFSET_DELTA_FIELD);
-                int coordinatorEpoch = producerEntryStruct.getInt(COORDINATOR_EPOCH_FIELD);
-                long currentTxnFirstOffset = producerEntryStruct.getLong(CURRENT_TXN_FIRST_OFFSET_FIELD);
-
-                OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
-                Optional<BatchMetadata> batchMetadata =
-                        (offset >= 0) ? Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp)) : Optional.empty();
-                entries.add(new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
-            }
+        byte[] buffer = Files.readAllBytes(file.toPath());
 
-            return entries;
-        } catch (SchemaException e) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        short version;
+        ProducerSnapshot producerSnapshot;
+        try {
+            version = byteBuffer.getShort();
+            producerSnapshot = new ProducerSnapshot(new ByteBufferAccessor(byteBuffer), version);
+        } catch (Exception e) {
             throw new CorruptSnapshotException("Snapshot failed schema validation: " + e.getMessage());
         }
+
+        if (version < ProducerSnapshot.LOWEST_SUPPORTED_VERSION || version > ProducerSnapshot.HIGHEST_SUPPORTED_VERSION)
+            throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version);
+
+        long crc = producerSnapshot.crc();
+        long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.length - PRODUCER_ENTRIES_OFFSET);
+        if (crc != computedCrc)
+            throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc
+                    + ". Computed crc: " + computedCrc);
+
+        List<ProducerSnapshot.ProducerEntry> producerEntries = producerSnapshot.producerEntries();
+        List<ProducerStateEntry> entries = new ArrayList<>(producerEntries.size());
+        for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) {
+            long producerId = producerEntry.producerId();
+            short producerEpoch = producerEntry.epoch();
+            int lastSequence = producerEntry.lastSequence();
+            long lastOffset = producerEntry.lastOffset();
+            long timestamp = producerEntry.timestamp();
+            int offsetDelta = producerEntry.offsetDelta();
+            int coordinatorEpoch = producerEntry.coordinatorEpoch();
+            long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
+
+            OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
+            Optional<BatchMetadata> batchMetadata =
+                    (lastOffset >= 0) ? Optional.of(new BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) : Optional.empty();
+            entries.add(new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
+        }
+
+        return entries;
     }
 
     // visible for testing
     public static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, boolean sync) throws IOException {
-        Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
-        struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
-        struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
-        Struct[] structEntries = new Struct[entries.size()];
-        int i = 0;
+        ProducerSnapshot producerSnapshot = new ProducerSnapshot();
+        producerSnapshot.setCrc(0L); // we'll fill this after writing the entries

Review Comment:
   Sorry, I looked too quickly... I believe you mean there is no need to write the 0L here; we can set the CRC32 value directly in the setter.
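
   Something like the following is what I had in mind. This is only a rough sketch, not the PR's actual code: I'm assuming the generated ProducerSnapshot has a setProducerEntries setter to go with setCrc, that MessageUtil.toByteBuffer does the serialization, that the on-disk layout stays version + crc + entries (as readSnapshot() implies), and that producerEntries is the list already built from the entries map.

   ```java
   // Populate the snapshot first; no setCrc(0L) placeholder is needed.
   ProducerSnapshot producerSnapshot = new ProducerSnapshot();
   producerSnapshot.setProducerEntries(producerEntries); // assumed generated setter

   // Lay out the bytes the way readSnapshot() reads them back:
   // a 2-byte version followed by the serialized message (crc still at its default).
   ByteBuffer payload = MessageUtil.toByteBuffer(producerSnapshot, PRODUCER_SNAPSHOT_VERSION);
   ByteBuffer fileBuffer = ByteBuffer.allocate(Short.BYTES + payload.remaining());
   fileBuffer.putShort(PRODUCER_SNAPSHOT_VERSION);
   fileBuffer.put(payload);
   fileBuffer.flip();

   // Compute the crc over everything after the version/crc header (the same range
   // readSnapshot() verifies) and set the real value directly, no 0L round trip.
   long crc = Crc32C.compute(fileBuffer.array(), PRODUCER_ENTRIES_OFFSET,
           fileBuffer.limit() - PRODUCER_ENTRIES_OFFSET);
   producerSnapshot.setCrc(crc);
   ```

   Since the crc field comes before the entries, setting it afterwards does not change the bytes the checksum covers, so the snapshot can then be serialized for the actual write (or the crc slot patched in the buffer above).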


