Repository: kafka
Updated Branches:
  refs/heads/0.11.0 fbd41d64d -> b4f3345b5


KAFKA-5747; Producer snapshot loading should cover schema errors

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3688 from hachikuji/KAFKA-5747

(cherry picked from commit bf3bfd6749610eb49a4e47bdda2e667ca7458ff2)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4f3345b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4f3345b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4f3345b

Branch: refs/heads/0.11.0
Commit: b4f3345b5188c134cdf09cb067053b44976b2b0f
Parents: fbd41d6
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Aug 21 10:22:53 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Aug 21 10:23:41 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/log/ProducerStateManager.scala  | 61 +++++++++++---------
 .../kafka/log/ProducerStateManagerTest.scala    | 58 +++++++++++++++++++
 2 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7cc8e8e..7a1962a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -285,32 +285,37 @@ object ProducerStateManager {
     new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), 
"The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
-    val buffer = Files.readAllBytes(file.toPath)
-    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
-
-    val version = struct.getShort(VersionField)
-    if (version != ProducerSnapshotVersion)
-      throw new IllegalArgumentException(s"Unhandled snapshot file version 
$version")
-
-    val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
-    if (crc != computedCrc)
-      throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted 
(CRC is no longer valid). " +
-        s"Stored crc: $crc. Computed crc: $computedCrc")
-
-    struct.getArray(ProducerEntriesField).map { producerEntryObj =>
-      val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-      val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
-      val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
-      val seq = producerEntryStruct.getInt(LastSequenceField)
-      val offset = producerEntryStruct.getLong(LastOffsetField)
-      val timestamp = producerEntryStruct.getLong(TimestampField)
-      val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
-      val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
-      val currentTxnFirstOffset = 
producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-      val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, 
offsetDelta, timestamp,
-        coordinatorEpoch, if (currentTxnFirstOffset >= 0) 
Some(currentTxnFirstOffset) else None)
-      newEntry
+    try {
+      val buffer = Files.readAllBytes(file.toPath)
+      val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+      val version = struct.getShort(VersionField)
+      if (version != ProducerSnapshotVersion)
+        throw new CorruptSnapshotException(s"Snapshot contained an unknown 
file version $version")
+
+      val crc = struct.getUnsignedInt(CrcField)
+      val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, 
buffer.length - ProducerEntriesOffset)
+      if (crc != computedCrc)
+        throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no 
longer valid). " +
+          s"Stored crc: $crc. Computed crc: $computedCrc")
+
+      struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+        val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+        val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+        val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+        val seq = producerEntryStruct.getInt(LastSequenceField)
+        val offset = producerEntryStruct.getLong(LastOffsetField)
+        val timestamp = producerEntryStruct.getLong(TimestampField)
+        val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+        val coordinatorEpoch = 
producerEntryStruct.getInt(CoordinatorEpochField)
+        val currentTxnFirstOffset = 
producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+        val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, 
offsetDelta, timestamp,
+          coordinatorEpoch, if (currentTxnFirstOffset >= 0) 
Some(currentTxnFirstOffset) else None)
+        newEntry
+      }
+    } catch {
+      case e: SchemaException =>
+        throw new CorruptSnapshotException(s"Snapshot failed schema 
validation: ${e.getMessage}")
     }
   }
 
@@ -436,7 +441,7 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
       latestSnapshotFile match {
         case Some(file) =>
           try {
-            info(s"Loading producer state from snapshot file ${file.getName} 
for partition $topicPartition")
+            info(s"Loading producer state from snapshot file '$file' for 
partition $topicPartition")
             val loadedProducers = readSnapshot(file).filter { producerEntry =>
               isProducerRetained(producerEntry, logStartOffset) && 
!isProducerExpired(currentTime, producerEntry)
             }
@@ -446,7 +451,7 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
             return
           } catch {
             case e: CorruptSnapshotException =>
-              error(s"Snapshot file at ${file.getPath} is corrupt: 
${e.getMessage}")
+              warn(s"Failed to load producer snapshot from '$file': 
${e.getMessage}")
               Files.deleteIfExists(file.toPath)
           }
         case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3cc68ad..9a324aa 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -18,6 +18,9 @@
 package kafka.log
 
 import java.io.File
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.{OpenOption, StandardOpenOption}
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
@@ -608,6 +611,61 @@ class ProducerStateManagerTest extends JUnitSuite {
     appendEndTxnMarker(stateManager, producerId, producerEpoch, 
ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
   }
 
+  @Test
+  def testLoadFromEmptySnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      file.truncate(0L)
+    }
+  }
+
+  @Test
+  def testLoadFromTruncatedSnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      // truncate to some arbitrary point in the middle of the snapshot
+      assertTrue(file.size > 2)
+      file.truncate(file.size / 2)
+    }
+  }
+
+  @Test
+  def testLoadFromCorruptSnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      // write some garbage somewhere in the file
+      assertTrue(file.size > 2)
+      file.write(ByteBuffer.wrap(Array[Byte](37)), file.size / 2)
+    }
+  }
+
+  private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => 
Unit): Unit = {
+    val epoch = 0.toShort
+    val producerId = 1L
+
+    append(stateManager, producerId, epoch, seq = 0, offset = 0L)
+    stateManager.takeSnapshot()
+
+    append(stateManager, producerId, epoch, seq = 1, offset = 1L)
+    stateManager.takeSnapshot()
+
+    // Truncate the last snapshot
+    val latestSnapshotOffset = stateManager.latestSnapshotOffset
+    assertEquals(Some(2L), latestSnapshotOffset)
+    val snapshotToTruncate = Log.producerSnapshotFile(logDir, 
latestSnapshotOffset.get)
+    val channel = FileChannel.open(snapshotToTruncate.toPath, 
StandardOpenOption.WRITE)
+    try {
+      makeFileCorrupt(channel)
+    } finally {
+      channel.close()
+    }
+
+    // Ensure that the truncated snapshot is deleted and producer state is 
loaded from the previous snapshot
+    val reloadedStateManager = new ProducerStateManager(partition, logDir, 
maxPidExpirationMs)
+    reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds())
+    assertFalse(snapshotToTruncate.exists())
+
+    val loadedProducerState = reloadedStateManager.activeProducers(producerId)
+    assertEquals(0L, loadedProducerState.lastOffset)
+  }
+
   private def appendEndTxnMarker(mapping: ProducerStateManager,
                                  producerId: Long,
                                  producerEpoch: Short,

Reply via email to