This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 42cb0e5733e IGNITE-25019 Remove unneeded future from
ClusterManagementGroupManager (#5567)
42cb0e5733e is described below
commit 42cb0e5733e9b6f39fbe083b76212c39c5bccbeb
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 3 16:12:15 2025 +0300
IGNITE-25019 Remove unneeded future from ClusterManagementGroupManager
(#5567)
---
.../management/ClusterManagementGroupManager.java | 58 ++++++++++++++--------
1 file changed, 36 insertions(+), 22 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 0b93d8ca3ec..3c6e8594aaa 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Collection;
import java.util.List;
@@ -648,9 +649,10 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
return inBusyLockAsync(
() ->
fireEvent(ClusterManagerGroupEvent.BEFORE_DESTROY_RAFT_GROUP,
EmptyEventParameters.INSTANCE)
.thenRunAsync(this::destroyCmg, this.scheduledExecutor)
- .exceptionally(err -> {
- failureManager.process(new
FailureContext(CRITICAL_ERROR, err));
- throw (err instanceof RuntimeException) ?
(RuntimeException) err : new CompletionException(err);
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ failureManager.process(new
FailureContext(CRITICAL_ERROR, e));
+ }
})
);
}
@@ -659,8 +661,12 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* Completely destroys the local CMG Raft service.
*/
private void destroyCmg() {
- synchronized (raftServiceLock) {
- try {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ synchronized (raftServiceLock) {
if (raftService != null) {
raftService.cancel(true);
@@ -673,9 +679,11 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
raftManager.destroyRaftNodeStorages(nodeId,
raftGroupOptionsConfigurer);
localStateStorage.clear();
- } catch (Exception e) {
- throw new IgniteInternalException("Error when cleaning the CMG
state", e);
}
+ } catch (NodeStoppingException e) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, "Error when
cleaning the CMG state", e);
+ } finally {
+ busyLock.leaveBusy();
}
}
@@ -759,7 +767,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
private CompletableFuture<CmgRaftService>
startCmgRaftServiceWithEvents(Set<String> nodeNames, @Nullable String
initialClusterConfig) {
BeforeStartRaftGroupEventParameters params = new
BeforeStartRaftGroupEventParameters(nodeNames, initialClusterConfig);
return fireEvent(ClusterManagerGroupEvent.BEFORE_START_RAFT_GROUP,
params)
- .thenComposeAsync(v -> inBusyLockAsync(() ->
startCmgRaftService(nodeNames)), scheduledExecutor)
+ .thenApplyAsync(v -> startCmgRaftService(nodeNames),
scheduledExecutor)
.whenComplete((v, e) -> {
if (e != null) {
LOG.warn("Error when initializing the CMG", e);
@@ -770,23 +778,27 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
/**
* Starts the CMG Raft service using the provided node names as its peers.
*/
- private CompletableFuture<CmgRaftService> startCmgRaftService(Set<String>
nodeNames) {
- String thisNodeConsistentId = clusterService.nodeName();
+ private CmgRaftService startCmgRaftService(Set<String> nodeNames) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
- // If we are not in the CMG, we must be a learner. List of learners
will be updated by a leader accordingly,
- // but just to start a RAFT service we must include ourselves in the
initial learners list, that's why we
- // pass Set.of(we) as learners list if we are not in the CMG.
- boolean isLearner = !nodeNames.contains(thisNodeConsistentId);
+ try {
+ String thisNodeConsistentId = clusterService.nodeName();
- Set<String> learnerNames = isLearner ? Set.of(thisNodeConsistentId) :
Set.of();
+ // If we are not in the CMG, we must be a learner. List of
learners will be updated by a leader accordingly,
+ // but just to start a RAFT service we must include ourselves in
the initial learners list, that's why we
+ // pass Set.of(we) as learners list if we are not in the CMG.
+ boolean isLearner = !nodeNames.contains(thisNodeConsistentId);
- PeersAndLearners raftConfiguration =
PeersAndLearners.fromConsistentIds(nodeNames, learnerNames);
+ Set<String> learnerNames = isLearner ?
Set.of(thisNodeConsistentId) : Set.of();
- Peer serverPeer = isLearner ?
raftConfiguration.learner(thisNodeConsistentId) :
raftConfiguration.peer(thisNodeConsistentId);
+ PeersAndLearners raftConfiguration =
PeersAndLearners.fromConsistentIds(nodeNames, learnerNames);
- assert serverPeer != null;
+ Peer serverPeer = isLearner ?
raftConfiguration.learner(thisNodeConsistentId) :
raftConfiguration.peer(thisNodeConsistentId);
+
+ assert serverPeer != null;
- try {
RaftGroupService service =
raftManager.startSystemRaftGroupNodeAndWaitNodeReady(
raftNodeId(serverPeer),
raftConfiguration,
@@ -802,9 +814,11 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
raftGroupOptionsConfigurer
);
- return completedFuture(new CmgRaftService(service,
clusterService.topologyService(), logicalTopology));
- } catch (Exception e) {
- return failedFuture(e);
+ return new CmgRaftService(service,
clusterService.topologyService(), logicalTopology);
+ } catch (NodeStoppingException e) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, e);
+ } finally {
+ busyLock.leaveBusy();
}
}