sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1193112863
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
}
});
- raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+ raftServiceAfterJoin().thenCompose(service ->
+ service.readClusterState()
+ .thenAccept(state -> {
Review Comment:
I think we need to use `whenComplete` here to complete
`updateDistributedConfigurationActionFuture` with exceptions in case previous
operations fail
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
}
});
- raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+ raftServiceAfterJoin().thenCompose(service ->
+ service.readClusterState()
+ .thenAccept(state -> {
+
updateDistributedConfigurationActionFuture.complete(
+ new UpdateDistributedConfigurationAction(
+
state.clusterConfigurationToApply(),
+ (result) ->
removeClusterConfigFromClusterState(result, service))
+ );
+ }));
}
- private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService
service) {
- return service.readClusterState()
- .thenCompose(state -> {
- if (state == null) {
- LOG.info("No CMG state found in the Raft service");
- return completedFuture(null);
- } else if (state.clusterConfigurationToApply() == null) {
- // Config was applied or wasn't provided
- LOG.info("No cluster configuration found in the Raft
service");
- return completedFuture(null);
+ private CompletableFuture<Void> removeClusterConfigFromClusterState(
+ CompletableFuture<Void> configurationAppliedFuture,
+ CmgRaftService service
+ ) {
+ return configurationAppliedFuture.thenCombine(
+ service.readClusterState(),
+ (ignored, state) -> {
+ Collection<String> cmgNodes = state.cmgNodes();
Review Comment:
These local variables look redundant, we can inline them
##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import
org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+ @Mock
+ public ConfigurationPresentation<String> presentation;
+
+ @Mock
+ public ClusterManagementGroupManager cmgMgr;
+
+ @Test
+ public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+ // Set up mocks.
+
when(presentation.update(anyString())).thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<Void> nextAction = new CompletableFuture<>();
+ String configuration = "security.authentication.enabled:true";
+ UpdateDistributedConfigurationAction
updateDistributedConfigurationAction =
+ new UpdateDistributedConfigurationAction(
+ configuration,
+ (result) -> result.whenComplete((v, e) ->
nextAction.complete(null)));
+
+ when(cmgMgr.clusterConfigurationToUpdate())
+
.thenReturn(CompletableFuture.completedFuture(updateDistributedConfigurationAction));
+
+ // Run updater.
+ DistributedConfigurationUpdater distributedConfigurationUpdater = new
DistributedConfigurationUpdater(
Review Comment:
`distributedConfigurationUpdater` can be extracted into a field
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+ /**
+ * Configuration that should be applied.
+ */
+ private final String configuration;
+
+ private final Function<CompletableFuture<Void>, CompletableFuture<Void>>
nextAction;
+
+
+ /**
+ * Constructor.
+ *
+ * @param configuration Configuration that should be applied.
+ * @param nextAction The next action to be performed.
+ */
+ public UpdateDistributedConfigurationAction(
+ @Nullable String configuration,
+ Function<CompletableFuture<Void>, CompletableFuture<Void>>
nextAction
Review Comment:
Why do we need the first `CompletableFuture` parameter here? I can see that
it is only used in `DistributedConfigurationUpdater` in order to pass an
exception here. Why do we need that?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
}
});
- raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+ raftServiceAfterJoin().thenCompose(service ->
+ service.readClusterState()
+ .thenAccept(state -> {
+
updateDistributedConfigurationActionFuture.complete(
+ new UpdateDistributedConfigurationAction(
+
state.clusterConfigurationToApply(),
+ (result) ->
removeClusterConfigFromClusterState(result, service))
+ );
+ }));
}
- private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService
service) {
- return service.readClusterState()
- .thenCompose(state -> {
- if (state == null) {
- LOG.info("No CMG state found in the Raft service");
- return completedFuture(null);
- } else if (state.clusterConfigurationToApply() == null) {
- // Config was applied or wasn't provided
- LOG.info("No cluster configuration found in the Raft
service");
- return completedFuture(null);
+ private CompletableFuture<Void> removeClusterConfigFromClusterState(
+ CompletableFuture<Void> configurationAppliedFuture,
+ CmgRaftService service
+ ) {
+ return configurationAppliedFuture.thenCombine(
Review Comment:
This links to my previous question: I think `configurationAppliedFuture` is
redundant. Moreover, in case of an exception we will still read the cluster
state for some reason
##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements
IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(DistributedConfigurationUpdater.class);
- private final CompletableFuture<ConfigurationPresentation<String>>
clusterCfgPresentation = new CompletableFuture<>();
+ private final ClusterManagementGroupManager cmgMgr;
- public void
setDistributedConfigurationPresentation(ConfigurationPresentation<String>
presentation) {
- clusterCfgPresentation.complete(presentation);
+ private final ConfigurationPresentation<String> presentation;
+
+ public DistributedConfigurationUpdater(ClusterManagementGroupManager
cmgMgr, ConfigurationPresentation<String> presentation) {
+ this.cmgMgr = cmgMgr;
+ this.presentation = presentation;
}
- /**
- * Applies changes to the cluster configuration when {@link
DistributedConfigurationUpdater#clusterCfgPresentation}
- * is complete.
- *
- * @param configurationToApply Cluster configuration that should be
applied.
- * @return Future that will be completed when cluster configuration is
updated.
- */
- public CompletableFuture<Void> updateConfiguration(String
configurationToApply) {
- return clusterCfgPresentation.thenCompose(presentation ->
presentation.update(configurationToApply))
- .whenComplete((v, e) -> {
+ @Override
+ public void start() {
+ cmgMgr.clusterConfigurationToUpdate()
Review Comment:
How is this going to work? This future is only completed on the CMG leader,
what will happen on all other nodes? Will this future be stuck forever? I think
we need to have a mechanism to cancel it on all nodes apart from the CMG leader
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
}
});
- raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+ raftServiceAfterJoin().thenCompose(service ->
+ service.readClusterState()
+ .thenAccept(state -> {
+
updateDistributedConfigurationActionFuture.complete(
+ new UpdateDistributedConfigurationAction(
+
state.clusterConfigurationToApply(),
+ (result) ->
removeClusterConfigFromClusterState(result, service))
+ );
+ }));
}
- private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService
service) {
- return service.readClusterState()
- .thenCompose(state -> {
- if (state == null) {
- LOG.info("No CMG state found in the Raft service");
- return completedFuture(null);
- } else if (state.clusterConfigurationToApply() == null) {
- // Config was applied or wasn't provided
- LOG.info("No cluster configuration found in the Raft
service");
- return completedFuture(null);
+ private CompletableFuture<Void> removeClusterConfigFromClusterState(
+ CompletableFuture<Void> configurationAppliedFuture,
+ CmgRaftService service
+ ) {
+ return configurationAppliedFuture.thenCombine(
+ service.readClusterState(),
+ (ignored, state) -> {
+ Collection<String> cmgNodes = state.cmgNodes();
+ Collection<String> msNodes =
state.metaStorageNodes();
+ IgniteProductVersion igniteVersion =
state.igniteVersion();
+ ClusterTag clusterTag = state.clusterTag();
+ return msgFactory.clusterState()
+ .cmgNodes(Set.copyOf(cmgNodes))
+ .metaStorageNodes(Set.copyOf(msNodes))
+ .version(igniteVersion.toString())
+ .clusterTag(clusterTag)
+ .build();
+ })
+ .thenCompose(service::updateClusterState)
+ .whenComplete((v2, e2) -> {
Review Comment:
`v2` and `e2`?
##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements
IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(DistributedConfigurationUpdater.class);
- private final CompletableFuture<ConfigurationPresentation<String>>
clusterCfgPresentation = new CompletableFuture<>();
+ private final ClusterManagementGroupManager cmgMgr;
- public void
setDistributedConfigurationPresentation(ConfigurationPresentation<String>
presentation) {
- clusterCfgPresentation.complete(presentation);
+ private final ConfigurationPresentation<String> presentation;
+
+ public DistributedConfigurationUpdater(ClusterManagementGroupManager
cmgMgr, ConfigurationPresentation<String> presentation) {
+ this.cmgMgr = cmgMgr;
+ this.presentation = presentation;
}
- /**
- * Applies changes to the cluster configuration when {@link
DistributedConfigurationUpdater#clusterCfgPresentation}
- * is complete.
- *
- * @param configurationToApply Cluster configuration that should be
applied.
- * @return Future that will be completed when cluster configuration is
updated.
- */
- public CompletableFuture<Void> updateConfiguration(String
configurationToApply) {
- return clusterCfgPresentation.thenCompose(presentation ->
presentation.update(configurationToApply))
- .whenComplete((v, e) -> {
+ @Override
+ public void start() {
+ cmgMgr.clusterConfigurationToUpdate()
+ .thenApply(action -> {
+ if (action.configuration() != null) {
+ presentation.update(action.configuration());
Review Comment:
`presentation.update` returns a future, which we ignore here, that's a bug
##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws
Exception {
assertThat(node.clusterManager().onJoinReady(),
willCompleteSuccessfully());
}
+ @Test
+ void
testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo
testInfo) throws Exception {
+ // Start a cluster of 3 nodes so that the CMG leader node could be
stopped later.
+ startCluster(3, testInfo);
+
+ String[] cmgNodes = clusterNodeNames();
+
+ // Start the CMG on all 3 nodes.
+ String clusterConfiguration = "security.authentication.enabled:true";
+ initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+ // Find the CMG leader and stop it.
+ MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+ // Read cluster configuration from the cluster state and remove it.
+ UpdateDistributedConfigurationAction configurationAction =
leaderNode.clusterManager()
+ .clusterConfigurationToUpdate()
+ .get();
+
+ assertThat(configurationAction.configuration(),
is(clusterConfiguration));
+
configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+ // Stop the cluster leader.
+ stopNodes(List.of(leaderNode));
Review Comment:
What scenario are you testing here? Why do we need to stop the leader?
##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -440,11 +495,17 @@ private void waitForLogicalTopology() throws
InterruptedException {
}, 10000));
}
+
private void initCluster(String[] metaStorageNodes, String[] cmgNodes)
throws NodeStoppingException {
+ initCluster(metaStorageNodes, cmgNodes, null);
+ }
+
+ private void initCluster(String[] metaStorageNodes, String[] cmgNodes,
String clusterConfiguration) throws NodeStoppingException {
Review Comment:
`clusterConfiguration` should be annotated as `@Nullable`
##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -414,6 +457,18 @@ void
nonCmgNodeAddedLaterGetsLogicalTopologyChanges(TestInfo testInfo) throws Ex
assertTrue(waitForCondition(() ->
nonCmgTopology.getLogicalTopology().nodes().size() == 2, 10_000));
}
+ private Optional<MockNode> findLeaderNode(List<MockNode> cluster) {
Review Comment:
Since you have extracted a method, please use it in other places in this
class as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]