This is an automated email from the ASF dual-hosted git repository.

edocomar pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 0e7134c1059 KAFKA-16570 FenceProducers API returns "unexpected error" 
when successful (#16229)
0e7134c1059 is described below

commit 0e7134c1059029d821d09364418031a47842c73e
Author: Edoardo Comar <[email protected]>
AuthorDate: Wed Jun 12 17:07:33 2024 +0100

    KAFKA-16570 FenceProducers API returns "unexpected error" when successful 
(#16229)
    
    KAFKA-16570 FenceProducers API returns "unexpected error" when successful
    
    * Client handling of ConcurrentTransactionsException as retriable
    * Unit test
    * Integration test
    
    Reviewers: Chris Egerton <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../admin/internals/FenceProducersHandler.java     |   4 +
 .../admin/internals/FenceProducersHandlerTest.java |   1 +
 .../admin/AdminFenceProducersIntegrationTest.scala | 141 +++++++++++++++++++++
 3 files changed, 146 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
index 9a12bc19596..826db904a36 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -134,6 +134,10 @@ public class FenceProducersHandler extends 
AdminApiHandler.Unbatched<Coordinator
                                 "coordinator is still in the process of 
loading state. Will retry",
                         transactionalIdKey.idValue);
                 return ApiResult.empty();
+            case CONCURRENT_TRANSACTIONS:
+                log.debug("InitProducerId request for transactionalId `{}` 
failed because of " +
+                                "a concurrent transaction. Will retry", 
transactionalIdKey.idValue);
+                return ApiResult.empty();
 
             case NOT_COORDINATOR:
             case COORDINATOR_NOT_AVAILABLE:
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
index 9665bd0bdf1..62258289a90 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
@@ -94,6 +94,7 @@ public class FenceProducersHandlerTest {
         assertRetriableError(handler, transactionalId, 
Errors.COORDINATOR_LOAD_IN_PROGRESS);
         assertUnmappedKey(handler, transactionalId, Errors.NOT_COORDINATOR);
         assertUnmappedKey(handler, transactionalId, 
Errors.COORDINATOR_NOT_AVAILABLE);
+        assertRetriableError(handler, transactionalId, 
Errors.CONCURRENT_TRANSACTIONS);
     }
 
     private void assertFatalError(
diff --git 
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
new file mode 100644
index 00000000000..fdddbd1568e
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException, 
ProducerFencedException}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
+import org.junit.jupiter.api.Assertions.{assertInstanceOf, assertThrows, 
assertTrue, fail}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.concurrent.ExecutionException
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+
+class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
+  override def brokerCount = 1
+
+  private val topicName = "mytopic"
+  private val txnId = "mytxnid"
+  private val record = new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
null, new Array[Byte](1))
+
+  private var adminClient: Admin = _
+  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+
+    val producerProps = new Properties
+    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
+    producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000")
+    producer = createProducer(configOverrides = producerProps)
+    adminClient = createAdminClient()
+    createTopic(topicName)
+  }
+
+  def overridingProps(): Properties = {
+    val props = new Properties()
+    props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
false.toString)
+    // Set a smaller value for the number of partitions for speed
+    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
+    
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
1.toString)
+    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
1.toString)
+    
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "2000")
+    props
+  }
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach(p => p.putAll(overridingProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(overridingProps())
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "AdminFenceProducersIntegrationTest")
+    Utils.closeQuietly(producer, "AdminFenceProducersIntegrationTest")
+    super.tearDown()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFenceAfterProducerCommit(quorum: String): Unit = {
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(record).get()
+    producer.commitTransaction()
+
+    adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
+
+    producer.beginTransaction()
+    try {
+        producer.send(record).get()
+        fail("expected ProducerFencedException")
+    } catch {
+      case _: ProducerFencedException => //ok
+      case ee: ExecutionException =>
+        assertInstanceOf(classOf[ProducerFencedException], ee.getCause) //ok
+      case e: Exception =>
+        throw e
+    }
+
+    assertThrows(classOf[ProducerFencedException], () => 
producer.commitTransaction())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFenceBeforeProducerCommit(quorum: String): Unit = {
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(record).get()
+
+    adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
+
+    try {
+      producer.send(record).get()
+      fail("expected Exception")
+    } catch {
+      case ee: ExecutionException =>
+        assertTrue(ee.getCause.isInstanceOf[ProducerFencedException] ||
+                   ee.getCause.isInstanceOf[InvalidProducerEpochException],
+                   "Unexpected ExecutionException cause " + ee.getCause)
+      case e: Exception =>
+        throw e
+    }
+
+    try {
+      producer.commitTransaction()
+      fail("expected Exception")
+    } catch {
+      case _: ProducerFencedException =>
+      case _: InvalidProducerEpochException =>
+      case e: Exception =>
+        throw e
+    }
+  }
+}
+

Reply via email to