chia7712 commented on code in PR #15652:
URL: https://github.com/apache/kafka/pull/15652#discussion_r1589164350


##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -399,6 +404,191 @@ class DumpLogSegmentsTest {
     assertEquals(partialBatches, partialBatchesCount)
   }
 
+  @Test
+  def testOffsetsMessageParser(): Unit = {
+    val serde = new RecordSerde()
+    val parser = new OffsetsMessageParser()
+
+    def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
+      val record = new group.Record(key, value)
+      TestUtils.singletonRecord(key = serde.serializeKey(record), value = serde.serializeValue(record))

Review Comment:
   Maybe we can reuse the existing `singletonRecords` helper, for example:
   ```scala
   TestUtils.singletonRecords(key = serde.serializeKey(record), value = serde.serializeValue(record)).records().iterator().next()
   ```
   
   I'd like to avoid growing a huge, chaotic `TestUtils`, especially since we just cleaned up `TestUtils` (#15808).
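   
   For illustration, the test helper could then look roughly like this (just a sketch, assuming `singletonRecords` returns a `MemoryRecords` whose only batch holds the single record):
   
   ```scala
   def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
     val record = new group.Record(key, value)
     // Reuse the existing helper and pull the single Record back out of the
     // MemoryRecords it builds, instead of adding a new singletonRecord variant.
     TestUtils.singletonRecords(
       key = serde.serializeKey(record),
       value = serde.serializeValue(record)
     ).records().iterator().next()
   }
   ```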



##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -398,9 +404,141 @@ object DumpLogSegments {
     }
   }
 
-  private class OffsetsMessageParser extends MessageParser[String, String] {
+  // Package private for testing.
+  class OffsetsMessageParser extends MessageParser[String, String] {
+    private val serde = new RecordSerde()
+
+    private def prepareKey(message: Message, version: Short): String = {
+      val messageAsJson = message match {
+        case m: OffsetCommitKey =>
+          OffsetCommitKeyJsonConverter.write(m, version)
+        case m: GroupMetadataKey =>
+          GroupMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupMetadataKey =>
+          ConsumerGroupMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupPartitionMetadataKey =>
+          ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupMemberMetadataKey =>
+          ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupTargetAssignmentMetadataKey =>
+          ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupTargetAssignmentMemberKey =>
+          ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
+        case m: ConsumerGroupCurrentMemberAssignmentKey =>
+          ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
+        case _ => throw new UnknownRecordTypeException(version)
+      }
+
+      val json = new ObjectNode(JsonNodeFactory.instance)
+      json.set("type", new TextNode(version.toString))
+      json.set("data", messageAsJson)
+      json.toString
+    }
+
+    private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = {
+      val json = GroupMetadataValueJsonConverter.write(message, version)
+
+      def replace[T](
+        node: JsonNode,
+        field: String,
+        reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
+        writer: (T, Short) => JsonNode
+      ): Unit = {
+        Option(node.get(field)).foreach { subscription =>

Review Comment:
   We use this function to convert "assignment" too, so the name `subscription` is a bit weird to me.
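   
   A field-agnostic name such as `fieldNode` (hypothetical) might read better, since the same helper handles both the subscription and the assignment bytes. Roughly like the sketch below; the closure body is only an assumption about how the bytes are decoded, since the diff is cut off at this point, and `version` is captured from the enclosing `prepareGroupMetadataValue`:
   
   ```scala
   def replace[T](
     node: JsonNode,
     field: String,
     reader: (org.apache.kafka.common.protocol.Readable, Short) => T,
     writer: (T, Short) => JsonNode
   ): Unit = {
     // `fieldNode` is a hypothetical, field-agnostic name for what the patch calls `subscription`.
     Option(node.get(field)).foreach { fieldNode =>
       // Assumed body: decode the raw bytes and swap in the human-readable JSON form.
       val buffer = java.nio.ByteBuffer.wrap(fieldNode.binaryValue())
       node.asInstanceOf[ObjectNode].replace(
         field,
         writer(reader(new org.apache.kafka.common.protocol.ByteBufferAccessor(buffer), version), version)
       )
     }
   }
   ```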


