This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 0b683ddc7 IGNITE-17100 Make JoinReady command idempotent (#853) 0b683ddc7 is described below commit 0b683ddc79a35b7c34c1a35d4d0cc8f6ee71d78d Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Mon Jun 6 11:33:07 2022 +0300 IGNITE-17100 Make JoinReady command idempotent (#853) --- .../management/raft/ItCmgRaftServiceTest.java | 40 ++++++-- .../management/ClusterManagementGroupManager.java | 2 +- .../management/raft/CmgRaftGroupListener.java | 6 ++ .../cluster/management/raft/CmgRaftService.java | 4 + .../management/raft/RaftStorageManager.java | 17 +++- .../cluster/management/raft/ValidationManager.java | 4 +- .../management/raft/CmgRaftGroupListenerTest.java | 101 +++++++++++++++++++++ .../raft/ConcurrentMapClusterStateStorage.java | 4 + .../org/apache/ignite/internal/app/IgniteImpl.java | 12 ++- .../ignite/internal/app/LifecycleManager.java | 2 + 10 files changed, 175 insertions(+), 17 deletions(-) diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java index 92d6a89e7..534566ad2 100644 --- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java +++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java @@ -37,6 +37,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.ClusterTag; +import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand; +import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand; import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -91,6 +93,8 @@ public class ItCmgRaftServiceTest { () -> new CmgRaftGroupListener(raftStorage) ); + assertThat(raftService, willCompleteSuccessfully()); + this.raftService = new CmgRaftService(raftService.get(), clusterService); } @@ -361,7 +365,7 @@ public class ItCmgRaftServiceTest { // Node has not passed validation. String errMsg = String.format( "JoinReady request denied, reason: Node \"%s\" has not yet passed the validation step", - cluster.get(0).clusterService.topologyService().localMember().id() + cluster.get(0).clusterService.topologyService().localMember() ); assertThrowsWithCause( @@ -374,13 +378,6 @@ public class ItCmgRaftServiceTest { // Everything is ok after the node has passed validation. assertThat(raftService.completeJoinCluster(), willCompleteSuccessfully()); - - // Validation state is cleared after the first successful attempt. - assertThrowsWithCause( - () -> raftService.completeJoinCluster().get(10, TimeUnit.SECONDS), - IgniteInternalException.class, - errMsg - ); } /** @@ -471,4 +468,31 @@ public class ItCmgRaftServiceTest { ) ); } + + /** + * Tests that {@link JoinRequestCommand} and {@link JoinReadyCommand} are idempotent. + */ + @Test + void testJoinCommandsIdempotence() { + ClusterState state = new ClusterState( + List.of("foo"), + List.of("bar"), + IgniteProductVersion.CURRENT_VERSION, + new ClusterTag("cluster") + ); + + assertThat(cluster.get(0).raftService.initClusterState(state), willCompleteSuccessfully()); + + CmgRaftService service = cluster.get(1).raftService; + + assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully()); + + assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully()); + + assertThat(service.completeJoinCluster(), willCompleteSuccessfully()); + + assertThat(service.completeJoinCluster(), willCompleteSuccessfully()); + + assertThat(service.completeJoinCluster(), willCompleteSuccessfully()); + } } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index d825f1d6d..c26653f8b 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -256,7 +256,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { synchronized (raftServiceLock) { if (raftService == null) { // Raft service has not been started - LOG.info("Init command received, starting the CMG: {}", msg); + LOG.info("Init command received, starting the CMG on: {}", msg.cmgNodes()); raftService = startCmgRaftService(msg.cmgNodes()); } else { diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java index 17e085e5a..5d138a1a8 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java @@ -43,6 +43,7 @@ import org.apache.ignite.raft.client.WriteCommand; import org.apache.ignite.raft.client.service.CommandClosure; import org.apache.ignite.raft.client.service.RaftGroupListener; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * {@link RaftGroupListener} implementation for the CMG. @@ -176,4 +177,9 @@ public class CmgRaftGroupListener implements RaftGroupListener { public @Nullable CompletableFuture<Void> onBeforeApply(Command command) { return null; } + + @TestOnly + public RaftStorageManager storage() { + return storage; + } } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java index 338317a6f..255bdc06a 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java @@ -129,6 +129,8 @@ public class CmgRaftService { throw new JoinDeniedException("Join request denied, reason: " + ((ValidationErrorResponse) response).reason()); } else if (response != null) { throw new IgniteInternalException("Unexpected response: " + response); + } else { + LOG.info("JoinRequest command executed successfully"); } }); } @@ -150,6 +152,8 @@ public class CmgRaftService { + ((ValidationErrorResponse) response).reason()); } else if (response != null) { throw new IgniteInternalException("Unexpected response: " + response); + } else { + LOG.info("JoinReady command executed successfully"); } }); } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java index ef81d6c31..13520bd17 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java @@ -110,14 +110,21 @@ class RaftStorageManager { storage.removeAll(keys); } + /** + * Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise. + */ + boolean isNodeInLogicalTopology(ClusterNode node) { + byte[] value = storage.get(logicalTopologyKey(node)); + + return value != null; + } + private static byte[] logicalTopologyKey(ClusterNode node) { return prefixedKey(LOGICAL_TOPOLOGY_PREFIX, node.id()); } /** - * Retrieves the validation token for a given node. - * - * @return Validation token or {@code null} if it does not exist. + * Returns {@code true} if a given node has been previously validated or {@code false} otherwise. */ boolean isNodeValidated(String nodeId) { byte[] value = storage.get(validatedNodeKey(nodeId)); @@ -126,14 +133,14 @@ class RaftStorageManager { } /** - * Saves the validation token for a given node. + * Marks the given node as validated. */ void putValidatedNode(String nodeId) { storage.put(validatedNodeKey(nodeId), EMPTY_VALUE); } /** - * Removes the validation token for a given node. + * Removes the given node from the validated node set. */ void removeValidatedNode(String nodeId) { storage.remove(validatedNodeKey(nodeId)); diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java index 7abb2c5a9..79dcfb0d7 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java @@ -150,8 +150,8 @@ class ValidationManager implements AutoCloseable { ValidationResult completeValidation(ClusterNode node) { String nodeId = node.id(); - if (!storage.isNodeValidated(nodeId)) { - return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", nodeId)); + if (!storage.isNodeValidated(nodeId) && !storage.isNodeInLogicalTopology(node)) { + return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", node)); } Future<?> cleanupFuture = cleanupFutures.remove(nodeId); diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java new file mode 100644 index 000000000..692abfe16 --- /dev/null +++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster.management.raft; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import org.apache.ignite.internal.cluster.management.ClusterState; +import org.apache.ignite.internal.cluster.management.ClusterTag; +import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand; +import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand; +import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand; +import org.apache.ignite.internal.properties.IgniteProductVersion; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.client.Command; +import org.apache.ignite.raft.client.service.CommandClosure; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for the {@link CmgRaftGroupListener}. + */ +public class CmgRaftGroupListenerTest { + private final ClusterStateStorage storage = new ConcurrentMapClusterStateStorage(); + + private final CmgRaftGroupListener listener = new CmgRaftGroupListener(storage); + + @BeforeEach + void setUp() { + storage.start(); + } + + @AfterEach + void tearDown() throws Exception { + storage.close(); + } + + /** + * Test that validated node IDs get added and removed from the storage. + */ + @Test + void testValidatedNodeIds() { + var state = new ClusterState( + List.of("foo"), + List.of("bar"), + IgniteProductVersion.CURRENT_VERSION, + new ClusterTag("cluster") + ); + + var node = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 666)); + + listener.onWrite(iterator(new InitCmgStateCommand(node, state))); + + listener.onWrite(iterator(new JoinRequestCommand(node, state.igniteVersion(), state.clusterTag()))); + + assertThat(listener.storage().getValidatedNodeIds(), contains(node.id())); + + listener.onWrite(iterator(new JoinReadyCommand(node))); + + assertThat(listener.storage().getValidatedNodeIds(), is(empty())); + } + + private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) { + CommandClosure<T> closure = new CommandClosure<>() { + @Override + public T command() { + return obj; + } + + @Override + public void result(@Nullable Serializable res) { + // no-op. + } + }; + + return List.of(closure).iterator(); + } +} diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java index 185e9bbc4..ef89843ef 100644 --- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java +++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java @@ -83,6 +83,10 @@ public class ConcurrentMapClusterStateStorage implements ClusterStateStorage { .filter(e -> { byte[] key = e.getKey().bytes(); + if (key.length < prefix.length) { + return false; + } + return Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length); }) .map(e -> entryTransformer.apply(e.getKey().bytes(), e.getValue())) diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9fd9b5f69..354f22191 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -419,9 +419,13 @@ public class IgniteImpl implements Ignite { cmgMgr ); + LOG.info("Components started, joining the cluster"); + return cmgMgr.joinFuture() // using the default executor to avoid blocking the CMG Manager threads .thenRunAsync(() -> { + LOG.info("Join complete, starting the remaining components"); + // Start all other components after the join request has completed and the node has been validated. try { lifecycleManager.startComponents( @@ -441,6 +445,8 @@ public class IgniteImpl implements Ignite { } }) .thenCompose(v -> { + LOG.info("Components started, performing recovery"); + // Recovery future must be created before configuration listeners are triggered. CompletableFuture<Void> recoveryFuture = RecoveryCompletionFutureFactory.create( clusterCfgMgr, @@ -460,7 +466,11 @@ public class IgniteImpl implements Ignite { }); }) // Signal that local recovery is complete and the node is ready to join the cluster. - .thenCompose(v -> cmgMgr.onJoinReady()) + .thenCompose(v -> { + LOG.info("Recovery complete, finishing join"); + + return cmgMgr.onJoinReady(); + }) .thenRun(() -> { try { // Transfer the node to the STARTED state. diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java index fe7563b99..a71d31dae 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java @@ -93,6 +93,8 @@ class LifecycleManager { * @throws NodeStoppingException If node stopping intention was detected. */ void onStartComplete() throws NodeStoppingException { + LOG.info("Start complete, transferring to {} state", Status.STARTED); + Status currentStatus = status.compareAndExchange(Status.STARTING, Status.STARTED); if (currentStatus == Status.STOPPING) {