brandboat commented on code in PR #18448:
URL: https://github.com/apache/kafka/pull/18448#discussion_r1919514635
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -85,6 +88,50 @@ class ProducerIntegrationTest {
} finally if (producer != null) producer.close()
}
+  @ClusterTests(Array(
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
+  ))
+  def testTransactionWithInvalidSend(cluster: ClusterInstance): Unit = {
+    val topic = new NewTopic("foobar", 1, 1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "1"))
+    val admin = cluster.admin()
+    var txnVersion: Short = 0
+    try {
+      txnVersion = Option(admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(Feature.TRANSACTION_VERSION))
+        .map(finalizedFeatures => finalizedFeatures.maxVersionLevel())
+        .getOrElse(0)
+      admin.createTopics(List(topic).asJava)
+    } finally if (admin != null) admin.close()
+
+    val properties = new util.HashMap[String, Object]
+    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
+    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+
+    val producer: Producer[Array[Byte], Array[Byte]] = cluster.producer(properties)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      assertInstanceOf(classOf[RecordTooLargeException],
+        assertThrows(classOf[ExecutionException],
+          () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic.name(), "key".getBytes, "value".getBytes)).get()).getCause)
+
+      val commitError = assertThrows(classOf[KafkaException], () => producer.commitTransaction()) // fails because the last send failed
+      assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause)
+
+      if (txnVersion == 2) {
Review Comment:
Sorry for the late reply. The comment in https://github.com/apache/kafka/pull/18448#discussion_r1916916687 was incorrect; apologies for the misleading input. The behavior I was referring to, where the EndTxnResponse is sent back first and the WriteTxnMarkersRequest follows, is actually the same for TV0 and TV1.
> I recreated the failure locally and it seems like for TV0 and TV1 we are unable to join the ioThread when trying to close the producer. With TV2, the thread seems to join fine. Were you observing this as well?
What I observed in TV0 and TV1 is that the producer continuously sends InitProducerId requests[[0]] because it keeps encountering Errors.CONCURRENT_TRANSACTIONS. Since this is a retriable error, the producer enters a retry loop, repeatedly sending InitProducerId requests to the broker. Because the last transaction remains incomplete, the whole process is stuck in an endless retry loop, and the test times out.
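For illustration, here is a minimal sketch of that retry behavior; `initProducerIdWithRetry` and `sendInitProducerId` are hypothetical stand-ins for this comment, not the actual TransactionManager code:
```
import org.apache.kafka.common.protocol.Errors

// Hypothetical stand-in for the client's request/poll cycle: a retriable
// error simply causes the same InitProducerId request to be sent again.
def initProducerIdWithRetry(sendInitProducerId: () => Errors, backoffMs: Long = 100L): Unit = {
  var error = sendInitProducerId()
  while (error == Errors.CONCURRENT_TRANSACTIONS) {
    // CONCURRENT_TRANSACTIONS is retriable, so the producer backs off and
    // retries; if the pending transaction never completes (its markers are
    // never written), this loop never terminates and the test times out.
    Thread.sleep(backoffMs)
    error = sendInitProducerId()
  }
  if (error != Errors.NONE) throw error.exception()
}
```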
The CONCURRENT_TRANSACTIONS error occurs because the WriteTxnMarkers write fails with a RecordTooLargeException, preventing the transaction state from correctly transitioning to CompleteAbort.
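As a rough model of the coordinator side (a sketch of my understanding, not the actual TransactionCoordinator code; `TxnState`, `afterMarkerWrite`, and `initProducerIdError` are made up for illustration):
```
import org.apache.kafka.common.protocol.Errors

// Simplified model of the relevant transaction states.
sealed trait TxnState
case object PrepareAbort extends TxnState
case object CompleteAbort extends TxnState

// The transition to CompleteAbort happens only after the abort markers are
// successfully written; a RecordTooLargeException during the marker write
// leaves the transaction stuck in PrepareAbort.
def afterMarkerWrite(state: TxnState, markerWriteSucceeded: Boolean): TxnState = state match {
  case PrepareAbort if markerWriteSucceeded => CompleteAbort
  case other => other
}

// While the transition is still pending, InitProducerId for the same
// transactional id is answered with the retriable error seen above.
def initProducerIdError(state: TxnState): Errors = state match {
  case PrepareAbort => Errors.CONCURRENT_TRANSACTIONS
  case _ => Errors.NONE // simplified; the real coordinator handles more cases
}
```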
Although TV2 passes in this scenario, we can still reproduce the CONCURRENT_TRANSACTIONS error by adding the following lines after abortTransaction(), because the WriteTxnMarkers write failed in the previous abort attempt:
```
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
producer.commitTransaction()
```
I'm not sure if this is something we need to be aware of...
[0]: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1445