Does anyone have feedback on this? ☺

From: Mohanraj Nagasamy <techie.mohan...@gmail.com>
Date: Wednesday, July 1, 2020 at 6:29 PM
To: "dev@kafka.apache.org" <dev@kafka.apache.org>
Subject: Feedback: Print schemaId using bin/kafka-dump-log.sh

When I dump Kafka logs to diagnose or debug a problem, it's handy to see whether a
log message carries a schemaId or not, and to print the schemaId when it does.

I'm soliciting feedback on whether this change is worth making to the kafka-core
codebase.

I'm new to the Kafka community - forgive me if this isn't the right process.

This is the change I made:

```
 core/src/main/scala/kafka/tools/DumpLogSegments.scala | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 9e9546a92..a8750ac3d 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -35,6 +35,7 @@ object DumpLogSegments {

   // visible for testing
   private[tools] val RecordIndent = "|"
+  private val MAGIC_BYTE = 0x0

   def main(args: Array[String]): Unit = {
     val opts = new DumpLogSegmentsOptions(args)
@@ -277,8 +278,24 @@ object DumpLogSegments {
               }
             } else if (printContents) {
               val (key, payload) = parser.parse(record)
-              key.foreach(key => print(s" key: $key"))
-              payload.foreach(payload => print(s" payload: $payload"))
+              key.foreach(key => {
+                val keyBuffer = record.key()
+                if (keyBuffer.get() == MAGIC_BYTE) {
+                  print(s" keySchemaId: ${keyBuffer.getInt} key: $key")
+                }
+                else {
+                  print(s" key: $key")
+                }
+              })
+
+              payload.foreach(payload => {
+                val valueBuffer = record.value()
+                if (valueBuffer.get() == MAGIC_BYTE) {
+                  print(s" payloadSchemaId: ${valueBuffer.getInt} payload: $payload")
+                } else {
+                  print(s" payload: $payload")
+                }
+              })
             }
             println()
           }
```
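
For reviewers who just want the shape of the check without reading the diff, here is a small
standalone sketch. It assumes the Confluent Schema Registry wire format (a 0x0 magic byte
followed by a 4-byte big-endian schema id before the serialized payload); the object and
method names below are made up for this example and are not part of the patch.

```
import java.nio.ByteBuffer

// Standalone sketch of the schema-id check, assuming the Confluent wire
// format: [0x0 magic byte][4-byte big-endian schema id][serialized payload].
object SchemaIdSketch {
  private val MagicByte: Byte = 0x0

  // Returns the schema id if the buffer starts with the magic byte, else None.
  // Reads from a duplicate so the caller's buffer position is left untouched.
  def schemaIdOf(buffer: ByteBuffer): Option[Int] = {
    val b = buffer.duplicate()
    if (b.remaining() >= 5 && b.get() == MagicByte) Some(b.getInt) else None
  }

  def main(args: Array[String]): Unit = {
    val withId = ByteBuffer.wrap(Array[Byte](0x0, 0, 0, 0, 2) ++ "avro-bytes".getBytes("UTF-8"))
    val plain  = ByteBuffer.wrap("plain-text".getBytes("UTF-8"))
    println(schemaIdOf(withId)) // Some(2)
    println(schemaIdOf(plain))  // None
  }
}
```

The sketch checks that at least five bytes remain before reading the int, so a short or
schema-less value simply yields None instead of a partial read.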

And this is what the output looks like:

```
$ bin/kafka-dump-log.sh --files data/kafka/logdir/avro_topic_test-0/00000000000000000000.log --print-data-log

| offset: 50 CreateTime: 1593570942959 keysize: -1 valuesize: 16 sequence: -1 headerKeys: [] payloadSchemaId: 1 payload:
TracRowe
baseOffset: 51 lastOffset: 51 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 2918 CreateTime: 1593570958044 size: 101 magic: 2 compresscodec: NONE crc: 1913155179 isvalid: true
| offset: 51 CreateTime: 1593570958044 keysize: 3 valuesize: 30 sequence: -1 headerKeys: [] key: ... payloadSchemaId: 2 payload: .iRKoMVeoVVnTmQEuqwSTHZQ
baseOffset: 52 lastOffset: 52 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 3019 CreateTime: 1593570969001 size: 84 magic: 2 compresscodec: NONE crc: 2188466405 isvalid: true
```

-Mohan
