chia7712 commented on code in PR #15652: URL: https://github.com/apache/kafka/pull/15652#discussion_r1589164350
########## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ########## @@ -399,6 +404,191 @@ class DumpLogSegmentsTest { assertEquals(partialBatches, partialBatchesCount) } + @Test + def testOffsetsMessageParser(): Unit = { + val serde = new RecordSerde() + val parser = new OffsetsMessageParser() + + def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = { + val record = new group.Record(key, value) + TestUtils.singletonRecord(key = serde.serializeKey(record), value = serde.serializeValue(record)) Review Comment: Maybe we can reuse `singletonRecords`, for example: ```scala TestUtils.singletonRecords(key = serde.serializeKey(record), value = serde.serializeValue(record)).records().iterator().next() ``` I am trying to avoid turning `TestUtils` into a huge, chaotic class, especially since we just cleaned up `TestUtils` (#15808). ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -398,9 +404,141 @@ object DumpLogSegments { } } - private class OffsetsMessageParser extends MessageParser[String, String] { + // Package private for testing. 
+ class OffsetsMessageParser extends MessageParser[String, String] { + private val serde = new RecordSerde() + + private def prepareKey(message: Message, version: Short): String = { + val messageAsJson = message match { + case m: OffsetCommitKey => + OffsetCommitKeyJsonConverter.write(m, version) + case m: GroupMetadataKey => + GroupMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupMetadataKey => + ConsumerGroupMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupPartitionMetadataKey => + ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupMemberMetadataKey => + ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMetadataKey => + ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version) + case m: ConsumerGroupTargetAssignmentMemberKey => + ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) + case m: ConsumerGroupCurrentMemberAssignmentKey => + ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + case _ => throw new UnknownRecordTypeException(version) + } + + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("type", new TextNode(version.toString)) + json.set("data", messageAsJson) + json.toString + } + + private def prepareGroupMetadataValue(message: GroupMetadataValue, version: Short): JsonNode = { + val json = GroupMetadataValueJsonConverter.write(message, version) + + def replace[T]( + node: JsonNode, + field: String, + reader: (org.apache.kafka.common.protocol.Readable, Short) => T, + writer: (T, Short) => JsonNode + ): Unit = { + Option(node.get(field)).foreach { subscription => Review Comment: We use this function to convert "assignment" too, so the name `subscription` is a bit weird to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org