[
https://issues.apache.org/jira/browse/KAFKA-18206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907678#comment-17907678
]
Kuan Po Tseng commented on KAFKA-18206:
---------------------------------------
During the investigation of the failed tests, I found that some tests (e.g.,
{{OffsetApiIntegrationTest.testResetSourceConnectorOffsetsExactlyOnceSupportEnabled}})
commit transactions without sending any records. This behavior is fine when
using transaction versions 0 or 1 because the producer won't send EndTxnRequest
to the broker [0]. However, with transaction version 2, the producer still
sends an EndTxnRequest to the broker while in transaction coordinator, the txn
state is still in EMPTY, resulting in an error from the broker.
This issue can be reproduced with the test in below. I'm unsure if this
behavior is expected. If it's not, one potential fix could be to follow the
approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no
partitions or offsets have been successfully added to the transaction. I'm
happy to work on this fix if it's acceptable.
{code:java}
@ClusterTests({
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
0)}),
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
1)}),
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2)})
})
public void testProducerEndTransaction2(ClusterInstance cluster) throws
InterruptedException {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
try (Producer<byte[], byte[]> producer1 = cluster.producer(properties))
{
producer1.initTransactions();
producer1.beginTransaction();
producer1.commitTransaction(); // In TV_2, we'll get
InvalidTxnStateException
}
}
{code}
Another test case, which is essentially the same as the previous one, starts
with a transaction that includes records, and then proceeds to start the next
transaction. When using transaction version 2, we encounter an error, but this
time it’s a different error from the one seen in the previous case.{code:java}
@ClusterTests({
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
0)}),
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
1)}),
@ClusterTest(brokers = 3, features = {
@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2)})
})
public void testProducerEndTransaction(ClusterInstance cluster) {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
try (Producer<byte[], byte[]> producer1 = cluster.producer(properties))
{
producer1.initTransactions();
producer1.beginTransaction();
producer1.send(new ProducerRecord<>("test", "key".getBytes(),
"value".getBytes()));
producer1.commitTransaction();
producer1.beginTransaction();
producer1.commitTransaction(); // In TV_2, we'll get
ProducerFencedException
}
}
{code}
[0]:
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]
> EmbeddedKafkaCluster must set features
> --------------------------------------
>
> Key: KAFKA-18206
> URL: https://issues.apache.org/jira/browse/KAFKA-18206
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: David Jacot
> Assignee: Kuan Po Tseng
> Priority: Blocker
> Fix For: 4.0.0
>
>
> The EmbeddedKafkaCluster classes respectively used by Streams and Connect
> relies on
> KafkaClusterTestKit. We just found out that they do not set the features at
> all. They should.
>
> The other integration tests rely on classes wrapping KafkaClusterTestKit and
> they take care of setting the features. This is not ideal. I wonder whether
> we could push that functionality to KafkaClusterTestKit.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)