showuon commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2548357630
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +219,70 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
+
+ AtomicLong removedAtHighWatermark = new AtomicLong();
+ // Remove 3002 from the voter set
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // If node 3002 is no longer a voter, return
+ removedAtHighWatermark.set(cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong());
+ return;
+ }
+
+ admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
+
+ TestUtils.waitForCondition(() -> cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
+ > removedAtHighWatermark.get() + 10,
+ 30_000, 100, () -> "High watermark is not advanced in 30000ms"
+ );
+
+ // 3002 does not join the voter set after high watermark advance
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
Review Comment:
This is not right. Here we want to verify that `3002 does not join the voter set
after high watermark advance`, but you're testing whether [3000, 3001] are in the
voter set. So, if 3002 has joined the voters, the test could still pass, right?
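A minimal sketch of a check that targets 3002 directly, assuming a short polling window is acceptable. `assertStaysAbsent` is a hypothetical helper name, `fetchVoters` stands in for `findVoterDirs(admin)` from the test above, and `java.util.UUID` replaces Kafka's `Uuid` so the snippet compiles on its own.

```java
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

public class RemovedVoterCheck {
    // Hypothetical helper: polls the voter set `attempts` times, pausing
    // `pauseMs` between polls, and fails as soon as `removedId` appears.
    // `fetchVoters` stands in for findVoterDirs(admin) in the test under review.
    public static void assertStaysAbsent(Supplier<Map<Integer, UUID>> fetchVoters,
                                         int removedId, int attempts, long pauseMs) {
        for (int i = 0; i < attempts; i++) {
            Map<Integer, UUID> voters = fetchVoters.get();
            if (voters.containsKey(removedId)) {
                throw new AssertionError("Node " + removedId
                        + " rejoined the voter set: " + voters.keySet());
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling voters", e);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, UUID> voters = Map.of(3000, UUID.randomUUID(), 3001, UUID.randomUUID());
        // Passes: 3002 never shows up in the polled voter set.
        assertStaysAbsent(() -> voters, 3002, 3, 10);
        System.out.println("3002 stayed out of the voter set");
    }
}
```

In the test this would run after the high-watermark wait with something like `() -> findVoterDirs(admin)`, wrapped if `findVoterDirs` throws checked exceptions. A single `assertFalse(voters.containsKey(3002))` is the minimal form of the same idea.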
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +219,70 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
+
+ AtomicLong removedAtHighWatermark = new AtomicLong();
+ // Remove 3002 from the voter set
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // If node 3002 is no longer a voter, return
+ removedAtHighWatermark.set(cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong());
+ return;
+ }
+
+ admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
Review Comment:
I think you've misunderstood what `retryOnExceptionWithTimeout` does. Please
check its implementation: it only retries when the runnable throws an
exception. If the runnable completes without throwing, it returns immediately
and does not retry.
So, with the current logic, this could happen:
1. At L257, 3002 is still in the voter set, so execution jumps to L263.
2. After L263, the assertions at L264-L266 pass without error.
3. `retryOnExceptionWithTimeout` returns, and execution reaches L270, which
checks `removedAtHighWatermark` even though it was never set.
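A minimal sketch of the behaviour described above, plus one possible restructuring of the removal step. Everything here is a stand-in: `retryOnException` is a simplified imitation of a retry-on-exception helper (not the actual `TestUtils.retryOnExceptionWithTimeout` source), and `FakeQuorum`, `removeVoter` and `removeAndRecordHighWatermark` are hypothetical names that model `findVoterDirs(admin)`, `admin.removeRaftVoter(...)` and the high watermark in memory. The key change is that the lambda throws while 3002 is still a voter, so the helper keeps retrying, and the only normal exit path is the one that records the high watermark.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class RetrySketch {
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Simplified stand-in for a retry-on-exception helper: `action` is re-run
    // only when it throws; the first normal completion returns immediately.
    // Illustration only, not the TestUtils source.
    public static void retryOnException(long timeoutMs, long pauseMs, ThrowingRunnable action)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                action.run();
                return; // no exception, so no retry
            } catch (Exception | AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(pauseMs);
            }
        }
    }

    // Hypothetical in-memory quorum: a removal request becomes visible only
    // after a couple of polls, and every poll advances the high watermark.
    public static final class FakeQuorum {
        private final Set<Integer> voters = new HashSet<>(Set.of(3000, 3001, 3002));
        private int pendingRemoval = -1;
        private int pollsUntilApplied = 0;
        public final AtomicLong highWatermark = new AtomicLong(100);

        public Set<Integer> voters() {
            if (pendingRemoval >= 0 && --pollsUntilApplied <= 0) {
                voters.remove(pendingRemoval);
                pendingRemoval = -1;
            }
            highWatermark.incrementAndGet();
            return Set.copyOf(voters);
        }

        public void removeVoter(int id) {
            if (pendingRemoval == id) {
                return; // removal already requested
            }
            pendingRemoval = id;
            pollsUntilApplied = 2;
        }
    }

    // Restructured removal step: throw while `removedId` is still a voter so the
    // helper retries; the only normal exit records the high watermark.
    public static long removeAndRecordHighWatermark(FakeQuorum quorum, int removedId)
            throws Exception {
        AtomicLong removedAt = new AtomicLong(-1);
        retryOnException(5_000, 10, () -> {
            Set<Integer> voters = quorum.voters();
            if (voters.contains(removedId)) {
                quorum.removeVoter(removedId);
                throw new AssertionError("Node " + removedId + " is still a voter: " + voters);
            }
            removedAt.set(quorum.highWatermark.get());
        });
        return removedAt.get();
    }

    public static void main(String[] args) throws Exception {
        FakeQuorum quorum = new FakeQuorum();
        long hw = removeAndRecordHighWatermark(quorum, 3002);
        System.out.println("3002 removed; recorded high watermark " + hw);
    }
}
```

The fake treats a repeated removal request as a no-op; whether the real admin call tolerates a request that is already in flight is not assumed here.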
--
This is an automated message from the Apache Git Service.