dajac commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1589206306


##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -399,6 +404,191 @@ class DumpLogSegmentsTest {
     assertEquals(partialBatches, partialBatchesCount)
   }
 
+  @Test
+  def testOffsetsMessageParser(): Unit = {
+    val serde = new RecordSerde()
+    val parser = new OffsetsMessageParser()
+
+    def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
+      val record = new group.Record(key, value)
+      TestUtils.singletonRecord(key = serde.serializeKey(record), value = serde.serializeValue(record))

Review Comment:
   Works for me.



##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -398,9 +404,141 @@ object DumpLogSegments {
     }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+    private val serde = new RecordSerde()
+
+    private def prepareKey(message: Message, version: Short): String = {
+      val messageAsJson = message match {
+        case m: OffsetCommitKey =>
+          OffsetCommitKeyJsonConverter.write(m, version)
+        case m: GroupMetadataKey =>
+          GroupMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupMetadataKey =>
+          ConsumerGroupMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupPartitionMetadataKey =>
+          ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupMemberMetadataKey =>
+          ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupTargetAssignmentMetadataKey =>
+          ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupTargetAssignmentMemberKey =>
+          ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupCurrentMemberAssignmentKey =>
+          ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
+        case _ => throw new UnknownRecordTypeException(version)
+      }
+
+      val json = new ObjectNode(JsonNodeFactory.instance)
+      json.set("type", new TextNode(version.toString))
+      json.set("data", messageAsJson)
+      json.toString
+    }
+
+    private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = {
+      val json = GroupMetadataValueJsonConverter.write(message, version)
+
+      def replace[T](
+        node: JsonNode,
+        field: String,
+        reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
+        writer: (T, Short) => JsonNode
+      ): Unit = {
+        Option(node.get(field)).foreach { subscription =>

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to