This is an automated email from the ASF dual-hosted git repository.

payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2daf5be27ae KAFKA-16024: 
SaslPlaintextConsumerTest#testCoordinatorFailover is flaky (#20774)
2daf5be27ae is described below

commit 2daf5be27ae81eda9955a9d8bdfcf15499d955cc
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 29 23:04:13 2025 +0800

    KAFKA-16024: SaslPlaintextConsumerTest#testCoordinatorFailover is flaky 
(#20774)
    
    - `broker.session.timeout.ms` defaults to 9s. When a broker goes
    offline, the group coordinator may take up to this long to be
    re-elected.
    - The commit callback retry timeout is currently 10 seconds, which
    leaves very little buffer. If the metadata hasn’t refreshed yet, the
    consumer may still send an OFFSET_COMMIT request to the offline
    coordinator, leading to transient failures.
    
    This patch enable `controlled.shutdown.enable` to allow the broker to
    notify the  controller before shutting down.  This speeds up the test by
    triggering  an immediate failover instead of waiting for the  broker
    session timeout  (default: 9s) to expire.
    
    Reviewers: TaiJuWu <[email protected]>, PoAn Yang <[email protected]>
---
 .../integration/kafka/api/BaseConsumerTest.scala    | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index adfb657b776..b0fce6ab36a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -22,7 +22,9 @@ import org.apache.kafka.clients.producer.{KafkaProducer, 
ProducerConfig}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, 
PartitionInfo}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.serialization.{Deserializer, Serializer}
+import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -36,6 +38,25 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
+  private var currentTestName: String = _
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    currentTestName = testInfo.getTestMethod.get().getName
+    super.setUp(testInfo)
+  }
+
+  override protected def brokerPropertyOverrides(properties: Properties): Unit 
= {
+    super.brokerPropertyOverrides(properties)
+
+    if (currentTestName != null && 
currentTestName.equals("testCoordinatorFailover")) {
+      // Enable controlled shutdown to allow the broker to notify the 
controller before shutting down.
+      // This speeds up the test by triggering an immediate failover instead 
of waiting for the
+      // broker session timeout (default: 9s) to expire.
+      properties.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
"true")
+    }
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testSimpleConsumption(groupProtocol: String): Unit = {

Reply via email to