kafka git commit: KAFKA-5747; Producer snapshot loading should cover schema errors

2017-08-21 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 fbd41d64d -> b4f3345b5


KAFKA-5747; Producer snapshot loading should cover schema errors

Author: Jason Gustafson 

Reviewers: Apurva Mehta , Ismael Juma 

Closes #3688 from hachikuji/KAFKA-5747

(cherry picked from commit bf3bfd6749610eb49a4e47bdda2e667ca7458ff2)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4f3345b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4f3345b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4f3345b

Branch: refs/heads/0.11.0
Commit: b4f3345b5188c134cdf09cb067053b44976b2b0f
Parents: fbd41d6
Author: Jason Gustafson 
Authored: Mon Aug 21 10:22:53 2017 -0700
Committer: Jason Gustafson 
Committed: Mon Aug 21 10:23:41 2017 -0700

--
 .../scala/kafka/log/ProducerStateManager.scala  | 61 +++-
 .../kafka/log/ProducerStateManagerTest.scala    | 58 +++
 2 files changed, 91 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/main/scala/kafka/log/ProducerStateManager.scala
--
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7cc8e8e..7a1962a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -285,32 +285,37 @@ object ProducerStateManager {
 new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), 
"The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
-val buffer = Files.readAllBytes(file.toPath)
-val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
-
-val version = struct.getShort(VersionField)
-if (version != ProducerSnapshotVersion)
-  throw new IllegalArgumentException(s"Unhandled snapshot file version 
$version")
-
-val crc = struct.getUnsignedInt(CrcField)
-val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
-if (crc != computedCrc)
-  throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted 
(CRC is no longer valid). " +
-s"Stored crc: $crc. Computed crc: $computedCrc")
-
-struct.getArray(ProducerEntriesField).map { producerEntryObj =>
-  val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-  val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
-  val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
-  val seq = producerEntryStruct.getInt(LastSequenceField)
-  val offset = producerEntryStruct.getLong(LastOffsetField)
-  val timestamp = producerEntryStruct.getLong(TimestampField)
-  val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
-  val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
-  val currentTxnFirstOffset = 
producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-  val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, 
offsetDelta, timestamp,
-coordinatorEpoch, if (currentTxnFirstOffset >= 0) 
Some(currentTxnFirstOffset) else None)
-  newEntry
+try {
+  val buffer = Files.readAllBytes(file.toPath)
+  val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+  val version = struct.getShort(VersionField)
+  if (version != ProducerSnapshotVersion)
+throw new CorruptSnapshotException(s"Snapshot contained an unknown 
file version $version")
+
+  val crc = struct.getUnsignedInt(CrcField)
+  val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
+  if (crc != computedCrc)
+throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no 
longer valid). " +
+  s"Stored crc: $crc. Computed crc: $computedCrc")
+
+  struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+val seq = producerEntryStruct.getInt(LastSequenceField)
+val offset = producerEntryStruct.getLong(LastOffsetField)
+val timestamp = producerEntryStruct.getLong(TimestampField)
+val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+val coordinatorEpoch = 
producerEntryStruct.getInt(CoordinatorEpochField)
+val 

kafka git commit: KAFKA-5747; Producer snapshot loading should cover schema errors

2017-08-21 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk 8a5a84dc6 -> bf3bfd674


KAFKA-5747; Producer snapshot loading should cover schema errors

Author: Jason Gustafson 

Reviewers: Apurva Mehta , Ismael Juma 

Closes #3688 from hachikuji/KAFKA-5747


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf3bfd67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf3bfd67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf3bfd67

Branch: refs/heads/trunk
Commit: bf3bfd6749610eb49a4e47bdda2e667ca7458ff2
Parents: 8a5a84d
Author: Jason Gustafson 
Authored: Mon Aug 21 10:22:53 2017 -0700
Committer: Jason Gustafson 
Committed: Mon Aug 21 10:22:53 2017 -0700

--
 .../scala/kafka/log/ProducerStateManager.scala  | 61 +++-
 .../kafka/log/ProducerStateManagerTest.scala    | 58 +++
 2 files changed, 91 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf3bfd67/core/src/main/scala/kafka/log/ProducerStateManager.scala
--
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 974a50e..fc2e340 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -285,32 +285,37 @@ object ProducerStateManager {
 new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), 
"The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
-val buffer = Files.readAllBytes(file.toPath)
-val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
-
-val version = struct.getShort(VersionField)
-if (version != ProducerSnapshotVersion)
-  throw new IllegalArgumentException(s"Unhandled snapshot file version 
$version")
-
-val crc = struct.getUnsignedInt(CrcField)
-val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
-if (crc != computedCrc)
-  throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted 
(CRC is no longer valid). " +
-s"Stored crc: $crc. Computed crc: $computedCrc")
-
-struct.getArray(ProducerEntriesField).map { producerEntryObj =>
-  val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-  val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
-  val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
-  val seq = producerEntryStruct.getInt(LastSequenceField)
-  val offset = producerEntryStruct.getLong(LastOffsetField)
-  val timestamp = producerEntryStruct.getLong(TimestampField)
-  val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
-  val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
-  val currentTxnFirstOffset = 
producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-  val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, 
offsetDelta, timestamp,
-coordinatorEpoch, if (currentTxnFirstOffset >= 0) 
Some(currentTxnFirstOffset) else None)
-  newEntry
+try {
+  val buffer = Files.readAllBytes(file.toPath)
+  val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+  val version = struct.getShort(VersionField)
+  if (version != ProducerSnapshotVersion)
+throw new CorruptSnapshotException(s"Snapshot contained an unknown 
file version $version")
+
+  val crc = struct.getUnsignedInt(CrcField)
+  val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
+  if (crc != computedCrc)
+throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no 
longer valid). " +
+  s"Stored crc: $crc. Computed crc: $computedCrc")
+
+  struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+val seq = producerEntryStruct.getInt(LastSequenceField)
+val offset = producerEntryStruct.getLong(LastOffsetField)
+val timestamp = producerEntryStruct.getLong(TimestampField)
+val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+val coordinatorEpoch = 
producerEntryStruct.getInt(CoordinatorEpochField)
+val currentTxnFirstOffset = 
producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+val newEntry = ProducerIdEntry(producerId,