[
https://issues.apache.org/jira/browse/KAFKA-18401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Justine Olshan resolved KAFKA-18401.
------------------------------------
Resolution: Fixed
> Transaction version 2 does not support commit transaction without records
> -------------------------------------------------------------------------
>
> Key: KAFKA-18401
> URL: https://issues.apache.org/jira/browse/KAFKA-18401
> Project: Kafka
> Issue Type: Bug
> Reporter: Kuan Po Tseng
> Assignee: Kuan Po Tseng
> Priority: Blocker
> Fix For: 4.0.0
>
>
> This issue was observed when implementing
> https://issues.apache.org/jira/browse/KAFKA-18206.
> In short, under transaction version 2, it doesn't support commit transaction
> without sending any records while transaction version 0 & 1 do support this
> kind of scenario.
> Commit transactions without sending any records is fine when using
> transaction versions 0 or 1 because the producer won't send EndTxnRequest to
> the broker [0]. However, with transaction version 2, the producer still sends
> an EndTxnRequest to the broker while in transaction coordinator, the txn
> state is still in EMPTY, resulting in an error from the broker.
> This issue can be reproduced with the test in below. I'm unsure if this
> behavior is expected. If it's not, one potential fix could be to follow the
> approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no
> partitions or offsets have been successfully added to the transaction. If
> this behavior is expected, we should document it and let user know this
> change.
> {code:java}
> @ClusterTests({
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 0)}),
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 1)}),
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 2)})
> })
> public void testProducerEndTransaction2(ClusterInstance cluster) throws
> InterruptedException {
> Map<String, Object> properties = new HashMap<>();
> properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
> properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
> properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> try (Producer<byte[], byte[]> producer1 =
> cluster.producer(properties)) {
> producer1.initTransactions();
> producer1.beginTransaction();
> producer1.commitTransaction(); // In TV_2, we'll get
> InvalidTxnStateException
> }
> }
> {code}
> Another test case, which is essentially the same as the previous one, starts
> with a transaction that includes records, and then proceeds to start the next
> transaction. When using transaction version 2, we encounter an error, but
> this time it's a different error from the one seen in the previous case.
> {code:java}
> @ClusterTests({
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 0)}),
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 1)}),
> @ClusterTest(brokers = 3, features = {
> @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
> 2)})
> })
> public void testProducerEndTransaction(ClusterInstance cluster) {
> Map<String, Object> properties = new HashMap<>();
> properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
> properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
> properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> try (Producer<byte[], byte[]> producer1 =
> cluster.producer(properties)) {
> producer1.initTransactions();
> producer1.beginTransaction();
> producer1.send(new ProducerRecord<>("test", "key".getBytes(),
> "value".getBytes()));
> producer1.commitTransaction();
> producer1.beginTransaction();
> producer1.commitTransaction(); // In TV_2, we'll get
> ProducerFencedException
> }
> }
> {code}
>
> [0]:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)