Repository: kafka Updated Branches: refs/heads/trunk d06f2cc7a -> 8d8ab2ebc
KAFKA-5108; Add support for reading PID snapshot files to DumpLogSegments Author: Jason Gustafson <ja...@confluent.io> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #2922 from hachikuji/KAFKA-5108 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d8ab2eb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d8ab2eb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d8ab2eb Branch: refs/heads/trunk Commit: 8d8ab2ebcd498982a9dd6be873f4d7cc032c65d8 Parents: d06f2cc Author: Jason Gustafson <ja...@confluent.io> Authored: Wed Apr 26 13:46:35 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Apr 26 13:46:35 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/log/ProducerIdMapping.scala | 6 ++-- .../scala/kafka/tools/DumpLogSegments.scala | 38 ++++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/log/ProducerIdMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala index fc6ff0b..bcadce5 100644 --- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala +++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala @@ -95,7 +95,7 @@ private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEnt ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp) } -private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg) +class CorruptSnapshotException(msg: String) extends KafkaException(msg) object ProducerIdMapping { private val PidSnapshotVersion: Short = 1 @@ -127,7 +127,7 @@ object ProducerIdMapping { new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"), new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table")) - private def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = { + def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = { val buffer = Files.readAllBytes(file.toPath) val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) @@ -138,7 +138,7 @@ object ProducerIdMapping { val crc = struct.getUnsignedInt(CrcField) val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset) if (crc != computedCrc) - throw new CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}") + throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). Stored crc: $crc. Computed crc: $computedCrc") struct.getArray(PidEntriesField).map { pidEntryObj => val pidEntryStruct = pidEntryObj.asInstanceOf[Struct] http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 038c15b..bea6e9e 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -92,15 +92,21 @@ object DumpLogSegments { for(arg <- files) { val file = new File(arg) - if(file.getName.endsWith(Log.LogFileSuffix)) { - println("Dumping " + file) - dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser) - } else if(file.getName.endsWith(Log.IndexFileSuffix)) { - println("Dumping " + file) - dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) - } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) { - println("Dumping " + file) - dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) + println(s"Dumping $file") + + val filename = file.getName + val suffix = filename.substring(filename.lastIndexOf(".")) + suffix match { + case Log.LogFileSuffix => + dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser) + case Log.IndexFileSuffix => + dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) + case Log.TimeIndexFileSuffix => + dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) + case Log.PidSnapshotFileSuffix => + dumpPidSnapshot(file) + case _ => + System.err.println(s"Ignoring unknown file $file") } } @@ -117,7 +123,7 @@ object DumpLogSegments { nonConsecutivePairsForLogFilesMap.foreach { case (fileName, listOfNonConsecutivePairs) => { - System.err.println("Non-secutive offsets in :" + fileName) + System.err.println("Non-consecutive offsets in :" + fileName) listOfNonConsecutivePairs.foreach(m => { System.err.println(" %d is followed by %d".format(m._1, m._2)) }) @@ -125,6 +131,18 @@ object DumpLogSegments { } } + private def dumpPidSnapshot(file: File): Unit = { + try { + ProducerIdMapping.readSnapshot(file).foreach { case (pid, entry) => + println(s"pid: $pid epoch: ${entry.epoch} lastSequence: ${entry.lastSeq} lastOffset: ${entry.lastOffset} " + + s"offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp}") + } + } catch { + case e: CorruptSnapshotException => + System.err.println(e.getMessage) + } + } + /* print out the contents of the index */ private def dumpIndex(file: File, indexSanityOnly: Boolean,