brandboat commented on code in PR #16592: URL: https://github.com/apache/kafka/pull/16592#discussion_r1686801070
########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ########## @@ -646,71 +616,70 @@ public Optional<SnapshotFile> removeAndMarkSnapshotForDeletion(long snapshotOffs } public static List<ProducerStateEntry> readSnapshot(File file) throws IOException { - try { - byte[] buffer = Files.readAllBytes(file.toPath()); - Struct struct = PID_SNAPSHOT_MAP_SCHEMA.read(ByteBuffer.wrap(buffer)); - - Short version = struct.getShort(VERSION_FIELD); - if (version != PRODUCER_SNAPSHOT_VERSION) - throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version); - - long crc = struct.getUnsignedInt(CRC_FIELD); - long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.length - PRODUCER_ENTRIES_OFFSET); - if (crc != computedCrc) - throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc - + ". Computed crc: " + computedCrc); - - Object[] producerEntryFields = struct.getArray(PRODUCER_ENTRIES_FIELD); - List<ProducerStateEntry> entries = new ArrayList<>(producerEntryFields.length); - for (Object producerEntryObj : producerEntryFields) { - Struct producerEntryStruct = (Struct) producerEntryObj; - long producerId = producerEntryStruct.getLong(PRODUCER_ID_FIELD); - short producerEpoch = producerEntryStruct.getShort(PRODUCER_EPOCH_FIELD); - int seq = producerEntryStruct.getInt(LAST_SEQUENCE_FIELD); - long offset = producerEntryStruct.getLong(LAST_OFFSET_FIELD); - long timestamp = producerEntryStruct.getLong(TIMESTAMP_FIELD); - int offsetDelta = producerEntryStruct.getInt(OFFSET_DELTA_FIELD); - int coordinatorEpoch = producerEntryStruct.getInt(COORDINATOR_EPOCH_FIELD); - long currentTxnFirstOffset = producerEntryStruct.getLong(CURRENT_TXN_FIRST_OFFSET_FIELD); - - OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty(); - Optional<BatchMetadata> batchMetadata = - (offset >= 0) ? 
Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp)) : Optional.empty(); - entries.add(new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata)); - } + byte[] buffer = Files.readAllBytes(file.toPath()); - return entries; - } catch (SchemaException e) { + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); + short version; + ProducerSnapshot producerSnapshot; + try { + version = byteBuffer.getShort(); + producerSnapshot = new ProducerSnapshot(new ByteBufferAccessor(byteBuffer), version); + } catch (Exception e) { throw new CorruptSnapshotException("Snapshot failed schema validation: " + e.getMessage()); } + + if (version < ProducerSnapshot.LOWEST_SUPPORTED_VERSION || version > ProducerSnapshot.HIGHEST_SUPPORTED_VERSION) + throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version); + + long crc = producerSnapshot.crc(); + long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.length - PRODUCER_ENTRIES_OFFSET); + if (crc != computedCrc) + throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc + + ". Computed crc: " + computedCrc); + + List<ProducerSnapshot.ProducerEntry> producerEntries = producerSnapshot.producerEntries(); + List<ProducerStateEntry> entries = new ArrayList<>(producerEntries.size()); + for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) { + long producerId = producerEntry.producerId(); + short producerEpoch = producerEntry.epoch(); + int lastSequence = producerEntry.lastSequence(); + long lastOffset = producerEntry.lastOffset(); + long timestamp = producerEntry.timestamp(); + int offsetDelta = producerEntry.offsetDelta(); + int coordinatorEpoch = producerEntry.coordinatorEpoch(); + long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset(); + + OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 ? 
OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty(); + Optional<BatchMetadata> batchMetadata = + (lastOffset >= 0) ? Optional.of(new BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) : Optional.empty(); + entries.add(new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata)); + } + + return entries; } // visible for testing public static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, boolean sync) throws IOException { - Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA); - struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION); - struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries - Struct[] structEntries = new Struct[entries.size()]; - int i = 0; + ProducerSnapshot producerSnapshot = new ProducerSnapshot(); + producerSnapshot.setCrc(0L); // we'll fill this after writing the entries Review Comment: Sorry, I read this too quickly... I believe you mean there is no need to write 0L here; we can write the crc32 value directly in the setter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org