This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1547204baa2 KAFKA-18914 Migrate ConsumerRebootstrapTest to use new
test infra (#19154)
1547204baa2 is described below
commit 1547204baa2f4cf6ee55b0d7ab1b508a9e6d09e0
Author: ClarkChen <[email protected]>
AuthorDate: Wed Mar 26 01:53:42 2025 +0800
KAFKA-18914 Migrate ConsumerRebootstrapTest to use new test infra (#19154)
Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.
The PR changed three things.
* Migrated `ConsumerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed the `RebootstrapTest.scala`.
Default `ConsumerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the consumer with enabled rebootstrap

The test case for the consumer with disabled rebootstrap

Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/ClientRebootstrapTest.java | 133 ++++++++++++++++++-
.../kafka/api/ConsumerRebootstrapTest.scala | 146 ---------------------
.../integration/kafka/api/RebootstrapTest.scala | 69 ----------
3 files changed, 129 insertions(+), 219 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
index 081995a6734..2f11e13377d 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
@@ -17,9 +17,14 @@
package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
@@ -38,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ClientRebootstrapTest {
private static final String TOPIC = "topic";
+ private static final int PARTITIONS = 1;
private static final int REPLICAS = 2;
@ClusterTest(
@@ -55,7 +61,7 @@ public class ClientRebootstrapTest {
clusterInstance.shutdownBroker(broker0);
try (var admin = clusterInstance.admin()) {
- admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
REPLICAS)));
+ admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short)
REPLICAS)));
// Only the broker 1 is available for the admin client during the
bootstrap.
assertDoesNotThrow(() -> admin.listTopics().names().get(timeout,
TimeUnit.SECONDS).contains(TOPIC));
@@ -84,7 +90,7 @@ public class ClientRebootstrapTest {
clusterInstance.shutdownBroker(broker0);
var admin =
clusterInstance.admin(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"none"));
- admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
+ admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short)
REPLICAS)));
// Only the broker 1 is available for the admin client during the
bootstrap.
assertDoesNotThrow(() -> admin.listTopics().names().get(60,
TimeUnit.SECONDS).contains(TOPIC));
@@ -109,7 +115,7 @@ public class ClientRebootstrapTest {
)
public void testProducerRebootstrap(ClusterInstance clusterInstance)
throws ExecutionException, InterruptedException {
try (var admin = clusterInstance.admin()) {
- admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
REPLICAS)));
+ admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short)
REPLICAS)));
}
var broker0 = 0;
@@ -144,7 +150,7 @@ public class ClientRebootstrapTest {
)
public void testProducerRebootstrapDisabled(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
try (var admin = clusterInstance.admin()) {
- admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
REPLICAS)));
+ admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short)
REPLICAS)));
}
var broker0 = 0;
@@ -168,4 +174,123 @@ public class ClientRebootstrapTest {
// Since the brokers cached during the bootstrap are offline, the
producer needs to wait the default timeout for other threads.
producer.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
+
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, 0));
+
+ try (var producer =
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
"value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll
data.");
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer =
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
"value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll
data.");
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testClassicConsumerRebootstrap(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testConsumerRebootstrap(ClusterInstance clusterInstance)
throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ public void consumerRebootstrapDisabled(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
+
+ var broker0 = 0;
+ var broker1 = 1;
+ var tp = new TopicPartition(TOPIC, 0);
+
+ try (var producer =
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
"value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer = clusterInstance.consumer(Map.of(
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
+ )) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(List.of(tp));
+ consumer.seekToBeginning(List.of(tp));
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll
data.");
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer =
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
"value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ assertEquals(0, consumer.poll(Duration.ofMillis(100)).count());
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+ }
+ )
+ public void testClassicConsumerRebootstrapDisabled(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+ }
+ )
+ public void testConsumerRebootstrapDisabled(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CONSUMER);
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
deleted file mode 100644
index ea9345f7265..00000000000
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import kafka.api.ConsumerRebootstrapTest._
-import
kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersAll
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.time.Duration
-import java.util.{Collections, stream}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.TimeoutException
-
-class ConsumerRebootstrapTest extends RebootstrapTest {
- @ParameterizedTest(name = RebootstrapTestName)
- @MethodSource(Array("rebootstrapTestParams"))
- def testRebootstrap(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
- sendRecords(10, 0)
-
- TestUtils.waitUntilTrue(
- () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset ==
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
- "Timeout waiting for records to be replicated"
- )
-
- server1.shutdown()
- server1.awaitShutdown()
-
- val consumer = createConsumer(configOverrides =
clientOverrides(useRebootstrapTriggerMs))
-
- // Only the server 0 is available for the consumer during the bootstrap.
- consumer.assign(Collections.singleton(tp))
-
- consumeAndVerifyRecords(consumer, 10, 0)
-
- // Bring back the server 1 and shut down 0.
- server1.startup()
-
- TestUtils.waitUntilTrue(
- () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset ==
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
- "Timeout waiting for records to be replicated"
- )
-
- server0.shutdown()
- server0.awaitShutdown()
- sendRecords(10, 10)
-
- // The server 0, originally cached during the bootstrap, is offline.
- // However, the server 1 from the bootstrap list is online.
- // Should be able to consume records.
- consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10,
startingTimestamp = 10)
-
- // Bring back the server 0 and shut down 1.
- server0.startup()
-
- TestUtils.waitUntilTrue(
- () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset ==
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
- "Timeout waiting for records to be replicated"
- )
-
- server1.shutdown()
- server1.awaitShutdown()
- sendRecords(10, 20)
-
- // The same situation, but the server 1 has gone and server 0 is back.
- consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20,
startingTimestamp = 20)
- }
-
- @Disabled
- @ParameterizedTest(name = RebootstrapTestName)
- @MethodSource(Array("rebootstrapTestParams"))
- def testRebootstrapDisabled(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
- server1.shutdown()
- server1.awaitShutdown()
-
- val configOverrides = clientOverrides(useRebootstrapTriggerMs)
- configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"none")
- if (useRebootstrapTriggerMs)
-
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"1000")
-
- val producer = createProducer(configOverrides = configOverrides)
- val consumer = createConsumer(configOverrides = configOverrides)
- val adminClient = createAdminClient(configOverrides = configOverrides)
-
- // Only the server 0 is available during the bootstrap.
- val recordMetadata0 = producer.send(new ProducerRecord(topic, part, 0L,
"key 0".getBytes, "value 0".getBytes)).get(15, TimeUnit.SECONDS)
- assertEquals(0, recordMetadata0.offset())
- adminClient.listTopics().names().get(15, TimeUnit.SECONDS)
- consumer.assign(Collections.singleton(tp))
- consumeAndVerifyRecords(consumer, 1, 0)
-
- server0.shutdown()
- server0.awaitShutdown()
- server1.startup()
-
- assertThrows(classOf[TimeoutException], () => producer.send(new
ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get(5,
TimeUnit.SECONDS))
- assertThrows(classOf[TimeoutException], () =>
adminClient.listTopics().names().get(5, TimeUnit.SECONDS))
-
- val producer2 = createProducer(configOverrides = configOverrides)
- producer2.send(new ProducerRecord(topic, part, 1L, "key 1".getBytes,
"value 1".getBytes)).get(15, TimeUnit.SECONDS)
- assertEquals(0, consumer.poll(Duration.ofSeconds(5)).count)
- }
-
- private def sendRecords(numRecords: Int, from: Int): Unit = {
- val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
- (from until (numRecords + from)).foreach { i =>
- val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong,
s"key $i".getBytes, s"value $i".getBytes)
- producer.send(record)
- }
- producer.flush()
- producer.close()
- }
-}
-
-object ConsumerRebootstrapTest {
-
- final val RebootstrapTestName =
s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
- def rebootstrapTestParams: stream.Stream[Arguments] = {
- getTestQuorumAndGroupProtocolParametersAll
- .flatMap { baseArgs =>
- stream.Stream.of(
- Arguments.of((baseArgs.get :+ true):_*),
- Arguments.of((baseArgs.get :+ false):_*)
- )
- }
- }
-}
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
deleted file mode 100644
index 2d84284cd6b..00000000000
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import kafka.server.{KafkaBroker, KafkaConfig}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-
-import java.util.Properties
-
-abstract class RebootstrapTest extends AbstractConsumerTest {
- override def brokerCount: Int = 2
-
- def server0: KafkaBroker = serverForId(0).get
- def server1: KafkaBroker = serverForId(1).get
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.doSetup(testInfo, createOffsetsTopic = true)
-
- // Enable unclean leader election for the test topic
- val topicProps = new Properties
- topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
-
- // create the test topic with all the brokers as replicas
- createTopic(topic, 2, brokerCount, adminClientConfig =
this.adminClientConfig, topicConfig = topicProps)
- }
-
- override def generateConfigs: Seq[KafkaConfig] = {
- val overridingProps = new Properties()
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
brokerCount.toString)
-
- // In this test, fixed ports are necessary, because brokers must have the
- // same port after the restart.
- FixedPortTestUtils.createBrokerConfigs(brokerCount,
enableControlledShutdown = false)
- .map(KafkaConfig.fromProps(_, overridingProps))
- }
-
- def clientOverrides(useRebootstrapTriggerMs: Boolean): Properties = {
- val overrides = new Properties()
- if (useRebootstrapTriggerMs) {
-
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"5000")
- } else {
-
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"3600000")
-
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"5000")
-
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"5000")
- overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
- overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000")
- }
- overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap")
- overrides
- }
-}