This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa682623b9a KAFKA-16666 Migrate OffsetMessageFormatter to tools module 
(#16689)
fa682623b9a is described below

commit fa682623b9ad593eb1886642f67ef6686c8a11d4
Author: Ken Huang <[email protected]>
AuthorDate: Wed Jul 31 15:18:14 2024 +0800

    KAFKA-16666 Migrate OffsetMessageFormatter to tools module (#16689)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |   2 +-
 checkstyle/import-control.xml                      |   1 +
 .../coordinator/group/GroupMetadataManager.scala   |   1 +
 .../tools/consumer/ConsoleConsumerOptions.java     |   4 +
 .../tools/consumer/OffsetsMessageFormatter.java    | 126 +++++++++++++++++
 .../tools/consumer/ConsoleConsumerOptionsTest.java |  56 ++++----
 .../kafka/tools/consumer/ConsoleConsumerTest.java  | 100 ++++++++++++--
 .../tools/consumer/OffsetMessageFormatterTest.java | 153 +++++++++++++++++++++
 8 files changed, 400 insertions(+), 43 deletions(-)

diff --git a/build.gradle b/build.gradle
index a15e00c0a6b..51f9659e587 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2117,6 +2117,7 @@ project(':tools') {
     implementation project(':connect:runtime')
     implementation project(':tools:tools-api')
     implementation project(':transaction-coordinator')
+    implementation project(':group-coordinator')
     implementation libs.argparse4j
     implementation libs.jacksonDatabind
     implementation libs.jacksonDataformatCsv
@@ -2140,7 +2141,6 @@ project(':tools') {
     testImplementation project(':connect:runtime')
     testImplementation project(':connect:runtime').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.main.output
-    testImplementation project(':group-coordinator')
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 08c45d0aa38..a5784ef935c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -301,6 +301,7 @@
     <allow pkg="kafka.utils" />
     <allow pkg="scala.collection" />
     <allow pkg="org.apache.kafka.coordinator.transaction" />
+    <allow pkg="org.apache.kafka.coordinator.group" />
 
     <subpackage name="consumer">
       <allow pkg="org.apache.kafka.tools"/>
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 064df2f607d..5bb7216ab8d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1243,6 +1243,7 @@ object GroupMetadataManager {
 
   // Formatter for use with tools such as console consumer: Consumer should 
also set exclude.internal.topics to false.
   // (specify --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when 
consuming __consumer_offsets)
+  @Deprecated
   class OffsetsMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {
       Option(consumerRecord.key).map(key => 
GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index 0a3a008b7a3..455ca885a53 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -368,6 +368,10 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
                 System.err.println("WARNING: 
kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is 
deprecated and will be removed in the next major release. " +
                         "Please use 
org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
                 return className;
+            case 
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter":
+                System.err.println("WARNING: 
kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter is 
deprecated and will be removed in the next major release. " +
+                        "Please use 
org.apache.kafka.tools.consumer.OffsetsMessageFormatter instead");
+                return className;
             default:
                 return className;
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
new file mode 100644
index 00000000000..62dcb871c81
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import 
org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import 
org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Formatter for use with tools such as console consumer: Consumer should also 
set exclude.internal.topics to false.
+ */
+public class OffsetsMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+    private static final String UNKNOWN = "unknown";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, 
PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            JsonNode dataNode = readToGroupMetadataKey(ByteBuffer.wrap(key))
+                    .map(logKey -> transferMetadataToJsonNode(logKey, 
keyVersion))
+                    .orElseGet(() -> new TextNode(UNKNOWN));
+            // Only print if the message is an offset record.
+            if (dataNode instanceof NullNode) {
+                return;
+            }
+            json.putObject(KEY)
+                    .put(VERSION, keyVersion)
+                    .set(DATA, dataNode);
+        } else {
+            json.set(KEY, NullNode.getInstance());
+        }
+
+        byte[] value = consumerRecord.value();
+        if (Objects.nonNull(value)) {
+            short valueVersion = ByteBuffer.wrap(value).getShort();
+            JsonNode dataNode = readToOffsetCommitValue(ByteBuffer.wrap(value))
+                    .map(logValue -> 
OffsetCommitValueJsonConverter.write(logValue, valueVersion))
+                    .orElseGet(() -> new TextNode(UNKNOWN));
+            json.putObject(VALUE)
+                    .put(VERSION, valueVersion)
+                    .set(DATA, dataNode);
+        } else {
+            json.set(VALUE, NullNode.getInstance());
+        }
+
+        try {
+            output.write(json.toString().getBytes(UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) 
{
+        short version = byteBuffer.getShort();
+        if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
+                && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new OffsetCommitKey(new 
ByteBufferAccessor(byteBuffer), version));
+        } else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && 
version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new GroupMetadataKey(new 
ByteBufferAccessor(byteBuffer), version));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private static JsonNode transferMetadataToJsonNode(ApiMessage logKey, 
short keyVersion) {
+        if (logKey instanceof OffsetCommitKey) {
+            return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) 
logKey, keyVersion);
+        } else if (logKey instanceof GroupMetadataKey) {
+            return NullNode.getInstance();
+        } else {
+            return new TextNode(UNKNOWN);
+        }
+    }
+
+    private Optional<OffsetCommitValue> readToOffsetCommitValue(ByteBuffer 
byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION
+                && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new OffsetCommitValue(new 
ByteBufferAccessor(byteBuffer), version));
+        } else {
+            return Optional.empty();
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
index 383b39700ce..7c84a10fd44 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
@@ -649,50 +649,50 @@ public class ConsoleConsumerOptionsTest {
 
     @Test
     public void testParseDeprecatedFormatter() throws Exception {
-        String[] deprecatedDefaultMessageFormatter = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--partition", "0",
-            "--formatter", "kafka.tools.DefaultMessageFormatter",
-        };
+        String[] deprecatedDefaultMessageFormatter = 
generateArgsForFormatter("kafka.tools.DefaultMessageFormatter");
         assertInstanceOf(DefaultMessageFormatter.class, new 
ConsoleConsumerOptions(deprecatedDefaultMessageFormatter).formatter());
 
-        String[] deprecatedLoggingMessageFormatter = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--partition", "0",
-            "--formatter", "kafka.tools.LoggingMessageFormatter",
-        };
+        String[] deprecatedLoggingMessageFormatter = 
generateArgsForFormatter("kafka.tools.LoggingMessageFormatter");
         assertInstanceOf(LoggingMessageFormatter.class, new 
ConsoleConsumerOptions(deprecatedLoggingMessageFormatter).formatter());
 
-        String[] deprecatedNoOpMessageFormatter = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--partition", "0",
-            "--formatter", "kafka.tools.NoOpMessageFormatter",
-        };
+        String[] deprecatedNoOpMessageFormatter = 
generateArgsForFormatter("kafka.tools.NoOpMessageFormatter");
         assertInstanceOf(NoOpMessageFormatter.class, new 
ConsoleConsumerOptions(deprecatedNoOpMessageFormatter).formatter());
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void testNewAndDeprecateTransactionLogMessageFormatter() throws 
Exception {
-        String[] deprecatedTransactionLogMessageFormatter = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--partition", "0",
-            "--formatter", 
"kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter",
-        };
+        String[] deprecatedTransactionLogMessageFormatter = 
+                
generateArgsForFormatter("kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter");
         
assertInstanceOf(kafka.coordinator.transaction.TransactionLog.TransactionLogMessageFormatter.class,
 
                 new 
ConsoleConsumerOptions(deprecatedTransactionLogMessageFormatter).formatter());
 
-        String[] transactionLogMessageFormatter = new String[]{
+        String[] transactionLogMessageFormatter = 
+                
generateArgsForFormatter("org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");
+        assertInstanceOf(TransactionLogMessageFormatter.class, 
+                new 
ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testNewAndDeprecateOffsetsMessageFormatter() throws Exception {
+        String[] deprecatedOffsetsMessageFormatter = 
+                
generateArgsForFormatter("kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter");
+        
assertInstanceOf(kafka.coordinator.group.GroupMetadataManager.OffsetsMessageFormatter.class,
+                new 
ConsoleConsumerOptions(deprecatedOffsetsMessageFormatter).formatter());
+
+        String[] offsetsMessageFormatter = 
+                
generateArgsForFormatter("org.apache.kafka.tools.consumer.OffsetsMessageFormatter");
+        assertInstanceOf(OffsetsMessageFormatter.class,
+                new 
ConsoleConsumerOptions(offsetsMessageFormatter).formatter());
+    }
+    
+    private String[] generateArgsForFormatter(String formatter) {
+        return new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
             "--partition", "0",
-            "--formatter", 
"org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+            "--formatter", formatter,
         };
-        assertInstanceOf(TransactionLogMessageFormatter.class, 
-                new 
ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
index d849a8f5315..6e378b3a98e 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -39,6 +39,10 @@ import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import 
org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import 
org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
 import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
 import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
 import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
@@ -66,6 +70,7 @@ import java.util.regex.Pattern;
 import static java.util.Collections.singleton;
 import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -91,6 +96,7 @@ import static org.mockito.Mockito.when;
 public class ConsoleConsumerTest {
 
     private final String topic = "test-topic";
+    private final String groupId = "test-group";
     private final String transactionId = "transactional-id";
     private final ObjectMapper objectMapper = new ObjectMapper();
 
@@ -289,7 +295,7 @@ public class ConsoleConsumerTest {
 
             NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
             admin.createTopics(singleton(newTopic));
-            produceMessages(cluster);
+            produceMessagesWithTxn(cluster);
 
             String[] transactionLogMessageFormatter = new String[]{
                 "--bootstrap-server", cluster.bootstrapServers(),
@@ -299,7 +305,7 @@ public class ConsoleConsumerTest {
             };
 
             ConsoleConsumerOptions options = new 
ConsoleConsumerOptions(transactionLogMessageFormatter);
-            ConsoleConsumer.ConsumerWrapper consumerWrapper = new 
ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster));
+            ConsoleConsumer.ConsumerWrapper consumerWrapper = new 
ConsoleConsumer.ConsumerWrapper(options, createTxnConsumer(cluster));
             
             try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                  PrintStream output = new PrintStream(out)) {
@@ -325,8 +331,52 @@ public class ConsoleConsumerTest {
         }
     }
 
-    private void produceMessages(ClusterInstance cluster) {
-        try (Producer<byte[], byte[]> producer = createProducer(cluster)) {
+    @ClusterTest(brokers = 3)
+    public void testOffsetsMessageFormatter(ClusterInstance cluster) throws 
Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            admin.createTopics(singleton(newTopic));
+            produceMessages(cluster);
+
+            String[] offsetsMessageFormatter = new String[]{
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--topic", Topic.GROUP_METADATA_TOPIC_NAME,
+                "--formatter", 
"org.apache.kafka.tools.consumer.OffsetsMessageFormatter"
+            };
+
+            ConsoleConsumerOptions options = new 
ConsoleConsumerOptions(offsetsMessageFormatter);
+            ConsoleConsumer.ConsumerWrapper consumerWrapper = new 
ConsoleConsumer.ConsumerWrapper(options, createOffsetConsumer(cluster));
+
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream(); 
+                 PrintStream output = new PrintStream(out)) {
+                ConsoleConsumer.process(1, options.formatter(), 
consumerWrapper, output, true);
+
+                JsonNode jsonNode = 
objectMapper.reader().readTree(out.toByteArray());
+                JsonNode keyNode = jsonNode.get("key");
+
+                OffsetCommitKey offsetCommitKey =
+                        OffsetCommitKeyJsonConverter.read(keyNode.get("data"), 
OffsetCommitKey.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(offsetCommitKey);
+                assertEquals(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetCommitKey.topic());
+                assertEquals(groupId, offsetCommitKey.group());
+
+                JsonNode valueNode = jsonNode.get("value");
+                OffsetCommitValue offsetCommitValue =
+                        
OffsetCommitValueJsonConverter.read(valueNode.get("data"), 
OffsetCommitValue.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(offsetCommitValue);
+                assertEquals(0, offsetCommitValue.offset());
+                assertEquals(-1, offsetCommitValue.leaderEpoch());
+                assertNotNull(offsetCommitValue.metadata());
+                assertEquals(-1, offsetCommitValue.expireTimestamp());
+            } finally {
+                consumerWrapper.cleanup();
+            }
+        }
+    }
+
+    private void produceMessagesWithTxn(ClusterInstance cluster) {
+        try (Producer<byte[], byte[]> producer = createTxnProducer(cluster)) {
             producer.initTransactions();
             producer.beginTransaction();
             producer.send(new ProducerRecord<>(topic, new byte[1_000 * 100]));
@@ -334,26 +384,48 @@ public class ConsoleConsumerTest {
         }
     }
 
-    private Producer<byte[], byte[]> createProducer(ClusterInstance cluster) {
-        Properties props = new Properties();
-        props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+    private void produceMessages(ClusterInstance cluster) {
+        try (Producer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps(cluster))) {
+            producer.send(new ProducerRecord<>(topic, new byte[1_000 * 100]));
+        }
+    }
+
+    private Producer<byte[], byte[]> createTxnProducer(ClusterInstance 
cluster) {
+        Properties props = producerProps(cluster);
         props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
         props.put(ACKS_CONFIG, "all");
         props.put(TRANSACTIONAL_ID_CONFIG, transactionId);
-        props.put(KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        props.put(VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         return new KafkaProducer<>(props);
     }
 
-    private Consumer<byte[], byte[]> createConsumer(ClusterInstance cluster) {
+    private Consumer<byte[], byte[]> createTxnConsumer(ClusterInstance 
cluster) {
+        Properties props = consumerProps(cluster);
+        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(props);
+    }
+
+    private Consumer<byte[], byte[]> createOffsetConsumer(ClusterInstance 
cluster) {
+        Properties props = consumerProps(cluster);
+        props.put(EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
+        return new KafkaConsumer<>(props);
+    }
+    
+    private Properties producerProps(ClusterInstance cluster) {
+        Properties props = new Properties();
+        props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        return props;
+    }
+    
+    private Properties consumerProps(ClusterInstance cluster) {
         Properties props = new Properties();
         props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
RangeAssignor.class.getName());
-        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
-        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(GROUP_ID_CONFIG, "test-group");
-        return new KafkaConsumer<>(props);
+        props.put(GROUP_ID_CONFIG, groupId);
+        return props;
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
new file mode 100644
index 00000000000..f2c4a8e3e34
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetMessageFormatterTest {
+
+    private static final OffsetCommitKey OFFSET_COMMIT_KEY = new 
OffsetCommitKey()
+            .setGroup("group-id")
+            .setTopic("foo")
+            .setPartition(1);
+    private static final OffsetCommitValue OFFSET_COMMIT_VALUE = new 
OffsetCommitValue()
+            .setOffset(100L)
+            .setLeaderEpoch(10)
+            .setMetadata("metadata")
+            .setCommitTimestamp(1234L)
+            .setExpireTimestamp(-1L);
+    private static final GroupMetadataKey GROUP_METADATA_KEY = new 
GroupMetadataKey().setGroup("group-id");
+    private static final GroupMetadataValue GROUP_METADATA_VALUE = new 
GroupMetadataValue()
+            .setProtocolType("consumer")
+            .setGeneration(1)
+            .setProtocol("range")
+            .setLeader("leader")
+            .setMembers(Collections.emptyList());
+    private static final String TOPIC = "TOPIC";
+
+    private static Stream<Arguments> parameters() {
+        return Stream.of(
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 10, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            
"\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                            "\"commitTimestamp\":1234}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            
"\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                            
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            
"\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                            "\"commitTimestamp\":1234}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 3, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            
"\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
+                            
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            
"\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
+                            
"\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 5, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 4, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":5,\"data\":\"unknown\"},\"value\":{\"version\":4," +
+                            
"\"data\":{\"offset\":100,\"leaderEpoch\":10,\"metadata\":\"metadata\"," +
+                            "\"commitTimestamp\":1234}}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 5, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
+                ),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 0, 
OFFSET_COMMIT_KEY).array(),
+                        null,
+                        
"{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}},"
 +
+                            "\"value\":null}"),
+                Arguments.of(
+                        null,
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
OFFSET_COMMIT_VALUE).array(),
+                        
"{\"key\":null,\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\","
 +
+                            
"\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"),
+                Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
+                Arguments.of(
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_KEY).array(),
+                        MessageUtil.toVersionPrefixedByteBuffer((short) 2, 
GROUP_METADATA_VALUE).array(),
+                        ""
+                )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] 
valueBuffer, String expectedOutput) {
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+                TOPIC, 0, 0,
+                0L, TimestampType.CREATE_TIME, 0,
+                0, keyBuffer, valueBuffer,
+                new RecordHeaders(), Optional.empty());
+        
+        try (MessageFormatter formatter = new OffsetsMessageFormatter()) {
+            formatter.configure(emptyMap());
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            formatter.writeTo(record, new PrintStream(out));
+            assertEquals(expectedOutput, out.toString());
+        }
+    }
+}

Reply via email to