Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249319439 @gharris1727 thanks for the kind reminder. I will check it ASAP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249086455 @gharris1727, thanks for your comments. I will take a look and fix it.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
gharris1727 commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2249058093 Hi @chia7712 @m1a2st this commit is causing deterministic test failures on trunk: https://issues.apache.org/jira/browse/KAFKA-17195 PTAL, thanks!
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 merged PR #16019: URL: https://github.com/apache/kafka/pull/16019
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686501013

## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
Yes, I will update the Jira description.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686481284

## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
> If we don't change className to TransactionLogMessageFormatter.class.getName(), do we use implementation from scala or java?

That is a good point. The replacement does not generate the same output as before. The previous output is not in JSON format, but we want to leverage the protocol JSON mechanism ...

@m1a2st Could you please update the Jira description to avoid misdirecting readers?
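To make the trade-off in this thread concrete, here is a minimal, self-contained sketch of the two ways the deprecated formatter name could be handled. The class `DeprecatedFormatterNames` and both method names are hypothetical and exist only for illustration; the real logic lives in `ConsoleConsumerOptions.convertDeprecatedClass`, and only the two class-name strings and the wording of the warning are taken from the quoted diff.

```java
import java.io.PrintStream;

// Hypothetical illustration class; not part of Kafka.
class DeprecatedFormatterNames {
    public static final String OLD_NAME =
        "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter";
    public static final String NEW_NAME =
        "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter";

    private static void warn(PrintStream err) {
        err.println("WARNING: " + OLD_NAME + " is deprecated and will be removed in the next major release. "
            + "Please use " + NEW_NAME + " instead");
    }

    // Option taken in the PR: warn, but return the name unchanged, so the
    // old implementation (and its non-JSON output) is still what gets loaded.
    public static String warnAndKeep(String className, PrintStream err) {
        if (OLD_NAME.equals(className)) {
            warn(err);
        }
        return className;
    }

    // Option raised in review: warn and redirect to the new class. Users of
    // the old name would then see JSON output instead of the previous format.
    public static String warnAndRedirect(String className, PrintStream err) {
        if (OLD_NAME.equals(className)) {
            warn(err);
            return NEW_NAME;
        }
        return className;
    }
}
```

With `warnAndKeep`, passing the old name returns the old name, so existing scripts keep their output until the class is removed. With `warnAndRedirect`, the same input returns the new name, which is why the reviewers asked for the Jira description to stop promising identical output.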
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
FrankYang0529 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686466843

## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
From JIRA description:

> That is to say, `ConsoleConsumer` can accept the previous package name and then use the (java) implementation to parse and make same output.

If we don't change `className` to `TransactionLogMessageFormatter.class.getName()`, do we use the implementation from Scala or Java?
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686438402

## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
If users want to use the old `TransactionLog$TransactionLogMessageFormatter`, it should keep working until we remove it. I think the right approach is to print the deprecation warning and let them keep using the old formatter.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
FrankYang0529 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686393344

## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ##
@@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
                 System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
                         "Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
                 return NoOpMessageFormatter.class.getName();
+            case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
+                System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
+                        "Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
+                return className;

Review Comment:
If we can deprecate the old one, how about using `TransactionLogMessageFormatter.class.getName()` here?
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2242460996 @chia7712, Thanks for your comments, PTAL
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1686198223

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -301,24 +301,30 @@ public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws E
             };

             ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            PrintStream output = new PrintStream(out);
-            ConsoleConsumer.process(1, options.formatter(),
-                new ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster)), output, true);
-            JsonNode jsonNode = objectMapper.reader().readTree(out.toByteArray());
-            JsonNode keyNode = jsonNode.get("key");
-            TransactionLogKey logKey =
-                TransactionLogKeyJsonConverter.read(keyNode.get("data"), TransactionLogKey.HIGHEST_SUPPORTED_VERSION);
-            assertNotNull(logKey);
-            assertEquals(transactionId, logKey.transactionalId());
-
-            JsonNode valueNode = jsonNode.get("value");
-            TransactionLogValue logValue =
-                TransactionLogValueJsonConverter.read(valueNode.get("data"), TransactionLogValue.HIGHEST_SUPPORTED_VERSION);
-            assertNotNull(logValue);
-            assertEquals(0, logValue.producerId());
-            assertEquals(0, logValue.transactionStatus());
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                 PrintStream output = new PrintStream(out)) {
+
+                ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, createConsumer(cluster));
+                ConsoleConsumer.process(1, options.formatter(), consumerWrapper, output, true);
+
+                JsonNode jsonNode = objectMapper.reader().readTree(out.toByteArray());
+                JsonNode keyNode = jsonNode.get("key");
+
+                TransactionLogKey logKey =
+                    TransactionLogKeyJsonConverter.read(keyNode.get("data"), TransactionLogKey.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(logKey);
+                assertEquals(transactionId, logKey.transactionalId());
+
+                JsonNode valueNode = jsonNode.get("value");
+                TransactionLogValue logValue =
+                    TransactionLogValueJsonConverter.read(valueNode.get("data"), TransactionLogValue.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(logValue);
+                assertEquals(0, logValue.producerId());
+                assertEquals(0, logValue.transactionStatus());
+
+                consumerWrapper.cleanup();

Review Comment:
Please make sure `cleanup` gets called.

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -49,8 +88,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

+@ExtendWith(value = ClusterTestExtensions.class)

Review Comment:
`value` is redundant.

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -49,8 +88,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")

Review Comment:
We don't need this now, since the `tag` is added automatically.
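The concern behind "make sure `cleanup` gets called" is that a failing assertion earlier in the block would skip a trailing `cleanup()` call. A minimal sketch of one way to guarantee it, using `try`/`finally` inside the try-with-resources block, is shown here. `FakeWrapper` is a hypothetical stand-in for `ConsoleConsumer.ConsumerWrapper`, and the thrown exception simulates a failed assertion; nothing here is taken from the merged change.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

// Hypothetical illustration class; not part of Kafka.
class CleanupSketch {
    // Stand-in for ConsoleConsumer.ConsumerWrapper; only cleanup() matters here.
    static class FakeWrapper {
        boolean cleaned = false;
        void cleanup() { cleaned = true; }
    }

    // The streams are closed by try-with-resources; the wrapper, which is not
    // AutoCloseable in this sketch, is released in a finally block.
    static void process(FakeWrapper wrapper) throws IOException {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             PrintStream output = new PrintStream(out)) {
            try {
                output.println("record");
                // Simulates an assertion failing before the end of the test body.
                throw new IllegalStateException("simulated assertion failure");
            } finally {
                wrapper.cleanup();
            }
        }
    }

    // Returns true if cleanup() ran even though process() failed.
    public static boolean cleanedUpDespiteFailure() {
        FakeWrapper wrapper = new FakeWrapper();
        try {
            process(wrapper);
        } catch (IllegalStateException | IOException expected) {
            // The simulated failure propagates here; cleanup has already run.
        }
        return wrapper.cleaned;
    }
}
```

If the `cleanup()` call sat at the end of the inner block instead of in `finally`, the simulated failure would leave `cleaned` as `false`, which corresponds to the thread leak mentioned elsewhere in this thread.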
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685761154

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -239,4 +284,73 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(brokers = 3)
+    public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            admin.createTopics(singleton(newTopic));
+            produceMessages(cluster);
+
+            String[] transactionLogMessageFormatter = new String[]{
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--topic", Topic.TRANSACTION_STATE_TOPIC_NAME,
+                "--formatter", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+                "--from-beginning"
+            };
+
+            ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            PrintStream output = new PrintStream(out);
+            ConsoleConsumer.process(1, options.formatter(),

Review Comment:
Thanks for your comments. I will use a try block to close these resources.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685752276

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -239,4 +284,73 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(brokers = 3)
+    public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            admin.createTopics(singleton(newTopic));
+            produceMessages(cluster);
+
+            String[] transactionLogMessageFormatter = new String[]{
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--topic", Topic.TRANSACTION_STATE_TOPIC_NAME,
+                "--formatter", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
+                "--from-beginning"
+            };
+
+            ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            PrintStream output = new PrintStream(out);
+            ConsoleConsumer.process(1, options.formatter(),

Review Comment:
Please call `cleanup` to close the consumer ... otherwise, @FrankYang0529 will hate your PR due to the thread leak.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1685482743

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -49,14 +87,25 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
 public class ConsoleConsumerTest {
+    private final ClusterInstance cluster;
+    private final String topic = "test-topic";
+    private final String transactionId = "transactional-id";
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public ConsoleConsumerTest(ClusterInstance cluster) {

Review Comment:
Not all test cases use `ClusterTest`, so you can't define the `ClusterInstance` in the constructor.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2239400247 @chia7712, PTAL, Thanks
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2235242833 @m1a2st please fix the build error
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2233343772 @chia7712, Thanks for your comments, PTAL
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1677223413

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -239,4 +296,130 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
+    public void testTransactionLogMessageFormatter() throws Exception {
+        try (Admin admin = cluster.createAdminClient()) {
+
+            List testcases = generateTestcases();

Review Comment:
This test does NOT include the scenario we care about:
1. we should verify that the topic `__transaction_state` has data
2. we should verify that the new formatter can parse the data of `__transaction_state`
3. we can use arbitrary data when producing the transaction, since all we want to check is the data in `__transaction_state`

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java: ##
@@ -239,4 +296,130 @@ public void shouldWorkWithoutTopicOption() throws IOException {
         verify(mockConsumer).subscribe(any(Pattern.class));
         consumer.cleanup();
     }
+
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)

Review Comment:
We should check all types rather than only KRaft.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1676993462

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerIntegrationTest.java: ##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class ConsoleConsumerIntegrationTest {

Review Comment:
Sure, I moved it into `ConsoleConsumerTest`. PTAL
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1676990471

## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerIntegrationTest.java: ##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class ConsoleConsumerIntegrationTest {

Review Comment:
Could we move this IT into `ConsoleConsumerTest`? The new test infra makes it easy to merge them.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2226738412 @chia7712, thanks for your comments. I added a new transaction test.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2220350615 @chia7712, Thanks for your comments, PTAL
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667968408

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+            settingKeyNode(json, transactionLogKey, keyVersion);
+        } else {
+            json.put(KEY, "NULL");

Review Comment:
> {"key":null,"value":null}

I prefer this output, since it follows the JSON rules.
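To make the two candidate outputs concrete, here is a minimal, library-free sketch. It is not the formatter's code: `JsonNullSketch`, `member`, and `render` are hypothetical names, and the hand-rolled rendering skips string escaping. It only shows that a missing key or value can be emitted as the JSON literal `null` rather than the four-character string `"NULL"`, which a consumer could not tell apart from a real payload that happens to be the text NULL.

```java
public class JsonNullSketch {
    // Renders one JSON member. A null Java value becomes the JSON literal null;
    // anything else is quoted as a JSON string (no escaping, sketch only).
    static String member(String name, String value) {
        return "\"" + name + "\":" + (value == null ? "null" : "\"" + value + "\"");
    }

    // Hypothetical helper: renders a two-member object with "key" and "value".
    static String render(String key, String value) {
        return "{" + member("key", key) + "," + member("value", value) + "}";
    }

    public static void main(String[] args) {
        // JSON null literals: parsers map these to a null/None value.
        System.out.println(render(null, null));
        // String sentinels: parsers see ordinary strings that happen to say NULL.
        System.out.println(render("NULL", "NULL"));
    }
}
```

The first call prints `{"key":null,"value":null}` and the second prints `{"key":"NULL","value":"NULL"}`. With Jackson, the same effect would presumably come from writing a null node instead of a text node, but that call is not shown in this thread.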
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667926640

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    private static final String VERSION = "version";
+    private static final String DATA = "data";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+        byte[] key = consumerRecord.key();
+        if (Objects.nonNull(key)) {
+            short keyVersion = ByteBuffer.wrap(key).getShort();
+            Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+            settingKeyNode(json, transactionLogKey, keyVersion);
+        } else {
+            json.put(KEY, "NULL");

Review Comment:
@chia7712, thanks for your comments. If the key or value is null, which output is better to print?

```json
{"key":"NULL","value":"NULL"}
```

or

```json
{"key":null,"value":null}
```
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667909229

## build.gradle: ##
@@ -2106,6 +2106,8 @@ project(':tools') {
     implementation project(':server-common')
     implementation project(':connect:runtime')
     implementation project(':tools:tools-api')
+    implementation project(':transaction-coordinator')
+    implementation project(':group-coordinator')

Review Comment:
We need `:transaction-coordinator`, but we don't need `:group-coordinator` in this PR, so I will delete it.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1667895877 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +private static final String VERSION = "version"; +private static final String DATA = "data"; +private static final String KEY = "key"; +private static final String VALUE = "value"; + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +ObjectNode json = new ObjectNode(JsonNodeFactory.instance); + +byte[] key = consumerRecord.key(); +if (Objects.nonNull(key)) { +short keyVersion = ByteBuffer.wrap(key).getShort(); +Optional transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key)); +settingKeyNode(json, transactionLogKey, keyVersion); +} else { +json.put(KEY, "NULL"); +} + +byte[] value = consumerRecord.value(); +if (Objects.nonNull(value)) { +short valueVersion = ByteBuffer.wrap(value).getShort(); +Optional transactionLogValue = readToTransactionLogValue(ByteBuffer.wrap(value)); Review Comment: Maybe we can inline those methods. 
for example:

```java
JsonNode dataNode = readToTransactionLogValue(ByteBuffer.wrap(value))
    .map(s -> TransactionLogValueJsonConverter.write(s, valueVersion))
    .orElseGet(() -> new TextNode("unknown"));
json.putObject(VALUE)
    .put(VERSION, valueVersion)
    .set(DATA, dataNode);
```

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.Transaction
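The inlining suggestion in this message relies on the reader returning an `Optional` and on a fallback for unsupported versions. The following is a rough, library-free sketch of that pattern, not the PR's code: `InlineOptionalSketch`, `readPayload`, `dataNode`, `record`, and the `HIGHEST_SUPPORTED` bound are all hypothetical, and plain strings stand in for the Jackson nodes. It also shows why the original code wraps the byte array twice: `getShort()` advances the buffer position, so the version peek and the reader each need their own buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class InlineOptionalSketch {
    // Hypothetical upper bound; the real one lives in the generated message class.
    static final short HIGHEST_SUPPORTED = 1;

    // Reads the 2-byte version prefix, then the rest of the buffer as UTF-8 text.
    // Returns Optional.empty() when the version is outside the supported range.
    static Optional<String> readPayload(ByteBuffer buffer) {
        short version = buffer.getShort(); // advances the position by two bytes
        if (version < 0 || version > HIGHEST_SUPPORTED) {
            return Optional.empty();
        }
        return Optional.of(StandardCharsets.UTF_8.decode(buffer).toString());
    }

    // The inlined chain: decode if possible, otherwise fall back to "unknown".
    static String dataNode(byte[] value) {
        return readPayload(ByteBuffer.wrap(value))
            .map(payload -> "decoded:" + payload)
            .orElseGet(() -> "unknown");
    }

    // Test helper: builds a record with a version prefix followed by the payload.
    static byte[] record(short version, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 + body.length).putShort(version).put(body).array();
    }

    public static void main(String[] args) {
        System.out.println(dataNode(record((short) 1, "abc")));
        System.out.println(dataNode(record((short) 10, "abc")));
    }
}
```

Here the first call yields `decoded:abc` and the second yields `unknown`, since version 10 is outside the assumed range.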
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212670744 @m1a2st please take a look at the build error
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212117359 Thank you @chia7712 for the reminder, rebased.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on PR #16019: URL: https://github.com/apache/kafka/pull/16019#issuecomment-2211183788 @m1a2st could you please rebase the code?
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1651048314 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +private static final String VERSION = "version"; +private static final String DATA = "data"; + + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.ifPresent(key -> { +short keyVersion = ByteBuffer.wrap(key).getShort(); Review Comment: please fix the indent size ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +private static final String VERSION = "version"; +private static final String DATA = "data"; + + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.ifPresent(key -> { +short keyVersion = ByteBuffer.wrap(key).getShort(); +byte[] value = consumerRecord.value(); +short valueVersion = ByteBuffer.wrap(value).getShort(); + +TransactionLogKey transactionLogKey = readToTransactio
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1650961363 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key))) +.ifPresent(transactionLogKey -> { +short version = ByteBuffer.wrap(consumerRecord.key()).getShort(); +ObjectNode json = new ObjectNode(JsonNodeFactory.instance); +json.set("version", new TextNode(Short.toString(version))); + +if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION +&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { +byte[] value = consumerRecord.value(); +TransactionLogValue transactionLogValue = +new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version); +JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version); +json.set("transactionalId", new TextNode(transactionLogKey.transactionalId())); +json.set("data", jsonNode); +} else { +json.set("data", new TextNode("unknown")); +} +try { +output.write(json.toString().getBytes(UTF_8)); +} catch (IOException e) { +throw 
new RuntimeException(e); +} +}); +} + +private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) { +short version = byteBuffer.getShort(); +if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION +&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) { +return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version); +} else { +return new TransactionLogKey(); Review Comment: That is good and please add tests for it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1650898140 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key))) +.ifPresent(transactionLogKey -> { +short version = ByteBuffer.wrap(consumerRecord.key()).getShort(); +ObjectNode json = new ObjectNode(JsonNodeFactory.instance); +json.set("version", new TextNode(Short.toString(version))); + +if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION +&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { +byte[] value = consumerRecord.value(); +TransactionLogValue transactionLogValue = +new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version); +JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version); +json.set("transactionalId", new TextNode(transactionLogKey.transactionalId())); +json.set("data", jsonNode); +} else { +json.set("data", new TextNode("unknown")); +} +try { +output.write(json.toString().getBytes(UTF_8)); +} catch (IOException e) { +throw 
new RuntimeException(e);
+                    }
+                });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+                && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
I tested the formatter, and the results are:

```json
{
  "key": {
    "version": "0",
    "data": {
      "transactionalId": "TXNID"
    }
  },
  "value": {
    "version": "1",
    "data": {
      "producerId": 100,
      "producerEpoch": 50,
      "transactionTimeoutMs": 500,
      "transactionStatus": 4,
      "transactionPartitions": [],
      "transactionLastUpdateTimestampMs": 1000,
      "transactionStartTimestampMs": 750
    }
  }
}
```

```json
{
  "key": {
    "version": "10",
    "data": "unknown"
  },
  "value": {
    "version": "10",
    "data": "unknown"
  }
}
```
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1650424408 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key))) +.ifPresent(transactionLogKey -> { +short version = ByteBuffer.wrap(consumerRecord.key()).getShort(); +ObjectNode json = new ObjectNode(JsonNodeFactory.instance); +json.set("version", new TextNode(Short.toString(version))); + +if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION +&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { +byte[] value = consumerRecord.value(); +TransactionLogValue transactionLogValue = +new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version); +JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version); +json.set("transactionalId", new TextNode(transactionLogKey.transactionalId())); +json.set("data", jsonNode); +} else { +json.set("data", new TextNode("unknown")); +} +try { +output.write(json.toString().getBytes(UTF_8)); +} catch (IOException e) { +throw 
new RuntimeException(e);
+                    }
+                });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+                && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
The key and value can have different versions, so it would be better to separate them into different items. For example:

```json
{
  "key": {
    "version": 1,
    "data": xxx
  },
  "value": {
    "version": 1,
    "data": xxx
  }
}
```

If we can't parse the byte array because its version is not supported, the "data" will be "unknown".
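As a rough illustration of the layout proposed in this comment, the sketch below reads the version prefix of the key and of the value independently and reports `"unknown"` for whichever side falls outside its range. Everything here is an assumption made for the example: the class and method names, the version ranges passed to `side`, and the `<decoded>` placeholder that stands in for the real converter output. It expects non-null arrays of at least two bytes.

```java
import java.nio.ByteBuffer;

public class VersionedEnvelopeSketch {
    // Renders one side of the record. `lowest` and `highest` are hypothetical
    // stand-ins for the LOWEST_/HIGHEST_SUPPORTED_VERSION constants.
    static String side(String name, byte[] bytes, short lowest, short highest) {
        short version = ByteBuffer.wrap(bytes).getShort(); // first two bytes, big-endian
        boolean supported = version >= lowest && version <= highest;
        String data = supported ? "\"<decoded>\"" : "\"unknown\"";
        return "\"" + name + "\":{\"version\":" + version + ",\"data\":" + data + "}";
    }

    // Key and value are checked against their own (made-up) ranges.
    static String format(byte[] key, byte[] value) {
        return "{" + side("key", key, (short) 0, (short) 0) + ","
                   + side("value", value, (short) 0, (short) 1) + "}";
    }

    // Test helper: a two-byte array holding only the version prefix.
    static byte[] withVersion(short version) {
        return ByteBuffer.allocate(2).putShort(version).array();
    }

    public static void main(String[] args) {
        System.out.println(format(withVersion((short) 0), withVersion((short) 1)));
        System.out.println(format(withVersion((short) 10), withVersion((short) 10)));
    }
}
```

Because each side carries its own version, a record whose key parses but whose value does not would still show the decoded key next to an `"unknown"` value.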
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650415290

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+                .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+                .ifPresent(transactionLogKey -> {
+                    short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                    ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                    json.set("version", new TextNode(Short.toString(version)));
+
+                    if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                            && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+                        byte[] value = consumerRecord.value();
+                        TransactionLogValue transactionLogValue =
+                                new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+                        JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+                        json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+                        json.set("data", jsonNode);
+                    } else {
+                        json.set("data", new TextNode("unknown"));
+                    }
+                    try {
+                        output.write(json.toString().getBytes(UTF_8));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+                && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
@chia7712, thanks for your comments. If the version is not supported by `TransactionLogValue` or `TransactionLogKey`, we should show the message

```
{ "version": 1, "data": "unknown" }
```

Only when the version is supported by both `TransactionLogValue` and `TransactionLogKey` will I show

```
{ "version": 1, "transactionalId": "", "data": "XXX" }
```

Have I misunderstood anything? Thanks for your patience.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650399876

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+                .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+                .ifPresent(transactionLogKey -> {
+                    short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                    ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                    json.set("version", new TextNode(Short.toString(version)));
+
+                    if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                            && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+                        byte[] value = consumerRecord.value();
+                        TransactionLogValue transactionLogValue =
+                                new TransactionLogValue(new ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+                        JsonNode jsonNode = TransactionLogValueJsonConverter.write(transactionLogValue, version);
+                        json.set("transactionalId", new TextNode(transactionLogKey.transactionalId()));
+                        json.set("data", jsonNode);
+                    } else {
+                        json.set("data", new TextNode("unknown"));
+                    }
+                    try {
+                        output.write(json.toString().getBytes(UTF_8));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+        short version = byteBuffer.getShort();
+        if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+                && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+            return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version);
+        } else {
+            return new TransactionLogKey();

Review Comment:
I'm not sure whether I understand your point. I feel that we should not show the content if the version is not supported by the current code. For example, users may use "old" code to parse "new" data. In that case, we should display "unknown" instead of a default value. That rule should apply to both the key and the value.
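The rule in this comment (print "unknown" rather than the fields of a default-constructed object when the version is unsupported) can be sketched without any Kafka dependency. The following is only an illustration under stated assumptions, not the code from the PR: the class `VersionGateSketch`, the bounds `LOWEST` and `HIGHEST`, and the simplified payload layout (a 2-byte version followed by UTF-8 text) are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for a versioned record parser; not a Kafka class.
final class VersionGateSketch {
    // Assumed supported range, standing in for the generated
    // LOWEST_SUPPORTED_VERSION / HIGHEST_SUPPORTED_VERSION constants.
    static final short LOWEST = 0;
    static final short HIGHEST = 1;

    // Returns the decoded text only when the leading version is supported.
    // The caller is assumed to pass a non-null array.
    static Optional<String> parse(byte[] bytes) {
        if (bytes.length < Short.BYTES) {
            return Optional.empty(); // too short to carry a version
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        short version = buffer.getShort();
        if (version < LOWEST || version > HIGHEST) {
            // "New" data read by "old" code: report nothing instead of defaults.
            return Optional.empty();
        }
        return Optional.of(StandardCharsets.UTF_8.decode(buffer).toString());
    }

    // Renders either the decoded content or the literal "unknown".
    static String render(byte[] bytes) {
        return parse(bytes).orElse("unknown");
    }

    // Builds a payload in the assumed layout: version, then UTF-8 text.
    static byte[] encode(short version, String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Short.BYTES + body.length)
                .putShort(version)
                .put(body)
                .array();
    }

    public static void main(String[] args) {
        System.out.println(render(encode((short) 0, "txn-1"))); // supported version
        System.out.println(render(encode((short) 9, "txn-1"))); // unsupported version
    }
}
```

Returning an empty `Optional` keeps an unsupported record distinguishable from a supported record whose fields happen to be empty, which a default instance would not.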
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650085659

## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        Optional.ofNullable(consumerRecord.key())
+                .map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+                .ifPresent(transactionLogKey -> {
+                    short version = ByteBuffer.wrap(consumerRecord.key()).getShort();
+                    ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+                    json.set("version", new TextNode(Short.toString(version)));
+
+                    if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+                            && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+                        byte[] value = consumerRecord.value();

Review Comment:
Please handle null and unmatched versions. The key and the value each have their own version.
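One way to read this request: the key and the value each start with their own version, so each needs its own null check and its own range check. The sketch below illustrates that shape with plain `java.nio`; it is not the PR's implementation. The class `KeyValueVersionSketch`, the four version bounds, and the status strings are hypothetical names chosen for the example.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of per-side null and version checks; not a Kafka class.
final class KeyValueVersionSketch {
    // Assumed ranges; real code would take them from the generated classes.
    static final short KEY_LOWEST = 0;
    static final short KEY_HIGHEST = 0;
    static final short VALUE_LOWEST = 0;
    static final short VALUE_HIGHEST = 1;

    // Classifies one side of a record as "null", "unknown", or "v<version>".
    static String classify(byte[] bytes, short lowest, short highest) {
        if (bytes == null) {
            return "null";
        }
        if (bytes.length < Short.BYTES) {
            return "unknown"; // too short to carry a version
        }
        // Each side is wrapped separately, so each version is read from its own bytes.
        short version = ByteBuffer.wrap(bytes).getShort();
        return (version >= lowest && version <= highest) ? "v" + version : "unknown";
    }

    // Checks the key against the key range and the value against the value range.
    static String describe(byte[] key, byte[] value) {
        return "key=" + classify(key, KEY_LOWEST, KEY_HIGHEST)
                + ", value=" + classify(value, VALUE_LOWEST, VALUE_HIGHEST);
    }

    // Builds a payload that holds only a 2-byte version.
    static byte[] versionOnly(short version) {
        return ByteBuffer.allocate(Short.BYTES).putShort(version).array();
    }

    public static void main(String[] args) {
        System.out.println(describe(versionOnly((short) 0), versionOnly((short) 1)));
        System.out.println(describe(versionOnly((short) 0), null));
    }
}
```

Under the assumed ranges, a version of 1 is accepted for the value but rejected for the key, which is the case that reusing the key's version for the value would get wrong.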