brandboat commented on code in PR #18448:
URL: https://github.com/apache/kafka/pull/18448#discussion_r1916916687


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -85,6 +88,50 @@ class ProducerIntegrationTest {
     } finally if (producer != null) producer.close()
   }
 
+  @ClusterTests(Array(
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
0))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
1))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
2))),
+  ))
+  def testTransactionWithInvalidSend(cluster: ClusterInstance): Unit = {
+    val topic = new NewTopic("foobar", 1, 
1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
 "1"))
+    val admin = cluster.admin()
+    var txnVersion: Short = 0
+    try {
+      txnVersion = 
Option(admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(Feature.TRANSACTION_VERSION))
+        .map(finalizedFeatures => finalizedFeatures.maxVersionLevel())
+        .getOrElse(0)
+      admin.createTopics(List(topic).asJava)
+    } finally if (admin != null) admin.close()
+
+    val properties = new util.HashMap[String, Object]
+    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
+    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+
+    val producer: Producer[Array[Byte], Array[Byte]] = 
cluster.producer(properties)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      assertInstanceOf(classOf[RecordTooLargeException],
+        assertThrows(classOf[ExecutionException],
+          () => producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic.name(), "key".getBytes, "value".getBytes)).get()).getCause)
+
+      val commitError = assertThrows(classOf[KafkaException], () => 
producer.commitTransaction()) // fail due to last send failed
+      assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause)
+
+      if (txnVersion == 2) {

Review Comment:
   > I'm still working through why the TV2 case was working. Not quite sure yet.
   
   ~~Looks like in TV_2 we return the response (EndTxnResponse) first, and only then 
send the WriteTxnMarkerRequest. I think that's why the TV2 abortTransaction passes even 
if WriteTxnMarker fails (due to max.message.bytes=1; like you said, the marker 
takes up some bytes)~~ (Update: this behavior is the same as in TV_0 and TV_1)
   
   
https://github.com/apache/kafka/blob/32dbbe6a1f3ef39318c796bdc0a3b8da2c7060ad/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L943-L945
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to