ahuang98 commented on code in PR #20645:
URL: https://github.com/apache/kafka/pull/20645#discussion_r2604837819
##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -649,88 +649,115 @@ public void testMinIsrUpdateWithElr() throws Throwable {
// Unfence all brokers and create a topic foo (min ISR 2)
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers,
brokerEpochs);
- CreateTopicsRequestData createTopicsRequestData = new
CreateTopicsRequestData().setTopics(
- new CreatableTopicCollection(List.of(
- new CreatableTopic().setName("foo").setNumPartitions(1).
- setReplicationFactor(replicationFactor),
- new CreatableTopic().setName("bar").setNumPartitions(1).
- setReplicationFactor(replicationFactor)
- ).iterator()));
- CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
- ANONYMOUS_CONTEXT, createTopicsRequestData,
- Set.of("foo", "bar")).get();
- assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
- assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
- Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
- Uuid topicIdBar =
createTopicsResponseData.topics().find("bar").topicId();
- ConfigRecord configRecord = new ConfigRecord()
- .setResourceType(BROKER.id())
- .setResourceName("")
- .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
- .setValue("2");
- RecordTestUtils.replayAll(active.configurationControl(),
List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
- // Fence brokers
- TestUtils.waitForCondition(() -> {
- sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
- for (Integer brokerId : brokersToFence) {
- if (active.clusterControl().isUnfenced(brokerId)) {
- return false;
- }
+ // Heartbeat pumper
+ final java.util.concurrent.ScheduledExecutorService hbExec =
+
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+ final java.util.concurrent.atomic.AtomicBoolean keepOnly =
+ new java.util.concurrent.atomic.AtomicBoolean(false);
+ final long periodMs = Math.max(50L, sessionTimeoutMillis / 3);
+
+ hbExec.scheduleAtFixedRate(() -> {
+ try {
+ if (keepOnly.get()) {
+ sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
+ } else {
+ sendBrokerHeartbeatToUnfenceBrokers(active,
allBrokers, brokerEpochs);
}
- return true;
- }, sessionTimeoutMillis * 30,
- "Fencing of brokers did not process within expected time"
- );
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS);
- // Send another heartbeat to the brokers we want to keep alive
- sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced,
brokerEpochs);
+ try {
+ CreateTopicsRequestData createTopicsRequestData = new
CreateTopicsRequestData().setTopics(
+ new CreatableTopicCollection(List.of(
+ new
CreatableTopic().setName("foo").setNumPartitions(1).
+ setReplicationFactor(replicationFactor),
+ new
CreatableTopic().setName("bar").setNumPartitions(1).
+ setReplicationFactor(replicationFactor)
+ ).iterator()));
+ CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
+ ANONYMOUS_CONTEXT, createTopicsRequestData,
+ Set.of("foo", "bar")).get();
+ assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+ assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
+ Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
+ Uuid topicIdBar =
createTopicsResponseData.topics().find("bar").topicId();
+ ConfigRecord configRecord = new ConfigRecord()
+ .setResourceType(BROKER.id())
+ .setResourceName("")
+ .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
+ .setValue("2");
+ RecordTestUtils.replayAll(active.configurationControl(),
List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
+
+ // Before fencing wait, switch pumper to only keep
brokersToKeepUnfenced alive
+ keepOnly.set(true);
+
+ // Fence brokers
+ TestUtils.waitForCondition(() -> {
+ sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
+ for (Integer brokerId : brokersToFence) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
+ return false;
+ }
+ }
+ return true;
+ }, sessionTimeoutMillis * 30,
+ "Fencing of brokers did not process within expected time"
+ );
- // At this point only the brokers we want to fence (broker 2, 3)
should be fenced.
- brokersToKeepUnfenced.forEach(brokerId -> {
- assertTrue(active.clusterControl().isUnfenced(brokerId),
- "Broker " + brokerId + " should have been unfenced");
- });
- brokersToFence.forEach(brokerId -> {
- assertFalse(active.clusterControl().isUnfenced(brokerId),
- "Broker " + brokerId + " should have been fenced");
- });
- sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced,
brokerEpochs);
+ // Send another heartbeat to the brokers we want to keep alive
+ sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
- // Verify the isr and elr for the topic partition
- PartitionRegistration partition =
active.replicationControl().getPartition(topicIdFoo, 0);
- assertArrayEquals(new int[]{1}, partition.isr,
partition.toString());
+ // At this point only the brokers we want to fence (broker 2,
3) should be fenced.
+ brokersToKeepUnfenced.forEach(brokerId -> {
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
+ "Broker " + brokerId + " should have been unfenced");
+ });
+ brokersToFence.forEach(brokerId -> {
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
+ "Broker " + brokerId + " should have been fenced");
+ });
+ sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
- // The ELR set is not determined but the size is 1.
- assertEquals(1, partition.elr.length, partition.toString());
+ // Verify the isr and elr for the topic partition
Review Comment:
Hm, what is the heart of the problem here? Is it that, between the last time we
unfence the brokers and the time we request the partition information, one or
more of those brokers has already been fenced again?
Based on the context of the Jira, I'm assuming the test flakes at one
of the `assertArrayEquals(new int[]{1}, partition.isr, partition.toString());`
assertions. Wouldn't it be enough to just call `sendBrokerHeartbeatToUnfenceBrokers` before every assertion?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]