This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c01723340ff MINOR: Migrate EligibleLeaderReplicasIntegrationTest to
use new test infra (#20199)
c01723340ff is described below
commit c01723340ffe14f61379980495c08f7f3f7dec25
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Sat Aug 23 19:35:20 2025 +0200
MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra
(#20199)
**Changes**: Use ClusterTest to rewrite
EligibleLeaderReplicasIntegrationTest.
**Validation**: Run the test 50 times locally with consistent success.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../EligibleLeaderReplicasIntegrationTest.java | 458 ---------------------
.../EligibleLeaderReplicasIntegrationTest.java | 384 +++++++++++++++++
2 files changed, 384 insertions(+), 458 deletions(-)
diff --git
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
deleted file mode 100644
index 28c12cf6bce..00000000000
---
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.integration;
-import kafka.integration.KafkaServerTestHarness;
-import kafka.server.KafkaBroker;
-import kafka.server.KafkaConfig;
-import kafka.utils.Logging;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AlterConfigOp;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.FeatureUpdate;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartitionInfo;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-import scala.collection.mutable.HashMap;
-
-import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class EligibleLeaderReplicasIntegrationTest extends
KafkaServerTestHarness implements Logging {
- private String bootstrapServer;
- private String testTopicName;
- private Admin adminClient;
-
- @Override
- public MetadataVersion metadataVersion() {
- return MetadataVersion.IBP_4_0_IV1;
- }
-
- @Override
- public Seq<KafkaConfig> generateConfigs() {
- List<Properties> brokerConfigs = new ArrayList<>();
-
brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(
- 5, // The tests require 4 brokers to host the partition. However,
we need the 5th broker to handle the admin client requests.
- true,
- true,
- scala.Option.<SecurityProtocol>empty(),
- scala.Option.<File>empty(),
- scala.Option.<Properties>empty(),
- true,
- false,
- false,
- false,
- new HashMap<>(),
- 1,
- false,
- 1,
- (short) 4,
- 0,
- false
- )));
- List<KafkaConfig> configs = new ArrayList<>();
- for (Properties props : brokerConfigs) {
- configs.add(KafkaConfig.fromProps(props));
- }
- return JavaConverters.asScalaBuffer(configs).toSeq();
- }
-
- @BeforeEach
- @Override
- public void setUp(TestInfo info) {
- super.setUp(info);
- // create adminClient
- Properties props = new Properties();
- bootstrapServer = bootstrapServers(listenerName());
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- adminClient = Admin.create(props);
- adminClient.updateFeatures(
- Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
- new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
- new UpdateFeaturesOptions()
- );
- testTopicName = String.format("%s-%s",
info.getTestMethod().get().getName(), "ELR-test");
- }
-
- @AfterEach
- public void close() throws Exception {
- if (adminClient != null) adminClient.close();
- }
-
- @Test
- public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws
ExecutionException, InterruptedException {
- adminClient.createTopics(
- List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
- TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
-
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Collection<AlterConfigOp> ops = new ArrayList<>();
- ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
- Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
- // alter configs on target cluster
- adminClient.incrementalAlterConfigs(configOps).all().get();
- Producer producer = null;
- Consumer consumer = null;
- try {
- TopicDescription testTopicDescription =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName);
- TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
- List<Node> initialReplicas = topicPartitionInfo.replicas();
- assertEquals(4, topicPartitionInfo.isr().size());
- assertEquals(0, topicPartitionInfo.elr().size());
- assertEquals(0, topicPartitionInfo.lastKnownElr().size());
-
- Properties producerProps = new Properties();
-
producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
-
producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- // Use Ack=1 for the producer.
- producerProps.put(ProducerConfig.ACKS_CONFIG, "1");
- producer = new KafkaProducer(producerProps);
-
- Properties consumerProps = new Properties();
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10");
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
-
consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
-
consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Set.of(testTopicName));
-
- producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
- waitUntilOneMessageIsConsumed(consumer);
-
- killBroker(initialReplicas.get(0).id());
- killBroker(initialReplicas.get(1).id());
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 2 && elrSize == 1;
- });
-
- // Now the partition is under min ISR. HWM should not advance.
- producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
- Thread.sleep(100);
- assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count());
-
- // Restore the min ISR and the previous log should be visible.
- startBroker(initialReplicas.get(1).id());
- startBroker(initialReplicas.get(0).id());
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 4 && elrSize == 0;
- });
-
- waitUntilOneMessageIsConsumed(consumer);
- } finally {
- restartDeadBrokers(false);
- if (consumer != null) consumer.close();
- if (producer != null) producer.close();
- }
- }
-
- void waitUntilOneMessageIsConsumed(Consumer consumer) {
- TestUtils.waitUntilTrue(
- () -> {
- try {
- ConsumerRecords record =
consumer.poll(Duration.ofMillis(100L));
- return record.count() >= 1;
- } catch (Exception e) {
- return false;
- }
- },
- () -> "fail to consume messages",
- DEFAULT_MAX_WAIT_MS, 100L
- );
- }
-
- @Test
- public void testElrMemberCanBeElected() throws ExecutionException,
InterruptedException {
- adminClient.createTopics(
- List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
- TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
-
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Collection<AlterConfigOp> ops = new ArrayList<>();
- ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
- Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
- // alter configs on target cluster
- adminClient.incrementalAlterConfigs(configOps).all().get();
-
- try {
- TopicDescription testTopicDescription =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName);
- TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
- List<Node> initialReplicas = topicPartitionInfo.replicas();
- assertEquals(4, topicPartitionInfo.isr().size());
- assertEquals(0, topicPartitionInfo.elr().size());
- assertEquals(0, topicPartitionInfo.lastKnownElr().size());
-
- killBroker(initialReplicas.get(0).id());
- killBroker(initialReplicas.get(1).id());
- killBroker(initialReplicas.get(2).id());
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 1 && elrSize == 2;
- });
-
- killBroker(initialReplicas.get(3).id());
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 0 && elrSize == 3;
- });
-
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertEquals(1, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
- int expectLastKnownLeader = initialReplicas.get(3).id();
- assertEquals(expectLastKnownLeader,
topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString());
-
- // At this point, all the replicas are failed and the last know
leader is No.3 and 3 members in the ELR.
- // Restart one broker of the ELR and it should be the leader.
-
- int expectLeader = topicPartitionInfo.elr().stream()
- .filter(node -> node.id() !=
expectLastKnownLeader).toList().get(0).id();
-
- startBroker(expectLeader);
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 1 && elrSize == 2;
- });
-
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertEquals(0, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
- assertEquals(expectLeader, topicPartitionInfo.leader().id(),
topicPartitionInfo.toString());
-
- // Start another 2 brokers and the ELR fields should be cleaned.
- topicPartitionInfo.replicas().stream().filter(node -> node.id() !=
expectLeader).limit(2)
- .forEach(node -> startBroker(node.id()));
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 3 && elrSize == 0;
- });
-
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertEquals(0, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
- assertEquals(expectLeader, topicPartitionInfo.leader().id(),
topicPartitionInfo.toString());
- } finally {
- restartDeadBrokers(false);
- }
- }
-
- @Test
- public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws
ExecutionException, InterruptedException {
- adminClient.createTopics(
- List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
- TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
-
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Collection<AlterConfigOp> ops = new ArrayList<>();
- ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
- Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
- // alter configs on target cluster
- adminClient.incrementalAlterConfigs(configOps).all().get();
-
- try {
- TopicDescription testTopicDescription =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName);
- TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
- List<Node> initialReplicas = topicPartitionInfo.replicas();
- assertEquals(4, topicPartitionInfo.isr().size());
- assertEquals(0, topicPartitionInfo.elr().size());
- assertEquals(0, topicPartitionInfo.lastKnownElr().size());
-
- killBroker(initialReplicas.get(0).id());
- killBroker(initialReplicas.get(1).id());
- killBroker(initialReplicas.get(2).id());
- killBroker(initialReplicas.get(3).id());
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 0 && elrSize == 3;
- });
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
-
- int brokerToBeUncleanShutdown =
topicPartitionInfo.elr().get(0).id();
- KafkaBroker broker = brokers().find(b -> {
- return b.config().brokerId() == brokerToBeUncleanShutdown;
- }).get();
- Seq<File> dirs = broker.logManager().liveLogDirs();
- assertEquals(1, dirs.size());
- CleanShutdownFileHandler handler = new
CleanShutdownFileHandler(dirs.apply(0).toString());
- assertTrue(handler.exists());
- assertDoesNotThrow(() -> handler.delete());
-
- // After remove the clean shutdown file, the broker should report
unclean shutdown during restart.
- startBroker(brokerToBeUncleanShutdown);
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 0 && elrSize == 2;
- });
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertNull(topicPartitionInfo.leader());
- assertEquals(1, topicPartitionInfo.lastKnownElr().size());
- } finally {
- restartDeadBrokers(false);
- }
- }
-
- /*
- This test is only valid for KIP-966 part 1. When the unclean recovery
is implemented, it should be removed.
- */
- @Test
- public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws
ExecutionException, InterruptedException {
- adminClient.createTopics(
- List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
- TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
-
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Collection<AlterConfigOp> ops = new ArrayList<>();
- ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
- Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
- // alter configs on target cluster
- adminClient.incrementalAlterConfigs(configOps).all().get();
-
- try {
- TopicDescription testTopicDescription =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName);
- TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
- List<Node> initialReplicas = topicPartitionInfo.replicas();
- assertEquals(4, topicPartitionInfo.isr().size());
- assertEquals(0, topicPartitionInfo.elr().size());
- assertEquals(0, topicPartitionInfo.lastKnownElr().size());
-
- killBroker(initialReplicas.get(0).id());
- killBroker(initialReplicas.get(1).id());
- killBroker(initialReplicas.get(2).id());
- killBroker(initialReplicas.get(3).id());
-
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 0 && elrSize == 3;
- });
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- int lastKnownLeader =
topicPartitionInfo.lastKnownElr().get(0).id();
-
- Set<Integer> initialReplicaSet = initialReplicas.stream().map(node
-> node.id()).collect(Collectors.toSet());
- brokers().foreach(broker -> {
- if (initialReplicaSet.contains(broker.config().brokerId())) {
- Seq<File> dirs = broker.logManager().liveLogDirs();
- assertEquals(1, dirs.size());
- CleanShutdownFileHandler handler = new
CleanShutdownFileHandler(dirs.apply(0).toString());
- assertDoesNotThrow(() -> handler.delete());
- }
- return true;
- });
-
-
- // After remove the clean shutdown file, the broker should report
unclean shutdown during restart.
- topicPartitionInfo.replicas().forEach(replica -> {
- if (replica.id() != lastKnownLeader) startBroker(replica.id());
- });
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 0 && elrSize == 1;
- });
- topicPartitionInfo =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertNull(topicPartitionInfo.leader());
- assertEquals(1, topicPartitionInfo.lastKnownElr().size());
-
- // Now if the last known leader goes through unclean shutdown, it
will still be elected.
- startBroker(lastKnownLeader);
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize > 0 && elrSize == 0;
- });
-
- TestUtils.waitUntilTrue(
- () -> {
- try {
- TopicPartitionInfo partition =
adminClient.describeTopics(List.of(testTopicName))
-
.allTopicNames().get().get(testTopicName).partitions().get(0);
- if (partition.leader() == null) return false;
- return partition.lastKnownElr().isEmpty() &&
partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
- } catch (Exception e) {
- return false;
- }
- },
- () -> String.format("Partition metadata for %s is not
correct", testTopicName),
- DEFAULT_MAX_WAIT_MS, 100L
- );
- } finally {
- restartDeadBrokers(false);
- }
- }
-
- void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean>
isIsrAndElrSizeSatisfied) {
- TestUtils.waitUntilTrue(
- () -> {
- try {
- TopicDescription topicDescription =
adminClient.describeTopics(List.of(testTopicName))
- .allTopicNames().get().get(testTopicName);
- TopicPartitionInfo partition =
topicDescription.partitions().get(0);
- return
isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
- } catch (Exception e) {
- return false;
- }
- },
- () -> String.format("Partition metadata for %s is not propagated",
testTopicName),
- DEFAULT_MAX_WAIT_MS, 100L);
- }
-}
diff --git
a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
new file mode 100644
index 00000000000..61863688f2e
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ brokers = 5,
+ serverProperties = {
+ @ClusterConfigProperty(key =
ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG,
value = "true"),
+ @ClusterConfigProperty(key =
ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4")
+ }
+)
+public class EligibleLeaderReplicasIntegrationTest {
+ private final ClusterInstance clusterInstance;
+
+ EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_4_0_IV1)
+ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws
ExecutionException, InterruptedException {
+ try (var admin = clusterInstance.admin();
+ var producer = clusterInstance.producer(Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName(),
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers(),
+ ProducerConfig.ACKS_CONFIG, "1"));
+ var consumer = clusterInstance.consumer(Map.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "test",
+ ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()))) {
+ String testTopicName = String.format("%s-%s",
"testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test");
+ admin.updateFeatures(
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions()).all().get();
+
+ admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
+ clusterInstance.waitTopicCreation(testTopicName, 1);
+
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
+ // alter configs on target cluster
+ admin.incrementalAlterConfigs(configOps).all().get();
+
+ TopicDescription testTopicDescription =
admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
+ List<Node> initialReplicas = topicPartitionInfo.replicas();
+ assertEquals(4, topicPartitionInfo.isr().size());
+ assertEquals(0, topicPartitionInfo.elr().size());
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+ consumer.subscribe(Set.of(testTopicName));
+ producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
+ waitUntilOneMessageIsConsumed(consumer);
+
+ clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(1).id());
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize ==
1, admin, testTopicName);
+
+ TopicPartition partition = new TopicPartition(testTopicName, 0);
+ long leoBeforeSend = admin.listOffsets(Map.of(partition,
OffsetSpec.latest())).partitionResult(partition).get().offset();
+ // Now the partition is under min ISR. HWM should not advance.
+ producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
+ long leoAfterSend = admin.listOffsets(Map.of(partition,
OffsetSpec.latest())).partitionResult(partition).get().offset();
+ assertEquals(leoBeforeSend, leoAfterSend);
+
+ // Restore the min ISR and the previous log should be visible.
+ clusterInstance.startBroker(initialReplicas.get(1).id());
+ clusterInstance.startBroker(initialReplicas.get(0).id());
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize ==
0, admin, testTopicName);
+
+ waitUntilOneMessageIsConsumed(consumer);
+ }
+ }
+
+ void waitUntilOneMessageIsConsumed(Consumer<?, ?> consumer) throws
InterruptedException {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ return consumer.poll(Duration.ofMillis(100L)).count() >= 1;
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ DEFAULT_MAX_WAIT_MS,
+ () -> "fail to consume messages"
+ );
+ }
+
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_4_0_IV1)
+ public void testElrMemberCanBeElected() throws ExecutionException,
InterruptedException {
+ try (var admin = clusterInstance.admin()) {
+ String testTopicName = String.format("%s-%s",
"testElrMemberCanBeElected", "ELR-test");
+
+ admin.updateFeatures(
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions()).all().get();
+ admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
+ clusterInstance.waitTopicCreation(testTopicName, 1);
+
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
+ // alter configs on target cluster
+ admin.incrementalAlterConfigs(configOps).all().get();
+
+ TopicDescription testTopicDescription =
admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
+ List<Node> initialReplicas = topicPartitionInfo.replicas();
+ assertEquals(4, topicPartitionInfo.isr().size());
+ assertEquals(0, topicPartitionInfo.elr().size());
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+ clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(1).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(2).id());
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize ==
2, admin, testTopicName);
+
+ clusterInstance.shutdownBroker(initialReplicas.get(3).id());
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize ==
3, admin, testTopicName);
+
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ assertEquals(1, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
+ int expectLastKnownLeader = initialReplicas.get(3).id();
+ assertEquals(expectLastKnownLeader,
topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString());
+
+ // At this point, all the replicas have failed; the last known
leader is No.3 and there are 3 members in the ELR.
+ // Restart one broker of the ELR and it should be the leader.
+
+ int expectLeader = topicPartitionInfo.elr().stream()
+ .filter(node -> node.id() !=
expectLastKnownLeader).toList().get(0).id();
+
+ clusterInstance.startBroker(expectLeader);
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize ==
2, admin, testTopicName);
+
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
+ assertEquals(expectLeader, topicPartitionInfo.leader().id(),
topicPartitionInfo.toString());
+
+ // Start another 2 brokers and the ELR fields should be cleaned.
+ topicPartitionInfo.replicas().stream().filter(node -> node.id() !=
expectLeader).limit(2)
+ .forEach(node -> clusterInstance.startBroker(node.id()));
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize ==
0, admin, testTopicName);
+
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size(),
topicPartitionInfo.toString());
+ assertEquals(expectLeader, topicPartitionInfo.leader().id(),
topicPartitionInfo.toString());
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_4_0_IV1)
+ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws
ExecutionException, InterruptedException {
+ try (var admin = clusterInstance.admin()) {
+ String testTopicName = String.format("%s-%s",
"testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test");
+
+ admin.updateFeatures(
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions()).all().get();
+ admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
+ clusterInstance.waitTopicCreation(testTopicName, 1);
+
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
+ // alter configs on target cluster
+ admin.incrementalAlterConfigs(configOps).all().get();
+
+ TopicDescription testTopicDescription =
admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
+ List<Node> initialReplicas = topicPartitionInfo.replicas();
+ assertEquals(4, topicPartitionInfo.isr().size());
+ assertEquals(0, topicPartitionInfo.elr().size());
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+ clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(1).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(2).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(3).id());
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize ==
3, admin, testTopicName);
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+
+ int brokerToBeUncleanShutdown =
topicPartitionInfo.elr().get(0).id();
+ var broker = clusterInstance.brokers().values().stream().filter(b
-> b.config().brokerId() == brokerToBeUncleanShutdown)
+ .findFirst().get();
+ List<File> dirs = new ArrayList<>();
+ broker.logManager().liveLogDirs().foreach(dirs::add);
+ assertEquals(1, dirs.size());
+ CleanShutdownFileHandler handler = new
CleanShutdownFileHandler(dirs.get(0).toString());
+ assertTrue(handler.exists());
+ assertDoesNotThrow(handler::delete);
+
+ // After remove the clean shutdown file, the broker should report
unclean shutdown during restart.
+ clusterInstance.startBroker(brokerToBeUncleanShutdown);
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize ==
2, admin, testTopicName);
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ assertNull(topicPartitionInfo.leader());
+ assertEquals(1, topicPartitionInfo.lastKnownElr().size());
+ }
+ }
+
+ /*
+ This test is only valid for KIP-966 part 1. When the unclean recovery
is implemented, it should be removed.
+ */
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_4_0_IV1)
+ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws
ExecutionException, InterruptedException {
+ try (var admin = clusterInstance.admin()) {
+ String testTopicName = String.format("%s-%s",
"testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test");
+
+ admin.updateFeatures(
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ new
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)),
+ new UpdateFeaturesOptions()).all().get();
+ admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short)
4))).all().get();
+ clusterInstance.waitTopicCreation(testTopicName, 1);
+
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"),
AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps =
Map.of(configResource, ops);
+ // alter configs on target cluster
+ admin.incrementalAlterConfigs(configOps).all().get();
+
+
+ TopicDescription testTopicDescription =
admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo topicPartitionInfo =
testTopicDescription.partitions().get(0);
+ List<Node> initialReplicas = topicPartitionInfo.replicas();
+ assertEquals(4, topicPartitionInfo.isr().size());
+ assertEquals(0, topicPartitionInfo.elr().size());
+ assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+ clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(1).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(2).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(3).id());
+
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize ==
3, admin, testTopicName);
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ int lastKnownLeader =
topicPartitionInfo.lastKnownElr().get(0).id();
+
+ Set<Integer> initialReplicaSet = initialReplicas.stream().map(node
-> node.id()).collect(Collectors.toSet());
+ clusterInstance.brokers().forEach((id, broker) -> {
+ if (initialReplicaSet.contains(id)) {
+ List<File> dirs = new ArrayList<>();
+ broker.logManager().liveLogDirs().foreach(dirs::add);
+ assertEquals(1, dirs.size());
+ CleanShutdownFileHandler handler = new
CleanShutdownFileHandler(dirs.get(0).toString());
+ assertDoesNotThrow(handler::delete);
+ }
+ });
+
+ // After remove the clean shutdown file, the broker should report
unclean shutdown during restart.
+ topicPartitionInfo.replicas().forEach(replica -> {
+ if (replica.id() != lastKnownLeader)
clusterInstance.startBroker(replica.id());
+ });
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize ==
1, admin, testTopicName);
+ topicPartitionInfo = admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions().get(0);
+ assertNull(topicPartitionInfo.leader());
+ assertEquals(1, topicPartitionInfo.lastKnownElr().size());
+
+ // Now if the last known leader goes through unclean shutdown, it
will still be elected.
+ clusterInstance.startBroker(lastKnownLeader);
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize ==
0, admin, testTopicName);
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ TopicPartitionInfo partition =
admin.describeTopics(List.of(testTopicName))
+
.allTopicNames().get().get(testTopicName).partitions().get(0);
+ if (partition.leader() == null) return false;
+ return partition.lastKnownElr().isEmpty() &&
partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ DEFAULT_MAX_WAIT_MS,
+ () -> String.format("Partition metadata for %s is not
correct", testTopicName)
+ );
+ }
+ }
+
+ void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean>
isIsrAndElrSizeSatisfied, Admin admin, String testTopicName) throws
InterruptedException {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ TopicDescription topicDescription =
admin.describeTopics(List.of(testTopicName))
+ .allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo partition =
topicDescription.partitions().get(0);
+ return
isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ DEFAULT_MAX_WAIT_MS,
+ () -> String.format("Partition metadata for %s is not propagated",
testTopicName)
+ );
+ }
+}