This is an automated email from the ASF dual-hosted git repository.
edocomar pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 0e7134c1059 KAFKA-16570 FenceProducers API returns "unexpected error"
when succes… (#16229)
0e7134c1059 is described below
commit 0e7134c1059029d821d09364418031a47842c73e
Author: Edoardo Comar <[email protected]>
AuthorDate: Wed Jun 12 17:07:33 2024 +0100
KAFKA-16570 FenceProducers API returns "unexpected error" when succes…
(#16229)
KAFKA-16570 FenceProducers API returns "unexpected error" when successful
* Client handling of ConcurrentTransactionsException as retriable
* Unit test
* Integration test
Reviewers: Chris Egerton <[email protected]>, Justine Olshan
<[email protected]>
---
.../admin/internals/FenceProducersHandler.java | 4 +
.../admin/internals/FenceProducersHandlerTest.java | 1 +
.../admin/AdminFenceProducersIntegrationTest.scala | 141 +++++++++++++++++++++
3 files changed, 146 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
index 9a12bc19596..826db904a36 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -134,6 +134,10 @@ public class FenceProducersHandler extends
AdminApiHandler.Unbatched<Coordinator
"coordinator is still in the process of
loading state. Will retry",
transactionalIdKey.idValue);
return ApiResult.empty();
+ case CONCURRENT_TRANSACTIONS:
+ log.debug("InitProducerId request for transactionalId `{}`
failed because of " +
+ "a concurrent transaction. Will retry",
transactionalIdKey.idValue);
+ return ApiResult.empty();
case NOT_COORDINATOR:
case COORDINATOR_NOT_AVAILABLE:
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
index 9665bd0bdf1..62258289a90 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
@@ -94,6 +94,7 @@ public class FenceProducersHandlerTest {
assertRetriableError(handler, transactionalId,
Errors.COORDINATOR_LOAD_IN_PROGRESS);
assertUnmappedKey(handler, transactionalId, Errors.NOT_COORDINATOR);
assertUnmappedKey(handler, transactionalId,
Errors.COORDINATOR_NOT_AVAILABLE);
+ assertRetriableError(handler, transactionalId,
Errors.CONCURRENT_TRANSACTIONS);
}
private void assertFatalError(
diff --git
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
new file mode 100644
index 00000000000..fdddbd1568e
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException,
ProducerFencedException}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
+import org.junit.jupiter.api.Assertions.{assertInstanceOf, assertThrows,
assertTrue, fail}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.concurrent.ExecutionException
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+
+class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
+ override def brokerCount = 1
+
+ private val topicName = "mytopic"
+ private val txnId = "mytxnid"
+ private val record = new ProducerRecord[Array[Byte], Array[Byte]](topicName,
null, new Array[Byte](1))
+
+ private var adminClient: Admin = _
+ private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
+
+ val producerProps = new Properties
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
+ producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000")
+ producer = createProducer(configOverrides = producerProps)
+ adminClient = createAdminClient()
+ createTopic(topicName)
+ }
+
+ def overridingProps(): Properties = {
+ val props = new Properties()
+ props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
+ // Set a smaller value for the number of partitions for speed
+ props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
+
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
+ props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
+
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"2000")
+ props
+ }
+
+ override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+ props.foreach(p => p.putAll(overridingProps()))
+ }
+
+ override protected def kraftControllerConfigs(): Seq[Properties] = {
+ Seq(overridingProps())
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ Utils.closeQuietly(adminClient, "AdminFenceProducersIntegrationTest")
+ Utils.closeQuietly(producer, "AdminFenceProducersIntegrationTest")
+ super.tearDown()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFenceAfterProducerCommit(quorum: String): Unit = {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(record).get()
+ producer.commitTransaction()
+
+ adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
+
+ producer.beginTransaction()
+ try {
+ producer.send(record).get()
+ fail("expected ProducerFencedException")
+ } catch {
+ case _: ProducerFencedException => //ok
+ case ee: ExecutionException =>
+ assertInstanceOf(classOf[ProducerFencedException], ee.getCause) //ok
+ case e: Exception =>
+ throw e
+ }
+
+ assertThrows(classOf[ProducerFencedException], () =>
producer.commitTransaction())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFenceBeforeProducerCommit(quorum: String): Unit = {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(record).get()
+
+ adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
+
+ try {
+ producer.send(record).get()
+ fail("expected Exception")
+ } catch {
+ case ee: ExecutionException =>
+ assertTrue(ee.getCause.isInstanceOf[ProducerFencedException] ||
+ ee.getCause.isInstanceOf[InvalidProducerEpochException],
+ "Unexpected ExecutionException cause " + ee.getCause)
+ case e: Exception =>
+ throw e
+ }
+
+ try {
+ producer.commitTransaction()
+ fail("expected Exception")
+ } catch {
+ case _: ProducerFencedException =>
+ case _: InvalidProducerEpochException =>
+ case e: Exception =>
+ throw e
+ }
+ }
+}
+