This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new fbdfd0d5963 KAFKA-16666 Migrate OffsetMessageFormatter to tools module
(#16689)
fbdfd0d5963 is described below
commit fbdfd0d59630a3d712acf406a96cfd200c093ea0
Author: Ken Huang <[email protected]>
AuthorDate: Wed Jul 31 15:18:14 2024 +0800
KAFKA-16666 Migrate OffsetMessageFormatter to tools module (#16689)
Reviewers: Chia-Ping Tsai <[email protected]>
---
build.gradle | 2 +-
checkstyle/import-control.xml | 1 +
.../coordinator/group/GroupMetadataManager.scala | 1 +
.../tools/consumer/ConsoleConsumerOptions.java | 4 +
.../tools/consumer/OffsetsMessageFormatter.java | 126 +++++++++++++++++
.../tools/consumer/ConsoleConsumerOptionsTest.java | 56 ++++----
.../kafka/tools/consumer/ConsoleConsumerTest.java | 100 ++++++++++++--
.../tools/consumer/OffsetMessageFormatterTest.java | 153 +++++++++++++++++++++
8 files changed, 400 insertions(+), 43 deletions(-)
diff --git a/build.gradle b/build.gradle
index a15e00c0a6b..51f9659e587 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2117,6 +2117,7 @@ project(':tools') {
implementation project(':connect:runtime')
implementation project(':tools:tools-api')
implementation project(':transaction-coordinator')
+ implementation project(':group-coordinator')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
@@ -2140,7 +2141,6 @@ project(':tools') {
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
- testImplementation project(':group-coordinator')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 08c45d0aa38..a5784ef935c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -301,6 +301,7 @@
<allow pkg="kafka.utils" />
<allow pkg="scala.collection" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
+ <allow pkg="org.apache.kafka.coordinator.group" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 064df2f607d..5bb7216ab8d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1243,6 +1243,7 @@ object GroupMetadataManager {
// Formatter for use with tools such as console consumer: Consumer should
also set exclude.internal.topics to false.
// (specify --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when
consuming __consumer_offsets)
+ @Deprecated
class OffsetsMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]],
output: PrintStream): Unit = {
Option(consumerRecord.key).map(key =>
GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index 0a3a008b7a3..455ca885a53 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -368,6 +368,10 @@ public final class ConsoleConsumerOptions extends
CommandDefaultOptions {
System.err.println("WARNING:
kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is
deprecated and will be removed in the next major release. " +
"Please use
org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
return className;
+ case
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter":
+ System.err.println("WARNING:
kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter is
deprecated and will be removed in the next major release. " +
+ "Please use
org.apache.kafka.tools.consumer.OffsetsMessageFormatter instead");
+ return className;
default:
return className;
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
new file mode 100644
index 00000000000..62dcb871c81
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import
org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import
org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Formatter for use with tools such as console consumer: Consumer should also
set exclude.internal.topics to false.
+ */
+public class OffsetsMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+    private static final String UNKNOWN = "unknown";
+
+    /**
+     * Prints the record as one JSON object; records keyed by a group metadata key are skipped without output.
+     */
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode root = JsonNodeFactory.instance.objectNode();
+        byte[] rawKey = consumerRecord.key();
+        if (Objects.isNull(rawKey)) {
+            root.set(KEY, NullNode.getInstance());
+        } else {
+            ByteBuffer keyBuffer = ByteBuffer.wrap(rawKey);
+            short keyVersion = keyBuffer.getShort();
+            JsonNode keyData = decodeKey(keyBuffer, keyVersion)
+                .map(message -> keyToJson(message, keyVersion))
+                .orElseGet(() -> new TextNode(UNKNOWN));
+            // A NullNode marks a group metadata key: only offset commit records are printed.
+            if (keyData instanceof NullNode) {
+                return;
+            }
+            root.set(KEY, versioned(keyVersion, keyData));
+        }
+
+        byte[] rawValue = consumerRecord.value();
+        if (Objects.isNull(rawValue)) {
+            root.set(VALUE, NullNode.getInstance());
+        } else {
+            ByteBuffer valueBuffer = ByteBuffer.wrap(rawValue);
+            short valueVersion = valueBuffer.getShort();
+            JsonNode valueData = decodeValue(valueBuffer, valueVersion)
+                .map(message -> OffsetCommitValueJsonConverter.write(message, valueVersion))
+                .orElseGet(() -> new TextNode(UNKNOWN));
+            root.set(VALUE, versioned(valueVersion, valueData));
+        }
+
+        try {
+            output.write(root.toString().getBytes(UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Wraps decoded data in an object of the form {"version": ..., "data": ...}. */
+    private static JsonNode versioned(short version, JsonNode data) {
+        return JsonNodeFactory.instance.objectNode().put(VERSION, version).set(DATA, data);
+    }
+
+    /** Decodes the key body that follows the version prefix; empty when the version matches neither key schema. */
+    private static Optional<ApiMessage> decodeKey(ByteBuffer body, short version) {
+        if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(body), version));
+        }
+        if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(body), version));
+        }
+        return Optional.empty();
+    }
+
+    /** Maps a decoded key to its "data" node; a NullNode is returned for a group metadata key. */
+    private static JsonNode keyToJson(ApiMessage logKey, short keyVersion) {
+        if (logKey instanceof OffsetCommitKey) {
+            return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, keyVersion);
+        }
+        return logKey instanceof GroupMetadataKey ? NullNode.getInstance() : new TextNode(UNKNOWN);
+    }
+
+    /** Decodes the value body that follows the version prefix; empty when the version is unsupported. */
+    private static Optional<OffsetCommitValue> decodeValue(ByteBuffer body, short version) {
+        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
+            return Optional.of(new OffsetCommitValue(new ByteBufferAccessor(body), version));
+        }
+        return Optional.empty();
+    }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
index 383b39700ce..7c84a10fd44 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
@@ -649,50 +649,50 @@ public class ConsoleConsumerOptionsTest {
@Test
public void testParseDeprecatedFormatter() throws Exception {
- String[] deprecatedDefaultMessageFormatter = new String[]{
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--formatter", "kafka.tools.DefaultMessageFormatter",
- };
+ String[] deprecatedDefaultMessageFormatter =
generateArgsForFormatter("kafka.tools.DefaultMessageFormatter");
assertInstanceOf(DefaultMessageFormatter.class, new
ConsoleConsumerOptions(deprecatedDefaultMessageFormatter).formatter());
- String[] deprecatedLoggingMessageFormatter = new String[]{
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--formatter", "kafka.tools.LoggingMessageFormatter",
- };
+ String[] deprecatedLoggingMessageFormatter =
generateArgsForFormatter("kafka.tools.LoggingMessageFormatter");
assertInstanceOf(LoggingMessageFormatter.class, new
ConsoleConsumerOptions(deprecatedLoggingMessageFormatter).formatter());
- String[] deprecatedNoOpMessageFormatter = new String[]{
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--formatter", "kafka.tools.NoOpMessageFormatter",
- };
+ String[] deprecatedNoOpMessageFormatter =
generateArgsForFormatter("kafka.tools.NoOpMessageFormatter");
assertInstanceOf(NoOpMessageFormatter.class, new
ConsoleConsumerOptions(deprecatedNoOpMessageFormatter).formatter());
}
@SuppressWarnings("deprecation")
@Test
public void testNewAndDeprecateTransactionLogMessageFormatter() throws
Exception {
- String[] deprecatedTransactionLogMessageFormatter = new String[]{
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--formatter",
"kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter",
- };
+ String[] deprecatedTransactionLogMessageFormatter =
+
generateArgsForFormatter("kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter");
assertInstanceOf(kafka.coordinator.transaction.TransactionLog.TransactionLogMessageFormatter.class,
new
ConsoleConsumerOptions(deprecatedTransactionLogMessageFormatter).formatter());
- String[] transactionLogMessageFormatter = new String[]{
+ String[] transactionLogMessageFormatter =
+
generateArgsForFormatter("org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");
+ assertInstanceOf(TransactionLogMessageFormatter.class,
+ new
ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testNewAndDeprecateOffsetsMessageFormatter() throws Exception {
+ String[] deprecatedOffsetsMessageFormatter =
+
generateArgsForFormatter("kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter");
+
assertInstanceOf(kafka.coordinator.group.GroupMetadataManager.OffsetsMessageFormatter.class,
+ new
ConsoleConsumerOptions(deprecatedOffsetsMessageFormatter).formatter());
+
+ String[] offsetsMessageFormatter =
+
generateArgsForFormatter("org.apache.kafka.tools.consumer.OffsetsMessageFormatter");
+ assertInstanceOf(OffsetsMessageFormatter.class,
+ new
ConsoleConsumerOptions(offsetsMessageFormatter).formatter());
+ }
+
+ private String[] generateArgsForFormatter(String formatter) {
+ return new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
- "--formatter",
"org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+ "--formatter", formatter,
};
- assertInstanceOf(TransactionLogMessageFormatter.class,
- new
ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
index d849a8f5315..6e378b3a98e 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -39,6 +39,10 @@ import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import
org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import
org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
@@ -66,6 +70,7 @@ import java.util.regex.Pattern;
import static java.util.Collections.singleton;
import static
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -91,6 +96,7 @@ import static org.mockito.Mockito.when;
public class ConsoleConsumerTest {
private final String topic = "test-topic";
+ private final String groupId = "test-group";
private final String transactionId = "transactional-id";
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -289,7 +295,7 @@ public class ConsoleConsumerTest {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
admin.createTopics(singleton(newTopic));
- produceMessages(cluster);
+ produceMessagesWithTxn(cluster);
String[] transactionLogMessageFormatter = new String[]{
"--bootstrap-server", cluster.bootstrapServers(),
@@ -299,7 +305,7 @@ public class ConsoleConsumerTest {
};
ConsoleConsumerOptions options = new
ConsoleConsumerOptions(transactionLogMessageFormatter);
- ConsoleConsumer.ConsumerWrapper consumerWrapper = new
ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster));
+ ConsoleConsumer.ConsumerWrapper consumerWrapper = new
ConsoleConsumer.ConsumerWrapper(options, createTxnConsumer(cluster));
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
PrintStream output = new PrintStream(out)) {
@@ -325,8 +331,52 @@ public class ConsoleConsumerTest {
}
}
- private void produceMessages(ClusterInstance cluster) {
- try (Producer<byte[], byte[]> producer = createProducer(cluster)) {
+ @ClusterTest(brokers = 3)
+ public void testOffsetsMessageFormatter(ClusterInstance cluster) throws
Exception {
+ try (Admin admin = cluster.createAdminClient()) {
+
+ NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+ admin.createTopics(singleton(newTopic));
+ produceMessages(cluster);
+
+ String[] offsetsMessageFormatter = new String[]{
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--topic", Topic.GROUP_METADATA_TOPIC_NAME,
+ "--formatter",
"org.apache.kafka.tools.consumer.OffsetsMessageFormatter"
+ };
+
+ ConsoleConsumerOptions options = new
ConsoleConsumerOptions(offsetsMessageFormatter);
+ ConsoleConsumer.ConsumerWrapper consumerWrapper = new
ConsoleConsumer.ConsumerWrapper(options, createOffsetConsumer(cluster));
+
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ PrintStream output = new PrintStream(out)) {
+ ConsoleConsumer.process(1, options.formatter(),
consumerWrapper, output, true);
+
+ JsonNode jsonNode =
objectMapper.reader().readTree(out.toByteArray());
+ JsonNode keyNode = jsonNode.get("key");
+
+ OffsetCommitKey offsetCommitKey =
+ OffsetCommitKeyJsonConverter.read(keyNode.get("data"),
OffsetCommitKey.HIGHEST_SUPPORTED_VERSION);
+ assertNotNull(offsetCommitKey);
+ assertEquals(Topic.GROUP_METADATA_TOPIC_NAME,
offsetCommitKey.topic());
+ assertEquals(groupId, offsetCommitKey.group());
+
+ JsonNode valueNode = jsonNode.get("value");
+ OffsetCommitValue offsetCommitValue =
+
OffsetCommitValueJsonConverter.read(valueNode.get("data"),
OffsetCommitValue.HIGHEST_SUPPORTED_VERSION);
+ assertNotNull(offsetCommitValue);
+ assertEquals(0, offsetCommitValue.offset());
+ assertEquals(-1, offsetCommitValue.leaderEpoch());
+ assertNotNull(offsetCommitValue.metadata());
+ assertEquals(-1, offsetCommitValue.expireTimestamp());
+ } finally {
+ consumerWrapper.cleanup();
+ }
+ }
+ }
+
+ private void produceMessagesWithTxn(ClusterInstance cluster) {
+ try (Producer<byte[], byte[]> producer = createTxnProducer(cluster)) {
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, new byte[1_000 * 100]));
@@ -334,26 +384,48 @@ public class ConsoleConsumerTest {
}
}
- private Producer<byte[], byte[]> createProducer(ClusterInstance cluster) {
- Properties props = new Properties();
- props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ private void produceMessages(ClusterInstance cluster) {
+ try (Producer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps(cluster))) {
+ producer.send(new ProducerRecord<>(topic, new byte[1_000 * 100]));
+ }
+ }
+
+ private Producer<byte[], byte[]> createTxnProducer(ClusterInstance
cluster) {
+ Properties props = producerProps(cluster);
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ACKS_CONFIG, "all");
props.put(TRANSACTIONAL_ID_CONFIG, transactionId);
- props.put(KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
- props.put(VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
return new KafkaProducer<>(props);
}
- private Consumer<byte[], byte[]> createConsumer(ClusterInstance cluster) {
+ private Consumer<byte[], byte[]> createTxnConsumer(ClusterInstance
cluster) {
+ Properties props = consumerProps(cluster);
+ props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
+ props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new KafkaConsumer<>(props);
+ }
+
+ private Consumer<byte[], byte[]> createOffsetConsumer(ClusterInstance
cluster) {
+ Properties props = consumerProps(cluster);
+ props.put(EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
+ return new KafkaConsumer<>(props);
+ }
+
+ private Properties producerProps(ClusterInstance cluster) {
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ props.put(KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ return props;
+ }
+
+ private Properties consumerProps(ClusterInstance cluster) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
props.put(KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RangeAssignor.class.getName());
- props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
- props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(GROUP_ID_CONFIG, "test-group");
- return new KafkaConsumer<>(props);
+ props.put(GROUP_ID_CONFIG, groupId);
+ return props;
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
new file mode 100644
index 00000000000..f2c4a8e3e34
--- /dev/null
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Checks the exact text OffsetsMessageFormatter writes for offset commit, group metadata and null keys/values. */
+public class OffsetMessageFormatterTest {
+
+    private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey()
+        .setGroup("group-id")
+        .setTopic("foo")
+        .setPartition(1);
+    private static final OffsetCommitValue OFFSET_COMMIT_VALUE = new OffsetCommitValue()
+        .setOffset(100L)
+        .setLeaderEpoch(10)
+        .setMetadata("metadata")
+        .setCommitTimestamp(1234L)
+        .setExpireTimestamp(-1L);
+    private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id");
+    private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue()
+        .setProtocolType("consumer")
+        .setGeneration(1)
+        .setProtocol("range")
+        .setLeader("leader")
+        .setMembers(Collections.emptyList());
+    private static final String TOPIC = "TOPIC";
+
+    /** Each case is (version-prefixed key bytes or null, version-prefixed value bytes or null, expected output). */
+    private static Stream<Arguments> parameters() {
+        return Stream.of(
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                    "\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                    "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                    "\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
+                    "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
+                    "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":5,\"data\":\"unknown\"},\"value\":{\"version\":4," +
+                    "\"data\":{\"offset\":100,\"leaderEpoch\":10,\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                null,
+                "{\"key\":{\"version\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":null}"),
+            Arguments.of(
+                null,
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":null,\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                    "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"),
+            Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
+                ""
+            )
+        );
+    }
+
+    /** Formats a record built from the given key and value bytes and compares the written text with the expectation. */
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testOffsetsMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+            TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+            keyBuffer, valueBuffer, new RecordHeaders(), Optional.empty());
+
+        try (MessageFormatter formatter = new OffsetsMessageFormatter()) {
+            formatter.configure(emptyMap());
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            formatter.writeTo(record, new PrintStream(out));
+            assertEquals(expectedOutput, out.toString());
+        }
+    }
+}