This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d6e3d6cb3 KAFKA-13845: Add support for reading KRaft snapshots in 
kafka-dump-log (#12084)
1d6e3d6cb3 is described below

commit 1d6e3d6cb3a798983cafb0367b90c3a51579c364
Author: dengziming <[email protected]>
AuthorDate: Thu Jun 2 05:49:00 2022 +0800

    KAFKA-13845: Add support for reading KRaft snapshots in kafka-dump-log 
(#12084)
    
    The kafka-dump-log command should accept files with a suffix of 
".checkpoint". It should also decode and print using JSON the snapshot header 
and footer control records.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../kafka/common/record/ControlRecordUtils.java    |  6 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 20 ++++-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 97 +++++++++++++++++++---
 .../java/org/apache/kafka/snapshot/Snapshots.java  |  2 +-
 .../kafka/snapshot/SnapshotWriterReaderTest.java   |  2 +-
 5 files changed, 106 insertions(+), 21 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java 
b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
index e74f6417fe..66a4a14d22 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
@@ -28,9 +28,9 @@ import java.nio.ByteBuffer;
  */
 public class ControlRecordUtils {
 
-    public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new 
LeaderChangeMessage().highestSupportedVersion();
-    public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new 
SnapshotHeaderRecord().highestSupportedVersion();
-    public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new 
SnapshotFooterRecord().highestSupportedVersion();
+    public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = 
LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION;
+    public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = 
SnapshotHeaderRecord.HIGHEST_SUPPORTED_VERSION;
+    public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = 
SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION;
 
     public static LeaderChangeMessage deserializeLeaderChangeMessage(Record 
record) {
         ControlRecordType recordType = ControlRecordType.parse(record.key());
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 28594cc12b..b57342ff29 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -18,7 +18,6 @@
 package kafka.tools
 
 import java.io._
-
 import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, 
ObjectNode, TextNode}
 import kafka.coordinator.group.GroupMetadataManager
 import kafka.coordinator.transaction.TransactionLog
@@ -26,11 +25,13 @@ import kafka.log._
 import kafka.serializer.Decoder
 import kafka.utils._
 import kafka.utils.Implicits._
+import org.apache.kafka.common.message.{SnapshotFooterRecordJsonConverter, 
SnapshotHeaderRecordJsonConverter}
 import org.apache.kafka.common.metadata.{MetadataJsonConverters, 
MetadataRecordType}
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -57,7 +58,7 @@ object DumpLogSegments {
       val filename = file.getName
       val suffix = filename.substring(filename.lastIndexOf("."))
       suffix match {
-        case UnifiedLog.LogFileSuffix =>
+        case UnifiedLog.LogFileSuffix | Snapshots.SUFFIX =>
           dumpLog(file, opts.shouldPrintDataLog, 
nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
             opts.messageParser, opts.skipRecordMetadata, opts.maxBytes)
         case UnifiedLog.IndexFileSuffix =>
@@ -248,8 +249,13 @@ object DumpLogSegments {
                       parser: MessageParser[_, _],
                       skipRecordMetadata: Boolean,
                       maxBytes: Int): Unit = {
-    val startOffset = file.getName.split("\\.")(0).toLong
-    println("Starting offset: " + startOffset)
+    if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+      val startOffset = file.getName.split("\\.")(0).toLong
+      println(s"Log starting offset: $startOffset")
+    } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
+      val path = Snapshots.parse(file.toPath).get()
+      println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: 
${path.snapshotId.epoch}")
+    }
     val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
     try {
       var validBytes = 0L
@@ -288,6 +294,12 @@ object DumpLogSegments {
                   case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
                     val endTxnMarker = EndTransactionMarker.deserialize(record)
                     print(s" endTxnMarker: ${endTxnMarker.controlType} 
coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+                  case ControlRecordType.SNAPSHOT_HEADER =>
+                    val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
+                    print(s" SnapshotHeader 
${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
+                  case ControlRecordType.SNAPSHOT_FOOTER =>
+                    val footer = 
ControlRecordUtils.deserializedSnapshotFooterRecord(record)
+                    print(s" SnapshotFooter 
${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
                   case controlType =>
                     print(s" controlType: $controlType($controlTypeId)")
                 }
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index edc02d8fd8..5d5e462b5a 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,17 +22,21 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
-import kafka.log.{AppendOrigin, UnifiedLog, LogConfig, LogManager, 
LogTestUtils}
-import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils, 
UnifiedLog}
+import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, 
LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, 
ObjectSerializationCache}
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, 
EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.RecordsSnapshotWriter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -49,6 +53,7 @@ class DumpLogSegmentsTest {
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val segmentName = "00000000000000000000"
   val logFilePath = s"$logDir/$segmentName.log"
+  val snapshotPath = s"$logDir/00000000000000000000-0000000000.checkpoint"
   val indexFilePath = s"$logDir/$segmentName.index"
   val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
   val time = new MockTime(0, 0)
@@ -256,13 +261,14 @@ class DumpLogSegmentsTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 1)
     log.flush(false)
 
-    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
+    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--files", logFilePath))
+    assertTrue(output.contains("Log starting offset: 0"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
 
-    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "--files", logFilePath))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
 
     // Bogus metadata record
     val buf = ByteBuffer.allocate(4)
@@ -272,10 +278,77 @@ class DumpLogSegmentsTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(null, buf.array)), leaderEpoch = 2)
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 2)
 
-    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
-    assert(output.contains("skipping"))
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "--files", logFilePath))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("skipping"))
+  }
+
+  @Test
+  def testDumpMetadataSnapshot(): Unit = {
+    val metadataRecords = Seq(
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 
0.toShort),
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 
0.toShort),
+      new ApiMessageAndVersion(
+        new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
+      new ApiMessageAndVersion(
+        new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+    )
+
+    val metadataLog = KafkaMetadataLog(
+      KafkaRaftServer.MetadataPartition,
+      KafkaRaftServer.MetadataTopicId,
+      logDir,
+      time,
+      time.scheduler,
+      MetadataLogConfig(
+        logSegmentBytes = 100 * 1024,
+        logSegmentMinBytes = 100 * 1024,
+        logSegmentMillis = 10 * 1000,
+        retentionMaxBytes = 100 * 1024,
+        retentionMillis = 60 * 1000,
+        maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+        maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+        fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+        nodeId = 1
+      )
+    )
+
+    val lastContainedLogTimestamp = 10000
+
+    TestUtils.resource(
+      RecordsSnapshotWriter.createWithHeader(
+        () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+        1024,
+        MemoryPool.NONE,
+        new MockTime,
+        lastContainedLogTimestamp,
+        CompressionType.NONE,
+        new MetadataRecordSerde
+      ).get()
+    ) { snapshotWriter =>
+      snapshotWriter.append(metadataRecords.asJava)
+      snapshotWriter.freeze()
+    }
+
+    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--files", snapshotPath))
+    assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("SnapshotHeader"))
+    assertTrue(output.contains("SnapshotFooter"))
+    
assertTrue(output.contains(s""""lastContainedLogTimestamp":$lastContainedLogTimestamp"""))
+
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "--files", snapshotPath))
+    assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertFalse(output.contains("SnapshotHeader"))
+    assertFalse(output.contains("SnapshotFooter"))
+    assertFalse(output.contains(s""""lastContainedLogTimestamp": 
$lastContainedLogTimestamp"""))
   }
 
   @Test
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java 
b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index a4d3b5a8cd..337e56a7f8 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -30,7 +30,7 @@ import java.util.Optional;
 
 public final class Snapshots {
     private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
-    private static final String SUFFIX = ".checkpoint";
+    public static final String SUFFIX = ".checkpoint";
     private static final String PARTIAL_SUFFIX = String.format("%s.part", 
SUFFIX);
     private static final String DELETE_SUFFIX = String.format("%s.deleted", 
SUFFIX);
 
diff --git 
a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java 
b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index d251e36359..05d1929f27 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -238,7 +238,7 @@ final public class SnapshotWriterReaderTest {
         assertTrue(batch.isControlBatch());
 
         SnapshotFooterRecord footerRecord = 
ControlRecordUtils.deserializedSnapshotFooterRecord(record);
-        assertEquals(footerRecord.version(), 
ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+        assertEquals(footerRecord.version(), 
ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
 
         return countRecords;
     }

Reply via email to