Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-24 Thread via GitHub


chia7712 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249319439

   @gharris1727 thanks for the kind reminder. I will check it ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-24 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249086455

   @gharris1727, Thanks for your comments, I will take a look and fix it.





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-24 Thread via GitHub


gharris1727 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249058093

   Hi @chia7712 @m1a2st this commit is causing deterministic test failures on 
trunk: https://issues.apache.org/jira/browse/KAFKA-17195 PTAL, thanks!





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-24 Thread via GitHub


chia7712 merged PR #16019:
URL: https://github.com/apache/kafka/pull/16019





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686501013


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
   Yes, I will update the Jira description






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686481284


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
   > If we don't change className to 
TransactionLogMessageFormatter.class.getName(), do we use implementation from 
scala or java?
   
   That is a good point. The replacement does not generate the same output as
   before: the previous output is not in JSON format, but we want to leverage
   the protocol JSON mechanism ...

   @m1a2st Could you please update the Jira description to avoid misleading
   readers?
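
The two behaviours being weighed in this thread can be sketched in isolation. This is a minimal illustration, not the actual `ConsoleConsumerOptions` code: the class `DeprecatedFormatterNames` and its two methods are hypothetical names introduced here, and only the two class-name strings are taken from the diff above.

```java
public class DeprecatedFormatterNames {
    // Class names copied from the diff above.
    public static final String OLD_NAME =
        "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter";
    public static final String NEW_NAME =
        "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter";

    // Hypothetical helper: prints the deprecation warning to stderr.
    private static void warn() {
        System.err.println("WARNING: " + OLD_NAME + " is deprecated. "
            + "Please use " + NEW_NAME + " instead");
    }

    // Behaviour in the diff (`return className;`): warn, but keep resolving
    // to the old class, so existing users still get the old formatter.
    public static String keepOld(String className) {
        if (OLD_NAME.equals(className)) {
            warn();
        }
        return className;
    }

    // Alternative raised in review: warn and redirect to the new class.
    // Callers would then get the new formatter's output under the old name.
    public static String redirectToNew(String className) {
        if (OLD_NAME.equals(className)) {
            warn();
            return NEW_NAME;
        }
        return className;
    }

    public static void main(String[] args) {
        System.out.println(keepOld(OLD_NAME));
        System.out.println(redirectToNew(OLD_NAME));
    }
}
```

Since the new formatter's output differs from the old one, redirecting would change what users of the old name see, which is presumably why the old name is kept working with a warning.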






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


FrankYang0529 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686466843


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
   From JIRA description:
   > That is to say, `ConsoleConsumer` can accept the previous package name and 
then use the (java) implementation to parse and make same output.
   
   If we don't change `className` to 
`TransactionLogMessageFormatter.class.getName()`, do we use implementation from 
scala or java?






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686438402


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
   If a user wants to use the old `TransactionLog$TransactionLogMessageFormatter`,
   we should let it keep working until we remove it. So I think the right approach
   is to show the warning message and let the user keep using the old formatter.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


FrankYang0529 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686393344


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
   If we can deprecate the old one, how about using 
`TransactionLogMessageFormatter.class.getName()` here?






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2242460996

   @chia7712, Thanks for your comments, PTAL





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-22 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686198223


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -301,24 +301,30 @@ public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws E
             };
 
             ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            PrintStream output = new PrintStream(out);
-            ConsoleConsumer.process(1, options.formatter(),
-                    new ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster)), output, true);
-            JsonNode jsonNode = objectMapper.reader().readTree(out.toByteArray());
-            JsonNode keyNode = jsonNode.get("key");
 
-            TransactionLogKey logKey =
-                    TransactionLogKeyJsonConverter.read(keyNode.get("data"), TransactionLogKey.HIGHEST_SUPPORTED_VERSION);
-            assertNotNull(logKey);
-            assertEquals(transactionId, logKey.transactionalId());
-
-            JsonNode valueNode = jsonNode.get("value");
-            TransactionLogValue logValue =
-                    TransactionLogValueJsonConverter.read(valueNode.get("data"), TransactionLogValue.HIGHEST_SUPPORTED_VERSION);
-            assertNotNull(logValue);
-            assertEquals(0, logValue.producerId());
-            assertEquals(0, logValue.transactionStatus());
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                 PrintStream output = new PrintStream(out)) {
+
+                ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster));
+                ConsoleConsumer.process(1, options.formatter(), consumerWrapper, output, true);
+
+                JsonNode jsonNode = objectMapper.reader().readTree(out.toByteArray());
+                JsonNode keyNode = jsonNode.get("key");
+
+                TransactionLogKey logKey =
+                        TransactionLogKeyJsonConverter.read(keyNode.get("data"), TransactionLogKey.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(logKey);
+                assertEquals(transactionId, logKey.transactionalId());
+
+                JsonNode valueNode = jsonNode.get("value");
+                TransactionLogValue logValue =
+                        TransactionLogValueJsonConverter.read(valueNode.get("data"), TransactionLogValue.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(logValue);
+                assertEquals(0, logValue.producerId());
+                assertEquals(0, logValue.transactionStatus());
+
+                consumerWrapper.cleanup();

Review Comment:
   Please make sure `cleanup` gets called.
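
One way to guarantee this is a `finally` block, so that `cleanup` still runs when an assertion earlier in the test body throws. The sketch below is only an illustration under stated assumptions: `FakeWrapper` is a hypothetical stand-in for `ConsoleConsumer.ConsumerWrapper`, and the thrown `AssertionError` simulates a failing assertion.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;

public class CleanupSketch {
    // Hypothetical stand-in for ConsoleConsumer.ConsumerWrapper;
    // it only records whether cleanup() was called.
    static class FakeWrapper {
        boolean cleaned = false;

        void cleanup() {
            cleaned = true;
        }
    }

    // Returns true if cleanup() ran even though the body failed.
    public static boolean cleanedUpDespiteFailure() {
        FakeWrapper wrapper = new FakeWrapper();
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             PrintStream output = new PrintStream(out)) {
            try {
                output.print("{}");
                // Simulates a failing assertion in the middle of the test.
                throw new AssertionError("simulated assertion failure");
            } finally {
                // Runs on both the success path and the failure path.
                wrapper.cleanup();
            }
        } catch (AssertionError expected) {
            // Swallowed only so this demo can report the result.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wrapper.cleaned;
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran: " + cleanedUpDespiteFailure());
    }
}
```

Calling `cleanup()` as the last statement of the `try` body, by contrast, would be skipped whenever an earlier statement throws.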



##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -49,8 +88,14 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@ExtendWith(value = ClusterTestExtensions.class)

Review Comment:
   `value` is redundant.
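
For background, Java lets a single annotation element named `value` be written without the `value =` prefix. The sketch below demonstrates this with a hypothetical annotation `Extend`, shaped like JUnit's `@ExtendWith` (a single `Class<?>[] value()` element); it does not use JUnit itself.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ValueShorthandSketch {
    // Hypothetical annotation with a single element named "value".
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Extend {
        Class<?>[] value();
    }

    // Explicit form, as in the diff under review.
    @Extend(value = String.class)
    static class Explicit { }

    // Shorthand form suggested by the reviewer.
    @Extend(String.class)
    static class Shorthand { }

    // Both forms produce equal annotation instances at runtime.
    public static boolean sameAnnotation() {
        Extend explicit = Explicit.class.getAnnotation(Extend.class);
        Extend shorthand = Shorthand.class.getAnnotation(Extend.class);
        return explicit.equals(shorthand);
    }

    public static void main(String[] args) {
        System.out.println("equal: " + sameAnnotation());
    }
}
```

Applied to the line under review, the shorthand would read `@ExtendWith(ClusterTestExtensions.class)`.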



##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -49,8 +88,14 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")

Review Comment:
   We don't need this now, since the tag is added automatically.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-21 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685761154


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -239,4 +284,73 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(brokers = 3)
+    public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            admin.createTopics(singleton(newTopic));
+            produceMessages(cluster);
+
+            String[] transactionLogMessageFormatter = new String[]{
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--topic", Topic.TRANSACTION_STATE_TOPIC_NAME,
+                "--formatter", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+                "--from-beginning"
+            };
+
+            ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            PrintStream output = new PrintStream(out);
+            ConsoleConsumer.process(1, options.formatter(),

Review Comment:
   Thanks for your comments. I will use a try-with-resources block to close these resources.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-21 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685752276


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -239,4 +284,73 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(brokers = 3)
+    public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            admin.createTopics(singleton(newTopic));
+            produceMessages(cluster);
+
+            String[] transactionLogMessageFormatter = new String[]{
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--topic", Topic.TRANSACTION_STATE_TOPIC_NAME,
+                "--formatter", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+                "--from-beginning"
+            };
+
+            ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            PrintStream output = new PrintStream(out);
+            ConsoleConsumer.process(1, options.formatter(),

Review Comment:
   Please call `cleanup` to close the consumer ... otherwise, @FrankYang0529 will
   hate your PR due to the thread leak.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-20 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685482743


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -49,14 +87,25 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
 public class ConsoleConsumerTest {
 
+private final ClusterInstance cluster;
+private final String topic = "test-topic";
+private final String transactionId = "transactional-id";
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+public ConsoleConsumerTest(ClusterInstance cluster) {

Review Comment:
   Not all test cases use `ClusterTest`, so you can't define the
   `ClusterInstance` in the constructor.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-19 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2239400247

   @chia7712, PTAL, Thanks





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-17 Thread via GitHub


chia7712 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2235242833

   @m1a2st please fix the build error





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-17 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2233343772

   @chia7712, Thanks for your comments, PTAL





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-14 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1677223413


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -239,4 +296,130 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
+    public void testTransactionLogMessageFormatter() throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            List testcases = generateTestcases();

Review Comment:
   This test does NOT cover the scenario we care about.
   
   1. We should verify that the topic `__transaction_state` has data.
   2. We should verify that the new formatter can parse the data of
      `__transaction_state`.
   3. We can use arbitrary data when producing the transaction, since all we
      want to check is the data in `__transaction_state`.



##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java:
##
@@ -239,4 +296,130 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)

Review Comment:
   We should check all types rather than only KRaft.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-13 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1676993462


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerIntegrationTest.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class ConsoleConsumerIntegrationTest {

Review Comment:
   Sure, I moved it into `ConsoleConsumerTest`, PTAL






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-13 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1676990471


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerIntegrationTest.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class ConsoleConsumerIntegrationTest {

Review Comment:
   Could we move this IT into `ConsoleConsumerTest`? The new test infra allows
   us to merge them easily.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-12 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2226738412

   @chia7712, Thanks for your comments, I added a new transaction test





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-10 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2220350615

   @chia7712, Thanks for your comments, PTAL





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667968408


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+            settingKeyNode(json, transactionLogKey, keyVersion);
+        } else {
+            json.put(KEY, "NULL");

Review Comment:
   > {"key":null,"value":null}
   
   I prefer this output, as it follows the JSON rules (`null` is a JSON literal).
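
To illustrate the difference between the two candidate outputs, here is a minimal sketch that uses only the JDK. The class `NullJsonSketch` and its helpers `jsonValue` and `render` are hypothetical and are not part of this PR. The point is only that the string `"NULL"` cannot be told apart from a real key whose text happens to be NULL, while the literal `null` can.

```java
class NullJsonSketch {
    // Hypothetical helper: renders a nullable string as a JSON value.
    // Escaping of quotes and control characters is omitted for brevity.
    static String jsonValue(String s) {
        return s == null ? "null" : "\"" + s + "\"";
    }

    // Renders a key/value pair as a single-line JSON object.
    static String render(String key, String value) {
        return "{\"key\":" + jsonValue(key) + ",\"value\":" + jsonValue(value) + "}";
    }

    public static void main(String[] args) {
        System.out.println(render(null, null));     // JSON null literals
        System.out.println(render("NULL", "NULL")); // ordinary strings
    }
}
```

A JSON parser reading the first line sees an absent key and value; reading the second, it sees two four-character strings.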






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667926640


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+            settingKeyNode(json, transactionLogKey, keyVersion);
+        } else {
+            json.put(KEY, "NULL");

Review Comment:
   @chia7712, thanks for your comments. If this value is null, which output is
   better to print?
   ```json
   {"key":"NULL","value":"NULL"}
   ```
   or
   ```json
   {"key":null,"value":null}
   ```






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667909229


##
build.gradle:
##
@@ -2106,6 +2106,8 @@ project(':tools') {
 implementation project(':server-common')
 implementation project(':connect:runtime')
 implementation project(':tools:tools-api')
+implementation project(':transaction-coordinator')
+implementation project(':group-coordinator')

Review Comment:
   We need `:transaction-coordinator`, but we don't need `:group-coordinator` in
   this PR. I will delete it.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667895877


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+            settingKeyNode(json, transactionLogKey, keyVersion);
+        } else {
+            json.put(KEY, "NULL");
+        }
+
+        byte[] value = consumerRecord.value();
+        if (Objects.nonNull(value)) {
+            short valueVersion = ByteBuffer.wrap(value).getShort();
+            Optional<TransactionLogValue> transactionLogValue = readToTransactionLogValue(ByteBuffer.wrap(value));

Review Comment:
   Maybe we can inline those methods. For example:
   ```java
   JsonNode dataNode = readToTransactionLogValue(ByteBuffer.wrap(value))
       .map(s -> TransactionLogValueJsonConverter.write(s, valueVersion))
       .orElseGet(() -> new TextNode("unknown"));
   json.putObject(VALUE)
       .put(VERSION, valueVersion)
       .set(DATA, dataNode);
   ```
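
The inlining idea can also be tried out without Kafka or Jackson on the classpath. The following is only a sketch under stated assumptions: `InlineOptionalSketch`, `decode`, `toData`, `viaHelper`, and `inlined` are hypothetical names, the supported version range of 0 to 1 is made up, and the `"data:"` prefix stands in for the JSON conversion step.

```java
import java.util.Optional;

class InlineOptionalSketch {
    // Hypothetical decoder: yields the payload only for versions 0 and 1.
    static Optional<String> decode(short version, String payload) {
        return (version >= 0 && version <= 1) ? Optional.of(payload) : Optional.empty();
    }

    // Helper-based form: the Optional is stored in a local and handed to a helper.
    static String viaHelper(short version, String payload) {
        Optional<String> decoded = decode(version, payload);
        return toData(decoded);
    }

    static String toData(Optional<String> decoded) {
        if (decoded.isPresent()) {
            return "data:" + decoded.get();
        }
        return "unknown";
    }

    // Inlined form: decode, convert, and fall back in a single expression.
    static String inlined(short version, String payload) {
        return decode(version, payload)
            .map(p -> "data:" + p)
            .orElseGet(() -> "unknown");
    }

    public static void main(String[] args) {
        System.out.println(inlined((short) 1, "txn"));
        System.out.println(inlined((short) 10, "txn"));
    }
}
```

Both forms return the same result for every input; the inlined one simply avoids the intermediate `Optional` local and the extra helper method.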



##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.Transaction

Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212670744

   @m1a2st please take a look at the build error.





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-06 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212117359

   Thank you @chia7712 for the reminder, rebased.





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-05 Thread via GitHub


chia7712 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2211183788

   @m1a2st could you please rebase the code?





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1651048314


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.ifPresent(key -> {
+short keyVersion = ByteBuffer.wrap(key).getShort();

Review Comment:
   Please fix the indent size.



##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.ifPresent(key -> {
+short keyVersion = ByteBuffer.wrap(key).getShort();
+byte[] value = consumerRecord.value();
+short valueVersion = ByteBuffer.wrap(value).getShort();
+
+TransactionLogKey transactionLogKey = 
readToTransactio

Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650961363


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+            .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+            .ifPresent(transactionLogKey -> {
+                short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                json.set("version", new TextNode(Short.toString(version)));
+
+                if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                    && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+                    byte[] value = consumerRecord.value();
+                    TransactionLogValue transactionLogValue =
+                        new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+                    JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+                    json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+                    json.set("data", jsonNode);
+                } else {
+                    json.set("data", new TextNode("unknown"));
+                }
+                try {
+                    output.write(json.toString().getBytes(UTF_8));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+            && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();
Review Comment:
   That looks good. Please add tests for it.






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650898140


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+            .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+            .ifPresent(transactionLogKey -> {
+                short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                json.set("version", new TextNode(Short.toString(version)));
+
+                if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                    && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+                    byte[] value = consumerRecord.value();
+                    TransactionLogValue transactionLogValue =
+                        new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+                    JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+                    json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+                    json.set("data", jsonNode);
+                } else {
+                    json.set("data", new TextNode("unknown"));
+                }
+                try {
+                    output.write(json.toString().getBytes(UTF_8));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+            && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
   I tested the formatter, and the results are:
   ```json
   {
     "key": {
       "version": "0",
       "data": {
         "transactionalId": "TXNID"
       }
     },
     "value": {
       "version": "1",
       "data": {
         "producerId": 100,
         "producerEpoch": 50,
         "transactionTimeoutMs": 500,
         "transactionStatus": 4,
         "transactionPartitions": [],
         "transactionLastUpdateTimestampMs": 1000,
         "transactionStartTimestampMs": 750
       }
     }
   }
   ```
   ```json
   {
     "key": {
       "version": "10",
       "data": "unknown"
     },
     "value": {
       "version": "10",
       "data": "unknown"
     }
   }
   ```




Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650424408


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+            .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+            .ifPresent(transactionLogKey -> {
+                short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                json.set("version", new TextNode(Short.toString(version)));
+
+                if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                    && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+                    byte[] value = consumerRecord.value();
+                    TransactionLogValue transactionLogValue =
+                        new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+                    JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+                    json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+                    json.set("data", jsonNode);
+                } else {
+                    json.set("data", new TextNode("unknown"));
+                }
+                try {
+                    output.write(json.toString().getBytes(UTF_8));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+            && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
   The key and value could have different versions, so it would be better to
   separate them into different items. For example:
   ```json
   {
     "key": {
       "version": 1,
       "data": xxx
     },
     "value": {
       "version": 1,
       "data": xxx
     }
   }
   ```
   
   If we can't parse the byte array due to a version mismatch, the "data" will
   be "unknown".
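
The proposed layout can be sketched with the JDK alone. This is an illustration under stated assumptions, not the formatter from this PR: `KeyValueVersionSketch`, `record`, `side`, and `render` are hypothetical names, the supported range `LOWEST`..`HIGHEST` is made up (the real bounds come from the generated message classes), and the payload is treated as plain UTF-8 text instead of a serialized message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class KeyValueVersionSketch {
    // Hypothetical supported range, chosen only for this example.
    static final short LOWEST = 0;
    static final short HIGHEST = 1;

    // Builds a test record: a two-byte version prefix followed by a UTF-8 payload.
    static byte[] record(short version, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 + body.length).putShort(version).put(body).array();
    }

    // Renders one side as {"version":N,"data":...}; "unknown" when the version is out of range.
    // JSON escaping of the payload is omitted for brevity.
    static String side(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        short version = buffer.getShort(); // advances past the version prefix
        String data = "unknown";
        if (version >= LOWEST && version <= HIGHEST) {
            byte[] rest = new byte[buffer.remaining()];
            buffer.get(rest);
            data = new String(rest, StandardCharsets.UTF_8);
        }
        return "{\"version\":" + version + ",\"data\":\"" + data + "\"}";
    }

    // Key and value are decoded independently, each with its own version.
    static String render(byte[] key, byte[] value) {
        return "{\"key\":" + side(key) + ",\"value\":" + side(value) + "}";
    }

    public static void main(String[] args) {
        System.out.println(render(record((short) 0, "TXNID"), record((short) 10, "ignored")));
    }
}
```

Because each side reads its own prefix, a supported key can still be printed in full when the value carries a version the tool does not understand.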






Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-23 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650415290


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();
+TransactionLogValue transactionLogValue =
+new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+json.set("data", jsonNode);
+} else {
+json.set("data", new TextNode("unknown"));
+}
+try {
+output.write(json.toString().getBytes(UTF_8));
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+}
+
+private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+short version = byteBuffer.getShort();
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+} else {
+return new TransactionLogKey();

Review Comment:
   @chia7712, thanks for your comments. If the version is not supported by
   `TransactionLogValue` or `TransactionLogKey`, we should show the message
   ```
   {
     "version": 1,
     "data": "unknown"
   }
   ```
   Only when the version is supported by both `TransactionLogValue` and
   `TransactionLogKey` will I show
   ```
   {
     "version": 1,
     "transactionalId": "",
     "data": "XXX"
   }
   ```
   Have I misunderstood anything? Thanks for your patience.



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650399876


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();
+TransactionLogValue transactionLogValue =
+new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+json.set("data", jsonNode);
+} else {
+json.set("data", new TextNode("unknown"));
+}
+try {
+output.write(json.toString().getBytes(UTF_8));
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+}
+
+private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+short version = byteBuffer.getShort();
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+} else {
+return new TransactionLogKey();

Review Comment:
   I'm not sure I caught your point. I feel that we should not show the
   content if the version is not supported by the current code. For example,
   users may use "old" code to parse "new" data. In that case, we should display
   "unknown" instead of a default value. That rule should apply to both the key
   and the value.
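
   To illustrate the difference between "unknown" and a default value, here is a
   small sketch in plain Java. The class name, the `HIGHEST_SUPPORTED` constant,
   and the UTF-8 decoding are hypothetical; the point is only that an
   unsupported version yields an empty `Optional` rather than a
   default-constructed object whose empty fields could be mistaken for real data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class UnknownVersionSketch {
    // Hypothetical highest version this "old" reader understands.
    static final short HIGHEST_SUPPORTED = 0;

    // Returns the decoded id only when the version is supported. An empty
    // Optional signals "unknown" instead of a default-constructed value.
    static Optional<String> readTransactionalId(byte[] key) {
        ByteBuffer buffer = ByteBuffer.wrap(key);
        short version = buffer.getShort();
        if (version < 0 || version > HIGHEST_SUPPORTED) {
            return Optional.empty();
        }
        // Stand-in for real deserialization: the rest is UTF-8 text.
        return Optional.of(StandardCharsets.UTF_8.decode(buffer).toString());
    }

    // What the formatter would print for the key.
    static String display(byte[] key) {
        return readTransactionalId(key).orElse("unknown");
    }

    // Test helper: a 2-byte version prefix followed by the id bytes.
    static byte[] key(int version, String id) {
        byte[] body = id.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Short.BYTES + body.length)
            .putShort((short) version)
            .put(body)
            .array();
    }

    public static void main(String[] args) {
        System.out.println(display(key(0, "txn-1"))); // supported version
        System.out.println(display(key(1, "txn-1"))); // "new" data, "old" reader
    }
}
```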



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650085659


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();

Review Comment:
   Please handle null values and unmatched versions. The key and the value each
   have their own version.
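
   One possible shape for those checks, sketched in plain Java. The class, the
   `Status` enum, and the version bounds passed in are hypothetical; the sketch
   assumes a record's value may be null (for example a tombstone) and that each
   byte array carries its own 2-byte version prefix.

```java
import java.nio.ByteBuffer;

public class NullAndVersionSketch {
    enum Status { NULL, TRUNCATED, UNSUPPORTED_VERSION, OK }

    // Classifies one byte array (key or value) before any deserialization.
    // The bounds are passed in so key and value can use different ranges.
    static Status check(byte[] bytes, int lowest, int highest) {
        if (bytes == null) {
            return Status.NULL;
        }
        if (bytes.length < Short.BYTES) {
            return Status.TRUNCATED; // not even a version prefix
        }
        short version = ByteBuffer.wrap(bytes).getShort();
        return (version >= lowest && version <= highest)
            ? Status.OK
            : Status.UNSUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        byte[] keyV0 = {0, 0};   // version 0
        byte[] valueV1 = {0, 1}; // version 1
        System.out.println(check(keyV0, 0, 0));   // key range [0, 0]
        System.out.println(check(valueV1, 0, 0)); // value checked on its own
        System.out.println(check(null, 0, 1));    // null value
    }
}
```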


