This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1d6e3d6cb3 KAFKA-13845: Add support for reading KRaft snapshots in kafka-dump-log (#12084)
1d6e3d6cb3 is described below
commit 1d6e3d6cb3a798983cafb0367b90c3a51579c364
Author: dengziming <[email protected]>
AuthorDate: Thu Jun 2 05:49:00 2022 +0800
KAFKA-13845: Add support for reading KRaft snapshots in kafka-dump-log (#12084)
The kafka-dump-log command should accept files with the ".checkpoint" suffix. It
should also decode the snapshot header and footer control records and print them
as JSON.
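
For reference, a minimal invocation against a metadata snapshot might look like the
following (the log directory and snapshot file name below are hypothetical; the
--files, --cluster-metadata-decoder, and --skip-record-metadata flags are the ones
exercised by the tests in this patch):

    # Dump a KRaft metadata snapshot, decoding metadata records and printing the
    # snapshot header/footer control records as JSON.
    bin/kafka-dump-log.sh --cluster-metadata-decoder \
      --files /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000100-0000000001.checkpoint

    # Same file, but skip per-record metadata; the header/footer control records
    # are then omitted from the output.
    bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata \
      --files /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000100-0000000001.checkpoint
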
Reviewers: José Armando García Sancio <[email protected]>
---
.../kafka/common/record/ControlRecordUtils.java | 6 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 20 ++++-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 97 +++++++++++++++++++---
.../java/org/apache/kafka/snapshot/Snapshots.java | 2 +-
.../kafka/snapshot/SnapshotWriterReaderTest.java | 2 +-
5 files changed, 106 insertions(+), 21 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
index e74f6417fe..66a4a14d22 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
@@ -28,9 +28,9 @@ import java.nio.ByteBuffer;
*/
public class ControlRecordUtils {
- public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new LeaderChangeMessage().highestSupportedVersion();
- public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new SnapshotHeaderRecord().highestSupportedVersion();
- public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new SnapshotFooterRecord().highestSupportedVersion();
+ public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION;
+ public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = SnapshotHeaderRecord.HIGHEST_SUPPORTED_VERSION;
+ public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION;
public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 28594cc12b..b57342ff29 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -18,7 +18,6 @@
package kafka.tools
import java.io._
-
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.group.GroupMetadataManager
import kafka.coordinator.transaction.TransactionLog
@@ -26,11 +25,13 @@ import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
+import org.apache.kafka.common.message.{SnapshotFooterRecordJsonConverter, SnapshotHeaderRecordJsonConverter}
import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.snapshot.Snapshots
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -57,7 +58,7 @@ object DumpLogSegments {
val filename = file.getName
val suffix = filename.substring(filename.lastIndexOf("."))
suffix match {
- case UnifiedLog.LogFileSuffix =>
+ case UnifiedLog.LogFileSuffix | Snapshots.SUFFIX =>
dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration, opts.messageParser, opts.skipRecordMetadata, opts.maxBytes)
case UnifiedLog.IndexFileSuffix =>
@@ -248,8 +249,13 @@ object DumpLogSegments {
parser: MessageParser[_, _],
skipRecordMetadata: Boolean,
maxBytes: Int): Unit = {
- val startOffset = file.getName.split("\\.")(0).toLong
- println("Starting offset: " + startOffset)
+ if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+ val startOffset = file.getName.split("\\.")(0).toLong
+ println(s"Log starting offset: $startOffset")
+ } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
+ val path = Snapshots.parse(file.toPath).get()
+ println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch:
${path.snapshotId.epoch}")
+ }
val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
try {
var validBytes = 0L
@@ -288,6 +294,12 @@ object DumpLogSegments {
case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
val endTxnMarker = EndTransactionMarker.deserialize(record)
print(s" endTxnMarker: ${endTxnMarker.controlType}
coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+ case ControlRecordType.SNAPSHOT_HEADER =>
+ val header =
ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
+ print(s" SnapshotHeader
${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
+ case ControlRecordType.SNAPSHOT_FOOTER =>
+ val footer =
ControlRecordUtils.deserializedSnapshotFooterRecord(record)
+ print(s" SnapshotFooter
${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
case controlType =>
print(s" controlType: $controlType($controlTypeId)")
}
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index edc02d8fd8..5d5e462b5a 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,17 +22,21 @@ import java.nio.ByteBuffer
import java.util
import java.util.Properties
-import kafka.log.{AppendOrigin, UnifiedLog, LogConfig, LogManager, LogTestUtils}
-import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils, UnifiedLog}
+import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.RecordsSnapshotWriter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -49,6 +53,7 @@ class DumpLogSegmentsTest {
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val segmentName = "00000000000000000000"
val logFilePath = s"$logDir/$segmentName.log"
+ val snapshotPath = s"$logDir/00000000000000000000-0000000000.checkpoint"
val indexFilePath = s"$logDir/$segmentName.index"
val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
val time = new MockTime(0, 0)
@@ -256,13 +261,14 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
log.flush(false)
- var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--files", logFilePath))
+ assertTrue(output.contains("Log starting offset: 0"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
- output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", logFilePath))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
// Bogus metadata record
val buf = ByteBuffer.allocate(4)
@@ -272,10 +278,77 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(null, buf.array)), leaderEpoch = 2)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 2)
- output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
- assert(output.contains("skipping"))
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", logFilePath))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("skipping"))
+ }
+
+ @Test
+ def testDumpMetadataSnapshot(): Unit = {
+ val metadataRecords = Seq(
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 0.toShort),
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 0.toShort),
+ new ApiMessageAndVersion(
+ new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 0.toShort),
+ new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+ setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ )
+
+ val metadataLog = KafkaMetadataLog(
+ KafkaRaftServer.MetadataPartition,
+ KafkaRaftServer.MetadataTopicId,
+ logDir,
+ time,
+ time.scheduler,
+ MetadataLogConfig(
+ logSegmentBytes = 100 * 1024,
+ logSegmentMinBytes = 100 * 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 100 * 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+ )
+
+ val lastContainedLogTimestamp = 10000
+
+ TestUtils.resource(
+ RecordsSnapshotWriter.createWithHeader(
+ () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+ 1024,
+ MemoryPool.NONE,
+ new MockTime,
+ lastContainedLogTimestamp,
+ CompressionType.NONE,
+ new MetadataRecordSerde
+ ).get()
+ ) { snapshotWriter =>
+ snapshotWriter.append(metadataRecords.asJava)
+ snapshotWriter.freeze()
+ }
+
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--files", snapshotPath))
+ assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("SnapshotHeader"))
+ assertTrue(output.contains("SnapshotFooter"))
+ assertTrue(output.contains(s""""lastContainedLogTimestamp":$lastContainedLogTimestamp"""))
+
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "--files", snapshotPath))
+ assertTrue(output.contains("Snapshot end offset: 0, epoch: 0"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertFalse(output.contains("SnapshotHeader"))
+ assertFalse(output.contains("SnapshotFooter"))
+ assertFalse(output.contains(s""""lastContainedLogTimestamp": $lastContainedLogTimestamp"""))
}
@Test
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index a4d3b5a8cd..337e56a7f8 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -30,7 +30,7 @@ import java.util.Optional;
public final class Snapshots {
private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
- private static final String SUFFIX = ".checkpoint";
+ public static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index d251e36359..05d1929f27 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -238,7 +238,7 @@ final public class SnapshotWriterReaderTest {
assertTrue(batch.isControlBatch());
SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
- assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+ assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
return countRecords;
}