This is an automated email from the ASF dual-hosted git repository.
payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2daf5be27ae KAFKA-16024:
SaslPlaintextConsumerTest#testCoordinatorFailover is flaky (#20774)
2daf5be27ae is described below
commit 2daf5be27ae81eda9955a9d8bdfcf15499d955cc
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 29 23:04:13 2025 +0800
KAFKA-16024: SaslPlaintextConsumerTest#testCoordinatorFailover is flaky
(#20774)
- `broker.session.timeout.ms` defaults to 9s. When a broker goes
offline, the group coordinator may take up to this long to be
re-elected.
- The commit callback retry timeout is currently 10 seconds, which
leaves very little buffer. If the metadata hasn’t refreshed yet, the
consumer may still send an OFFSET_COMMIT request to the offline
coordinator, leading to transient failures.
This patch enable `controlled.shutdown.enable` to allow the broker to
notify the controller before shutting down. This speeds up the test by
triggering an immediate failover instead of waiting for the broker
session timeout (default: 9s) to expire.
Reviewers: TaiJuWu <[email protected]>, PoAn Yang <[email protected]>
---
.../integration/kafka/api/BaseConsumerTest.scala | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index adfb657b776..b0fce6ab36a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -22,7 +22,9 @@ import org.apache.kafka.clients.producer.{KafkaProducer,
ProducerConfig}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
PartitionInfo}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
+import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -36,6 +38,25 @@ import scala.collection.Seq
*/
abstract class BaseConsumerTest extends AbstractConsumerTest {
+ private var currentTestName: String = _
+
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ currentTestName = testInfo.getTestMethod.get().getName
+ super.setUp(testInfo)
+ }
+
+ override protected def brokerPropertyOverrides(properties: Properties): Unit
= {
+ super.brokerPropertyOverrides(properties)
+
+ if (currentTestName != null &&
currentTestName.equals("testCoordinatorFailover")) {
+ // Enable controlled shutdown to allow the broker to notify the
controller before shutting down.
+ // This speeds up the test by triggering an immediate failover instead
of waiting for the
+ // broker session timeout (default: 9s) to expire.
+ properties.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
"true")
+ }
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testSimpleConsumption(groupProtocol: String): Unit = {