Kuan Po Tseng created KAFKA-18401:
-------------------------------------
Summary: Transaction version 2 does not support commit transaction
without records
Key: KAFKA-18401
URL: https://issues.apache.org/jira/browse/KAFKA-18401
Project: Kafka
Issue Type: Bug
Reporter: Kuan Po Tseng
Assignee: Kuan Po Tseng
This issue was observed when implementing
https://issues.apache.org/jira/browse/KAFKA-18206.
In short, transaction version 2 does not support committing a transaction
without sending any records, whereas transaction versions 0 and 1 do support
this scenario.
Committing a transaction without sending any records works with transaction
versions 0 and 1 because the producer does not send an EndTxnRequest to the
broker [0]. With transaction version 2, however, the producer still sends an
EndTxnRequest, while on the transaction coordinator the transaction state is
still EMPTY, so the broker returns an error.
This issue can be reproduced with the test below. I'm unsure whether this
behavior is expected. If it is not, one potential fix is to follow the
approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no
partitions or offsets have been successfully added to the transaction. If it
is expected, we should document it and let users know about the change.
{code:java}
@ClusterTests({
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
})
public void testProducerEndTransaction2(ClusterInstance cluster) throws InterruptedException {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.commitTransaction(); // In TV_2, we'll get InvalidTxnStateException
    }
}
{code}
A second test case is essentially the same, except that it first commits a
transaction that includes records and then begins and commits a second
transaction without records. With transaction version 2 this also fails, but
with a different error from the one seen in the previous case.
{code:java}
@ClusterTests({
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
    @ClusterTest(brokers = 3, features = {
        @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
})
public void testProducerEndTransaction(ClusterInstance cluster) {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
        producer1.commitTransaction();
        producer1.beginTransaction();
        producer1.commitTransaction(); // In TV_2, we'll get ProducerFencedException
    }
}
{code}
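The potential fix described above (not sending the EndTxnRequest when no partitions or offsets have been added) could look roughly like the following. This is only a toy model to illustrate the idea, not the actual TransactionManager code: the class EndTxnGuardSketch and all of its methods are hypothetical names made up for this sketch, and partitions are tracked as plain strings.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the client-side decision; NOT the real TransactionManager.
// All names here are hypothetical and exist only for illustration.
class EndTxnGuardSketch {
    // Partitions that were added to the current transaction.
    private final Set<String> partitionsInTxn = new HashSet<>();
    // Whether consumer offsets were added to the current transaction.
    private boolean offsetsAdded = false;

    void addPartition(String topicPartition) {
        partitionsInTxn.add(topicPartition);
    }

    void addOffsets() {
        offsetsAdded = true;
    }

    // True when nothing has been added, i.e. the coordinator would
    // still see the transaction as EMPTY.
    boolean isEmpty() {
        return partitionsInTxn.isEmpty() && !offsetsAdded;
    }

    // Assumed guard: only send EndTxnRequest when the transaction is
    // non-empty; otherwise complete the commit locally.
    boolean shouldSendEndTxn() {
        return !isEmpty();
    }

    // Clears per-transaction state once a commit or abort has finished.
    void reset() {
        partitionsInTxn.clear();
        offsetsAdded = false;
    }

    public static void main(String[] args) {
        EndTxnGuardSketch txn = new EndTxnGuardSketch();
        System.out.println("empty txn sends EndTxn: " + txn.shouldSendEndTxn());
        txn.addPartition("test-0");
        System.out.println("txn with a partition sends EndTxn: " + txn.shouldSendEndTxn());
        txn.reset();
        System.out.println("next empty txn sends EndTxn: " + txn.shouldSendEndTxn());
    }
}
```

With a guard like this, both test cases above would skip the EndTxnRequest for the empty transaction. Whether that is the right behavior for TV_2 is exactly the open question in this issue.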
[0]:
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]