kafka git commit: KAFKA-5747; Producer snapshot loading should cover schema errors
Repository: kafka Updated Branches: refs/heads/0.11.0 fbd41d64d -> b4f3345b5 KAFKA-5747; Producer snapshot loading should cover schema errors Author: Jason Gustafson Reviewers: Apurva Mehta, Ismael Juma Closes #3688 from hachikuji/KAFKA-5747 (cherry picked from commit bf3bfd6749610eb49a4e47bdda2e667ca7458ff2) Signed-off-by: Jason Gustafson Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4f3345b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4f3345b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4f3345b Branch: refs/heads/0.11.0 Commit: b4f3345b5188c134cdf09cb067053b44976b2b0f Parents: fbd41d6 Author: Jason Gustafson Authored: Mon Aug 21 10:22:53 2017 -0700 Committer: Jason Gustafson Committed: Mon Aug 21 10:23:41 2017 -0700 -- .../scala/kafka/log/ProducerStateManager.scala | 61 +++- .../kafka/log/ProducerStateManagerTest.scala | 58 +++ 2 files changed, 91 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/main/scala/kafka/log/ProducerStateManager.scala -- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 7cc8e8e..7a1962a 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -285,32 +285,37 @@ object ProducerStateManager { new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table")) def readSnapshot(file: File): Iterable[ProducerIdEntry] = { -val buffer = Files.readAllBytes(file.toPath) -val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) - -val version = struct.getShort(VersionField) -if (version != ProducerSnapshotVersion) - throw new IllegalArgumentException(s"Unhandled snapshot file version $version") - -val crc = struct.getUnsignedInt(CrcField) -val computedCrc = Crc32C.compute(buffer, 
ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) -if (crc != computedCrc) - throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " + -s"Stored crc: $crc. Computed crc: $computedCrc") - -struct.getArray(ProducerEntriesField).map { producerEntryObj => - val producerEntryStruct = producerEntryObj.asInstanceOf[Struct] - val producerId: Long = producerEntryStruct.getLong(ProducerIdField) - val producerEpoch = producerEntryStruct.getShort(ProducerEpochField) - val seq = producerEntryStruct.getInt(LastSequenceField) - val offset = producerEntryStruct.getLong(LastOffsetField) - val timestamp = producerEntryStruct.getLong(TimestampField) - val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) - val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) - val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp, -coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) - newEntry +try { + val buffer = Files.readAllBytes(file.toPath) + val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) + + val version = struct.getShort(VersionField) + if (version != ProducerSnapshotVersion) +throw new CorruptSnapshotException(s"Snapshot contained an unknown file version $version") + + val crc = struct.getUnsignedInt(CrcField) + val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) + if (crc != computedCrc) +throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no longer valid). " + + s"Stored crc: $crc. 
Computed crc: $computedCrc") + + struct.getArray(ProducerEntriesField).map { producerEntryObj => +val producerEntryStruct = producerEntryObj.asInstanceOf[Struct] +val producerId: Long = producerEntryStruct.getLong(ProducerIdField) +val producerEpoch = producerEntryStruct.getShort(ProducerEpochField) +val seq = producerEntryStruct.getInt(LastSequenceField) +val offset = producerEntryStruct.getLong(LastOffsetField) +val timestamp = producerEntryStruct.getLong(TimestampField) +val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) +val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) +val
kafka git commit: KAFKA-5747; Producer snapshot loading should cover schema errors
Repository: kafka Updated Branches: refs/heads/trunk 8a5a84dc6 -> bf3bfd674 KAFKA-5747; Producer snapshot loading should cover schema errors Author: Jason Gustafson Reviewers: Apurva Mehta, Ismael Juma Closes #3688 from hachikuji/KAFKA-5747 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf3bfd67 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf3bfd67 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf3bfd67 Branch: refs/heads/trunk Commit: bf3bfd6749610eb49a4e47bdda2e667ca7458ff2 Parents: 8a5a84d Author: Jason Gustafson Authored: Mon Aug 21 10:22:53 2017 -0700 Committer: Jason Gustafson Committed: Mon Aug 21 10:22:53 2017 -0700 -- .../scala/kafka/log/ProducerStateManager.scala | 61 +++- .../kafka/log/ProducerStateManagerTest.scala | 58 +++ 2 files changed, 91 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/bf3bfd67/core/src/main/scala/kafka/log/ProducerStateManager.scala -- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 974a50e..fc2e340 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -285,32 +285,37 @@ object ProducerStateManager { new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table")) def readSnapshot(file: File): Iterable[ProducerIdEntry] = { -val buffer = Files.readAllBytes(file.toPath) -val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) - -val version = struct.getShort(VersionField) -if (version != ProducerSnapshotVersion) - throw new IllegalArgumentException(s"Unhandled snapshot file version $version") - -val crc = struct.getUnsignedInt(CrcField) -val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) -if (crc != computedCrc) - throw new 
CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " + -s"Stored crc: $crc. Computed crc: $computedCrc") - -struct.getArray(ProducerEntriesField).map { producerEntryObj => - val producerEntryStruct = producerEntryObj.asInstanceOf[Struct] - val producerId: Long = producerEntryStruct.getLong(ProducerIdField) - val producerEpoch = producerEntryStruct.getShort(ProducerEpochField) - val seq = producerEntryStruct.getInt(LastSequenceField) - val offset = producerEntryStruct.getLong(LastOffsetField) - val timestamp = producerEntryStruct.getLong(TimestampField) - val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) - val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) - val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp, -coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) - newEntry +try { + val buffer = Files.readAllBytes(file.toPath) + val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) + + val version = struct.getShort(VersionField) + if (version != ProducerSnapshotVersion) +throw new CorruptSnapshotException(s"Snapshot contained an unknown file version $version") + + val crc = struct.getUnsignedInt(CrcField) + val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) + if (crc != computedCrc) +throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no longer valid). " + + s"Stored crc: $crc. 
Computed crc: $computedCrc") + + struct.getArray(ProducerEntriesField).map { producerEntryObj => +val producerEntryStruct = producerEntryObj.asInstanceOf[Struct] +val producerId: Long = producerEntryStruct.getLong(ProducerIdField) +val producerEpoch = producerEntryStruct.getShort(ProducerEpochField) +val seq = producerEntryStruct.getInt(LastSequenceField) +val offset = producerEntryStruct.getLong(LastOffsetField) +val timestamp = producerEntryStruct.getLong(TimestampField) +val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) +val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) +val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) +val newEntry = ProducerIdEntry(producerId,