Repository: kafka
Updated Branches:
  refs/heads/0.11.0 fbd41d64d -> b4f3345b5

KAFKA-5747; Producer snapshot loading should cover schema errors

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3688 from hachikuji/KAFKA-5747

(cherry picked from commit bf3bfd6749610eb49a4e47bdda2e667ca7458ff2)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4f3345b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4f3345b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4f3345b

Branch: refs/heads/0.11.0
Commit: b4f3345b5188c134cdf09cb067053b44976b2b0f
Parents: fbd41d6
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Aug 21 10:22:53 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Aug 21 10:23:41 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/log/ProducerStateManager.scala  | 61 +++++++++++---------
 .../kafka/log/ProducerStateManagerTest.scala    | 58 +++++++++++++++++++
 2 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7cc8e8e..7a1962a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -285,32 +285,37 @@ object ProducerStateManager {
     new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
-    val buffer = Files.readAllBytes(file.toPath)
-    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
-
-    val version = struct.getShort(VersionField)
-    if (version != ProducerSnapshotVersion)
-      throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
-
-    val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
-    if (crc != computedCrc)
-      throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
-        s"Stored crc: $crc. Computed crc: $computedCrc")
-
-    struct.getArray(ProducerEntriesField).map { producerEntryObj =>
-      val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-      val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
-      val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
-      val seq = producerEntryStruct.getInt(LastSequenceField)
-      val offset = producerEntryStruct.getLong(LastOffsetField)
-      val timestamp = producerEntryStruct.getLong(TimestampField)
-      val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
-      val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
-      val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-      val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
-        coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
-      newEntry
+    try {
+      val buffer = Files.readAllBytes(file.toPath)
+      val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+      val version = struct.getShort(VersionField)
+      if (version != ProducerSnapshotVersion)
+        throw new CorruptSnapshotException(s"Snapshot contained an unknown file version $version")
+
+      val crc = struct.getUnsignedInt(CrcField)
+      val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
+      if (crc != computedCrc)
+        throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no longer valid). " +
+          s"Stored crc: $crc. Computed crc: $computedCrc")
+
+      struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+        val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+        val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+        val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+        val seq = producerEntryStruct.getInt(LastSequenceField)
+        val offset = producerEntryStruct.getLong(LastOffsetField)
+        val timestamp = producerEntryStruct.getLong(TimestampField)
+        val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+        val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
+        val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+        val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
+          coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
+        newEntry
+      }
+    } catch {
+      case e: SchemaException =>
+        throw new CorruptSnapshotException(s"Snapshot failed schema validation: ${e.getMessage}")
     }
   }
 
@@ -436,7 +441,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     latestSnapshotFile match {
       case Some(file) =>
         try {
-          info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition")
+          info(s"Loading producer state from snapshot file '$file' for partition $topicPartition")
           val loadedProducers = readSnapshot(file).filter { producerEntry =>
             isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
           }
@@ -446,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
           return
         } catch {
           case e: CorruptSnapshotException =>
-            error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}")
+            warn(s"Failed to load producer snapshot from '$file': ${e.getMessage}")
            Files.deleteIfExists(file.toPath)
         }
       case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f3345b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3cc68ad..9a324aa 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -18,6 +18,9 @@
 package kafka.log
 
 import java.io.File
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.{OpenOption, StandardOpenOption}
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
@@ -608,6 +611,61 @@ class ProducerStateManagerTest extends JUnitSuite {
     appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
   }
 
+  @Test
+  def testLoadFromEmptySnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      file.truncate(0L)
+    }
+  }
+
+  @Test
+  def testLoadFromTruncatedSnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      // truncate to some arbitrary point in the middle of the snapshot
+      assertTrue(file.size > 2)
+      file.truncate(file.size / 2)
+    }
+  }
+
+  @Test
+  def testLoadFromCorruptSnapshotFile(): Unit = {
+    testLoadFromCorruptSnapshot { file =>
+      // write some garbage somewhere in the file
+      assertTrue(file.size > 2)
+      file.write(ByteBuffer.wrap(Array[Byte](37)), file.size / 2)
+    }
+  }
+
+  private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
+    val epoch = 0.toShort
+    val producerId = 1L
+
+    append(stateManager, producerId, epoch, seq = 0, offset = 0L)
+    stateManager.takeSnapshot()
+
+    append(stateManager, producerId, epoch, seq = 1, offset = 1L)
+    stateManager.takeSnapshot()
+
+    // Corrupt the latest snapshot
+    val latestSnapshotOffset = stateManager.latestSnapshotOffset
+    assertEquals(Some(2L), latestSnapshotOffset)
+    val snapshotToCorrupt = Log.producerSnapshotFile(logDir, latestSnapshotOffset.get)
+    val channel = FileChannel.open(snapshotToCorrupt.toPath, StandardOpenOption.WRITE)
+    try {
+      makeFileCorrupt(channel)
+    } finally {
+      channel.close()
+    }
+
+    // Ensure that the corrupt snapshot is deleted and producer state is loaded from the previous snapshot
+    val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds())
+    assertFalse(snapshotToCorrupt.exists())
+
+    val loadedProducerState = reloadedStateManager.activeProducers(producerId)
+    assertEquals(0L, loadedProducerState.lastOffset)
+  }
+
   private def appendEndTxnMarker(mapping: ProducerStateManager, producerId: Long, producerEpoch: Short,
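
A note on the new tests: the empty and truncated snapshots fail schema parsing (now surfaced as CorruptSnapshotException), while the single garbage byte written by testLoadFromCorruptSnapshotFile is expected to be caught by the CRC check instead. The standalone sketch below shows why one altered byte is sufficient; it uses the JDK's java.util.zip.CRC32 as a stand-in for Kafka's Crc32C (different polynomial, same single-burst detection guarantee for this purpose), and the payload string is an arbitrary placeholder.

import java.util.zip.CRC32

// Standalone sketch: why a single garbage byte is enough for readSnapshot to
// reject a snapshot. CRC32 here stands in for Kafka's Crc32C.
object CrcCorruptionSketch extends App {

  def crcOf(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes, 0, bytes.length)
    crc.getValue
  }

  val payload = "producer snapshot entries".getBytes("UTF-8")
  val storedCrc = crcOf(payload) // what takeSnapshot would persist in the header

  payload(payload.length / 2) = 37.toByte // the same garbage byte the test writes

  val computedCrc = crcOf(payload) // what readSnapshot recomputes on load

  // On mismatch the real code throws CorruptSnapshotException, after which the
  // loader deletes the file and falls back to the previous snapshot.
  assert(storedCrc != computedCrc, "single-byte corruption must change the CRC")
  println(s"stored=$storedCrc computed=$computedCrc -> snapshot rejected")
}

CRC-32 detects all error bursts of 32 bits or fewer, so any single corrupted byte in the entries region is guaranteed to produce a mismatch.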