chia7712 commented on code in PR #16592:
URL: https://github.com/apache/kafka/pull/16592#discussion_r1686673282
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -646,71 +616,70 @@ public Optional<SnapshotFile>
removeAndMarkSnapshotForDeletion(long snapshotOffs
}
public static List<ProducerStateEntry> readSnapshot(File file) throws
IOException {
- try {
- byte[] buffer = Files.readAllBytes(file.toPath());
- Struct struct =
PID_SNAPSHOT_MAP_SCHEMA.read(ByteBuffer.wrap(buffer));
-
- Short version = struct.getShort(VERSION_FIELD);
- if (version != PRODUCER_SNAPSHOT_VERSION)
- throw new CorruptSnapshotException("Snapshot contained an
unknown file version " + version);
-
- long crc = struct.getUnsignedInt(CRC_FIELD);
- long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET,
buffer.length - PRODUCER_ENTRIES_OFFSET);
- if (crc != computedCrc)
- throw new CorruptSnapshotException("Snapshot is corrupt (CRC
is no longer valid). Stored crc: " + crc
- + ". Computed crc: " + computedCrc);
-
- Object[] producerEntryFields =
struct.getArray(PRODUCER_ENTRIES_FIELD);
- List<ProducerStateEntry> entries = new
ArrayList<>(producerEntryFields.length);
- for (Object producerEntryObj : producerEntryFields) {
- Struct producerEntryStruct = (Struct) producerEntryObj;
- long producerId =
producerEntryStruct.getLong(PRODUCER_ID_FIELD);
- short producerEpoch =
producerEntryStruct.getShort(PRODUCER_EPOCH_FIELD);
- int seq = producerEntryStruct.getInt(LAST_SEQUENCE_FIELD);
- long offset = producerEntryStruct.getLong(LAST_OFFSET_FIELD);
- long timestamp = producerEntryStruct.getLong(TIMESTAMP_FIELD);
- int offsetDelta =
producerEntryStruct.getInt(OFFSET_DELTA_FIELD);
- int coordinatorEpoch =
producerEntryStruct.getInt(COORDINATOR_EPOCH_FIELD);
- long currentTxnFirstOffset =
producerEntryStruct.getLong(CURRENT_TXN_FIRST_OFFSET_FIELD);
-
- OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset
>= 0 ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
- Optional<BatchMetadata> batchMetadata =
- (offset >= 0) ? Optional.of(new BatchMetadata(seq,
offset, offsetDelta, timestamp)) : Optional.empty();
- entries.add(new ProducerStateEntry(producerId, producerEpoch,
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
- }
+ byte[] buffer = Files.readAllBytes(file.toPath());
- return entries;
- } catch (SchemaException e) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+ short version;
+ ProducerSnapshot producerSnapshot;
+ try {
+ version = byteBuffer.getShort();
+ producerSnapshot = new ProducerSnapshot(new
ByteBufferAccessor(byteBuffer), version);
+ } catch (Exception e) {
throw new CorruptSnapshotException("Snapshot failed schema
validation: " + e.getMessage());
}
+
+ if (version < ProducerSnapshot.LOWEST_SUPPORTED_VERSION || version >
ProducerSnapshot.HIGHEST_SUPPORTED_VERSION)
+ throw new CorruptSnapshotException("Snapshot contained an unknown
file version " + version);
+
+ long crc = producerSnapshot.crc();
+ long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET,
buffer.length - PRODUCER_ENTRIES_OFFSET);
+ if (crc != computedCrc)
+ throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no
longer valid). Stored crc: " + crc
+ + ". Computed crc: " + computedCrc);
+
+ List<ProducerSnapshot.ProducerEntry> producerEntries =
producerSnapshot.producerEntries();
+ List<ProducerStateEntry> entries = new
ArrayList<>(producerEntries.size());
+ for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) {
+ long producerId = producerEntry.producerId();
+ short producerEpoch = producerEntry.epoch();
+ int lastSequence = producerEntry.lastSequence();
+ long lastOffset = producerEntry.lastOffset();
+ long timestamp = producerEntry.timestamp();
+ int offsetDelta = producerEntry.offsetDelta();
+ int coordinatorEpoch = producerEntry.coordinatorEpoch();
+ long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
+
+ OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0
? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
+ Optional<BatchMetadata> batchMetadata =
+ (lastOffset >= 0) ? Optional.of(new
BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) :
Optional.empty();
+ entries.add(new ProducerStateEntry(producerId, producerEpoch,
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
+ }
+
+ return entries;
}
// visible for testing
public static void writeSnapshot(File file, Map<Long, ProducerStateEntry>
entries, boolean sync) throws IOException {
- Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
- struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
- struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
- Struct[] structEntries = new Struct[entries.size()];
- int i = 0;
+ ProducerSnapshot producerSnapshot = new ProducerSnapshot();
+ producerSnapshot.setCrc(0L); // we'll fill this after writing the
entries
Review Comment:
The previous impl wrote data into a `ByteBuffer`, so it had to write a
temporary CRC value as a placeholder. We don't need this workaround now, as
we are using our powerful serialization, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]