junrao commented on code in PR #18277: URL: https://github.com/apache/kafka/pull/18277#discussion_r1937822716
########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ##########
@@ -0,0 +1,450 @@
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 2 && elrSize == 1;
+            });
+
+            // Now the partition is under min ISR. HWM should not advance.
+            producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
+            Thread.sleep(1000);

Review Comment: 1 sec is quite long. How about 100ms?

########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##########
@@ -335,7 +335,8 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
     public ControllerResult<BrokerRegistrationReply> registerBroker(
         BrokerRegistrationRequestData request,
         long newBrokerEpoch,
-        FinalizedControllerFeatures finalizedFeatures
+        FinalizedControllerFeatures finalizedFeatures,
+        boolean uncleanShutdownDetectionEnabled

Review Comment: uncleanShutdownDetectionEnabled => cleanShutdownDetectionEnabled ?

########## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ##########
@@ -855,13 +866,65 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
         }
     }

+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReRegistrationWithUncleanShutdownDetection(boolean isCleanShutdown) {

Review Comment: testReRegistrationWithUncleanShutdownDetection => testReRegistrationWithCleanShutdownDetection ?
########## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ##########
@@ -460,39 +460,40 @@ public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
         int brokerToUncleanShutdown, brokerToBeTheLeader;

         // lastKnownElr stores the last known leader.
+        brokerToUncleanShutdown = lastKnownElr[0];
         if (lastKnownElr[0] == partition.elr[0]) {
-            brokerToUncleanShutdown = partition.elr[0];
             brokerToBeTheLeader = partition.elr[1];
         } else {
-            brokerToUncleanShutdown = partition.elr[1];
             brokerToBeTheLeader = partition.elr[0];
         }

-        // Unclean shutdown should remove the ELR members.
-        active.registerBroker(
+        // Unclean shutdown should remove brokerToUncleanShutdown from the ELR members, but it should still be in
+        // the lastKnownElr.
+        CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
             anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
             new BrokerRegistrationRequestData().
                 setBrokerId(brokerToUncleanShutdown).
                 setClusterId(active.clusterId()).
                 setFeatures(features).
                 setIncarnationId(Uuid.randomUuid()).
                 setLogDirs(Collections.singletonList(Uuid.randomUuid())).
-                setListeners(listeners)).get();
+                setListeners(listeners));
+        brokerEpochs.put(brokerToUncleanShutdown, reply.get().epoch());

         partition = active.replicationControl().getPartition(topicIdFoo, 0);
         assertArrayEquals(new int[]{brokerToBeTheLeader}, partition.elr, partition.toString());
+        assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());

         // Unclean shutdown should not remove the last known ELR members.
-        active.registerBroker(
+        reply = active.registerBroker(

Review Comment: reply is unused.

########## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ##########
@@ -855,13 +866,65 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReRegistrationWithUncleanShutdownDetection(boolean isCleanShutdown) {
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+            setFeatureControlManager(new FeatureControlManager.Builder().build()).
+            setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
+                if (!cleanShutdown) {
+                    records.add(new ApiMessageAndVersion(new PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
+                }
+            }).
+            build();
+        clusterControl.activate();
+        List<ApiMessageAndVersion> records = clusterControl.registerBroker(
+            new BrokerRegistrationRequestData().
+                setBrokerId(1).
+                setClusterId(clusterControl.clusterId()).
+                setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
+                setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
+            100,
+            new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
+            true).
+            records();
+        records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+            setBrokerId(1).setBrokerEpoch(100).
+            setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+            (short) 1));
+        RecordTestUtils.replayAll(clusterControl, records);
+
+        records = clusterControl.registerBroker(
+            new BrokerRegistrationRequestData().
+                setBrokerId(1).
+                setClusterId(clusterControl.clusterId()).
+                setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
+                setPreviousBrokerEpoch(isCleanShutdown ? 100 : 10).
+                setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
+            111,
+            new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
+            true).records();
+        RecordTestUtils.replayAll(clusterControl, records);
+        assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
+            clusterControl.brokerRegistrations().get(1).incarnationId());
+        assertFalse(clusterControl.brokerRegistrations().get(1).inControlledShutdown());
+        if (isCleanShutdown) {
+            assertEquals(100, clusterControl.brokerRegistrations().get(1).epoch());

Review Comment: This seems incorrect. Independent of whether the shutdown was clean or not, the new broker epoch should always be reflected after broker registration.
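To make the expectation concrete, a sketch of the assertion this seems to call for (assuming 111 is the epoch passed to the second `registerBroker` call above, this would hold on both branches of `isCleanShutdown`):

```java
// the refreshed epoch should be visible whether or not the shutdown was clean
assertEquals(111, clusterControl.brokerRegistrations().get(1).epoch());
```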
Review Comment: The broker id => The broker id to be shut down ########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.integration; +import kafka.integration.KafkaServerTestHarness; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.Logging; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.FeatureUpdate; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.UpdateFeaturesOptions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.mutable.HashMap; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class 
EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { + private String bootstrapServer; + private String testTopicName; + private Admin adminClient; + @Override + public Seq<KafkaConfig> generateConfigs() { + List<Properties> brokerConfigs = new ArrayList<>(); + brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( + 5, + true, + true, + scala.Option.<SecurityProtocol>empty(), + scala.Option.<File>empty(), + scala.Option.<Properties>empty(), + true, + false, + false, + false, + new HashMap<>(), + 1, + false, + 1, + (short) 4, + 0, + false + ))); + List<KafkaConfig> configs = new ArrayList<>(); + for (Properties props : brokerConfigs) { + configs.add(KafkaConfig.fromProps(props)); + } + return JavaConverters.asScalaBuffer(configs).toSeq(); + } + + @BeforeEach + public void setUp(TestInfo info) { + super.setUp(info); + // create adminClient + Properties props = new Properties(); + bootstrapServer = bootstrapServers(listenerName()); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + adminClient = Admin.create(props); + adminClient.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); + } + + @AfterEach + public void close() throws Exception { + if (adminClient != null) adminClient.close(); + super.tearDown(); + } + + @ParameterizedTest + @ValueSource(strings = {"kraft"}) + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); + // alter configs on target cluster + adminClient.incrementalAlterConfigs(configOps).all().get(); + Producer producer = null; + Consumer consumer = null; + try { + // check which partition is on broker 0 which we'll kill Review Comment: There is only 1 partition. Ditto below. ########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
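For example (hypothetical wording, since the topic is created with a single partition and the brokers to kill are picked from its replica list):

```java
// describe the topic's only partition so we can pick replicas to kill
```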
########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ##########
@@ -0,0 +1,450 @@
+            consumer = new KafkaConsumer<>(consumerProps);
+            consumer.subscribe(Collections.singleton(testTopicName));
+
+            producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
+            Thread.sleep(1000);

Review Comment: Hmm, why do we need to sleep here? Could we use `waitUntil`?
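For illustration, a minimal sketch of that suggestion, reusing the `kafka.utils.TestUtils.waitUntilTrue` idiom this test already uses further down (it would replace the sleep-poll-assert sequence; the `firstConsumer` local is only there so the lambda captures an effectively final variable):

```java
Consumer firstConsumer = consumer;
kafka.utils.TestUtils.waitUntilTrue(
    () -> {
        try {
            // poll until the single produced record is visible instead of sleeping a fixed second
            ConsumerRecords polled = firstConsumer.poll(Duration.ofMillis(100L));
            return polled.count() == 1;
        } catch (Exception e) {
            return false;
        }
    },
    () -> "fail to consume the produced message",
    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
);
```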
########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ##########
@@ -0,0 +1,450 @@
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 1 && elrSize == 2;
+            });
+
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(2, topicPartitionInfo.elr().size());

Review Comment: This seems unnecessary since it's covered by the `waitForIsrAndElr` call earlier?
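In other words, the follow-up describe/assert pair could likely be dropped so the test moves straight from the wait to the next kill (a sketch using lines already in this test):

```java
waitForIsrAndElr((isrSize, elrSize) -> {
    return isrSize == 1 && elrSize == 2;
});

killBroker(initialReplicas.get(3).id());
```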
########## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ##########
@@ -0,0 +1,450 @@
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+            killBroker(initialReplicas.get(3).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 0 && elrSize == 3;
+            });
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id();
+
+            brokers().foreach(broker -> {

Review Comment: One of the brokers is not killed. We should avoid deleting its file, right?
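For illustration, a sketch of that guard, assuming the loop deletes each broker's clean shutdown file the same way the previous test does (the `killedIds` local is hypothetical; the other names already appear in this file):

```java
// collect the ids of the four replicas that were actually killed
List<Integer> killedIds = initialReplicas.stream().map(Node::id).collect(Collectors.toList());
for (KafkaBroker broker : JavaConverters.seqAsJavaList(brokers())) {
    // skip the one broker that was never killed so its clean shutdown file stays intact
    if (!killedIds.contains(broker.config().brokerId())) continue;
    Seq<File> dirs = broker.logManager().liveLogDirs();
    CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
    assertDoesNotThrow(() -> handler.delete());
}
```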
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]