This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 6fcadcb933 IGNITE-22241 Make ClusterManagementGroupManager extensible (#3756) 6fcadcb933 is described below commit 6fcadcb933c6cffd35b2d43f197c4a6f16a30b85 Author: Tiago Marques Godinho <tmgodi...@ua.pt> AuthorDate: Fri May 24 07:15:41 2024 +0100 IGNITE-22241 Make ClusterManagementGroupManager extensible (#3756) --- modules/cluster-management/build.gradle | 2 + .../management/ClusterManagementGroupManager.java | 64 ++++++++++++++++++---- .../BeforeStartRaftGroupEventParameters.java | 51 +++++++++++++++++ .../events/ClusterManagerGroupEvent.java | 32 +++++++++++ .../management/events/EmptyEventParameters.java | 29 ++++++++++ .../internal/cluster/management/MockNode.java | 7 ++- .../ItMetaStorageMultipleNodesAbstractTest.java | 10 +++- .../metastorage/impl/ItMetaStorageWatchTest.java | 7 ++- .../ItDistributedConfigurationPropertiesTest.java | 11 +++- .../ItDistributedConfigurationStorageTest.java | 12 +++- .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../rebalance/ItRebalanceDistributedTest.java | 6 +- 13 files changed, 214 insertions(+), 23 deletions(-) diff --git a/modules/cluster-management/build.gradle b/modules/cluster-management/build.gradle index e412100d46..ceb8530dfb 100644 --- a/modules/cluster-management/build.gradle +++ b/modules/cluster-management/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation project(':ignite-configuration') implementation project(':ignite-configuration-api') implementation project(':ignite-configuration-presentation') + implementation project(':ignite-failure-handler') implementation project(':ignite-network') implementation project(':ignite-raft-api') implementation project(':ignite-vault') @@ -53,6 +54,7 @@ dependencies { testImplementation libs.mockito.junit testFixturesImplementation project(':ignite-core') + testFixturesImplementation project(':ignite-failure-handler') testFixturesImplementation project(':ignite-raft') testFixturesImplementation project(':ignite-raft-api') testFixturesImplementation project(':ignite-storage-api') diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index 6efd1feea7..3b4f951417 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -23,10 +23,12 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toUnmodifiableSet; import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag; +import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import java.util.Collection; import java.util.List; @@ -41,6 +43,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState; import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; +import org.apache.ignite.internal.cluster.management.events.BeforeStartRaftGroupEventParameters; +import org.apache.ignite.internal.cluster.management.events.ClusterManagerGroupEvent; +import org.apache.ignite.internal.cluster.management.events.EmptyEventParameters; import org.apache.ignite.internal.cluster.management.network.CmgMessageHandlerFactory; import org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage; import org.apache.ignite.internal.cluster.management.network.messages.ClusterStateMessage; @@ -56,6 +61,10 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm import org.apache.ignite.internal.cluster.management.topology.LogicalTopology; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; +import org.apache.ignite.internal.event.AbstractEventProducer; +import org.apache.ignite.internal.event.EventParameters; +import org.apache.ignite.internal.failure.FailureContext; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -85,7 +94,8 @@ import org.jetbrains.annotations.TestOnly; * <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">IEP-77</a> * for the description of the Cluster Management Group and its responsibilities. */ -public class ClusterManagementGroupManager implements IgniteComponent { +public class ClusterManagementGroupManager extends AbstractEventProducer<ClusterManagerGroupEvent, EventParameters> + implements IgniteComponent { private static final IgniteLogger LOG = Loggers.forClass(ClusterManagementGroupManager.class); /** Busy lock to stop synchronously. */ @@ -134,6 +144,9 @@ public class ClusterManagementGroupManager implements IgniteComponent { /** Future that resolves into the initial cluster configuration in HOCON format. */ private final CompletableFuture<String> initialClusterConfigurationFuture = new CompletableFuture<>(); + /** Failure processor that is used to handle critical errors. */ + private final FailureProcessor failureProcessor; + /** Constructor. */ public ClusterManagementGroupManager( VaultManager vault, @@ -143,7 +156,8 @@ public class ClusterManagementGroupManager implements IgniteComponent { ClusterStateStorage clusterStateStorage, LogicalTopology logicalTopology, ClusterManagementConfiguration configuration, - NodeAttributes nodeAttributes + NodeAttributes nodeAttributes, + FailureProcessor failureProcessor ) { this.clusterService = clusterService; this.clusterInitializer = clusterInitializer; @@ -153,6 +167,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { this.configuration = configuration; this.localStateStorage = new LocalStateStorage(vault); this.nodeAttributes = nodeAttributes; + this.failureProcessor = failureProcessor; scheduledExecutor = Executors.newSingleThreadScheduledExecutor( NamedThreadFactory.create(clusterService.nodeName(), "cmg-manager", LOG) @@ -257,7 +272,8 @@ public class ClusterManagementGroupManager implements IgniteComponent { LOG.info("Local CMG state recovered, starting the CMG"); - return startCmgRaftService(localState.cmgNodeNames()) + // Since we recovered state we do not supply a new initialClusterConfig. + return startCmgRaftServiceWithEvents(localState.cmgNodeNames(), null) .thenCompose(service -> joinCluster(service, localState.clusterTag())); } @@ -285,7 +301,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { // Raft service has not been started LOG.info("Init command received, starting the CMG [nodes={}]", msg.cmgNodes()); - serviceFuture = startCmgRaftService(msg.cmgNodes()); + serviceFuture = startCmgRaftServiceWithEvents(msg.cmgNodes(), msg.initialClusterConfiguration()); } else { // Raft service has been started, which means that this node has already received an init command at least once. LOG.info("Init command received, but the CMG has already been started"); @@ -426,8 +442,20 @@ public class ClusterManagementGroupManager implements IgniteComponent { private void handleCancelInit(CancelInitMessage msg) { LOG.info("CMG initialization cancelled [reason={}]", msg.reason()); - - destroyCmg(); + this.scheduledExecutor.execute(this::destroyCmgWithEvents); + } + + /** Delegates call to {@link #destroyCmg()} but fires the associated events. */ + private CompletableFuture<Void> destroyCmgWithEvents() { + LOG.info("CMG cancellation procedure started"); + return inBusyLockAsync(busyLock, + () -> fireEvent(ClusterManagerGroupEvent.BEFORE_DESTROY_RAFT_GROUP, EmptyEventParameters.INSTANCE) + .thenRunAsync(this::destroyCmg, this.scheduledExecutor) + .exceptionally(err -> { + failureProcessor.process(new FailureContext(CRITICAL_ERROR, err)); + throw (err instanceof RuntimeException) ? (RuntimeException) err : new CompletionException(err); + }) + ); } /** @@ -493,15 +521,16 @@ public class ClusterManagementGroupManager implements IgniteComponent { LOG.warn("CMG service could not be started on previous attempts. " + "Re-creating the CMG Raft service [reason={}]", e, e.getMessage()); + + return initCmgRaftService(state); } else { LOG.warn("CMG service started, but the cluster state is different. " + "Re-creating the CMG Raft service [localState={}, clusterState={}]", service.nodeNames(), state.cmgNodes()); - destroyCmg(); + return destroyCmgWithEvents() + .thenCompose(none -> initCmgRaftService(state)); } - - return initCmgRaftService(state); }) .thenCompose(Function.identity()); } @@ -522,6 +551,19 @@ public class ClusterManagementGroupManager implements IgniteComponent { }); } + /** + * Delegates call to {@link #startCmgRaftService(Set)} but fires the associated events. + * + * @param initialClusterConfig the initial cluster configuration provided by the + * {@link CmgInitMessage#initialClusterConfiguration()} if the cluster is being initialized for the first time, as part of a + * cluster init. Otherwise {@code null}, if starting after recovering state of an already initialized cluster. + */ + private CompletableFuture<CmgRaftService> startCmgRaftServiceWithEvents(Set<String> nodeNames, @Nullable String initialClusterConfig) { + BeforeStartRaftGroupEventParameters params = new BeforeStartRaftGroupEventParameters(nodeNames, initialClusterConfig); + return fireEvent(ClusterManagerGroupEvent.BEFORE_START_RAFT_GROUP, params) + .thenCompose(v -> startCmgRaftService(nodeNames)); + } + /** * Starts the CMG Raft service using the provided node names as its peers. */ @@ -578,7 +620,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { * Starts the CMG Raft service using the given {@code state} and persists it to the local storage. */ private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState state) { - return startCmgRaftService(state.cmgNodes()) + return startCmgRaftServiceWithEvents(state.cmgNodes(), state.initialClusterConfiguration()) .thenCompose(service -> { var localState = new LocalState(state.cmgNodes(), state.clusterTag()); @@ -700,7 +742,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { initialClusterConfigurationFuture.completeExceptionally(new NodeStoppingException()); - return nullCompletedFuture(); + return fireEvent(ClusterManagerGroupEvent.AFTER_STOP_RAFT_GROUP, EmptyEventParameters.INSTANCE); } /** diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java new file mode 100644 index 0000000000..70c8df74c7 --- /dev/null +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster.management.events; + +import java.util.Set; +import org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage; +import org.apache.ignite.internal.event.EventParameters; +import org.jetbrains.annotations.Nullable; + +/** Transparent data container for the {@link ClusterManagerGroupEvent#BEFORE_START_RAFT_GROUP}. */ +public class BeforeStartRaftGroupEventParameters implements EventParameters { + private final Set<String> nodeNames; + private final @Nullable String initialClusterConfig; + + /** + * Constructor. + * + * @param nodeNames The names of the nodes in the cluster. This set is copied internally. + * @param initialClusterConfig the initial cluster configuration provided by the {@link CmgInitMessage#initialClusterConfiguration()}, + * if the cluster is being initialized for the first time, as part of a cluster init. Otherwise {@code null}, if starting after + * recovering state of an already initialized cluster. + */ + public BeforeStartRaftGroupEventParameters(Set<String> nodeNames, @Nullable String initialClusterConfig) { + this.nodeNames = Set.copyOf(nodeNames); + this.initialClusterConfig = initialClusterConfig; + } + + public Set<String> nodeNames() { + return nodeNames; + } + + @Nullable + public String initialClusterConfig() { + return initialClusterConfig; + } +} diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java new file mode 100644 index 0000000000..314404b0d8 --- /dev/null +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster.management.events; + +import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.event.Event; + +/** Enum with events for the {@link ClusterManagementGroupManager}. */ + +public enum ClusterManagerGroupEvent implements Event { + /** Fired before starting the cmg raft group. */ + BEFORE_START_RAFT_GROUP, + /** Fired before destroying the cmg raft group and cleaning the local state. */ + BEFORE_DESTROY_RAFT_GROUP, + /** Fired after stopping the cmg raft group. */ + AFTER_STOP_RAFT_GROUP +} diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java new file mode 100644 index 0000000000..53de4140e0 --- /dev/null +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cluster.management.events; + +import org.apache.ignite.internal.event.EventParameters; + +/** Empty Event Parameters Singleton. */ +public class EmptyEventParameters implements EventParameters { + public static final EmptyEventParameters INSTANCE = new EmptyEventParameters(); + + private EmptyEventParameters() { + // Intentionally left blank. + } +} diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java index 1a1cf3420e..094cfe022d 100644 --- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java +++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImp import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metrics.NoOpMetricManager; @@ -101,6 +102,8 @@ public class MockNode { var clusterStateStorage = new RocksDbClusterStateStorage(this.workDir.resolve("cmg"), clusterService.nodeName()); + FailureProcessor failureProcessor = new FailureProcessor(nodeName); + this.clusterManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -109,7 +112,8 @@ public class MockNode { clusterStateStorage, new LogicalTopologyImpl(clusterStateStorage), cmgConfiguration, - new NodeAttributesCollector(nodeAttributes, storageProfilesConfiguration) + new NodeAttributesCollector(nodeAttributes, storageProfilesConfiguration), + failureProcessor ); components = List.of( @@ -117,6 +121,7 @@ public class MockNode { clusterService, raftManager, clusterStateStorage, + failureProcessor, clusterManager ); } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 557ad690f1..9ff9059bd6 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -138,6 +139,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr /** The future have to be complete after the node start and all Meta storage watches are deployd. */ private final CompletableFuture<Void> deployWatchesFut; + private final FailureProcessor failureProcessor; + Node(ClusterService clusterService, Path dataPath) { this.clusterService = clusterService; @@ -166,6 +169,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr new TestConfigurationValidator() ); + this.failureProcessor = new FailureProcessor(name()); + this.cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -174,7 +179,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr clusterStateStorage, logicalTopology, cmgConfiguration, - new NodeAttributesCollector(nodeAttributes, storageConfiguration) + new NodeAttributesCollector(nodeAttributes, storageConfiguration), + failureProcessor ); var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); @@ -207,6 +213,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr clusterService, raftManager, clusterStateStorage, + failureProcessor, cmgManager, metaStorageManager ); @@ -229,6 +236,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr List<IgniteComponent> components = List.of( metaStorageManager, cmgManager, + failureProcessor, raftManager, clusterStateStorage, clusterService, diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index a87ebe6e13..ae241f68e4 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.failure.NoOpFailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -156,6 +157,9 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { new TestConfigurationValidator() ); + FailureProcessor failureProcessor = new FailureProcessor(name()); + components.add(failureProcessor); + this.cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -164,7 +168,8 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { clusterStateStorage, logicalTopology, cmgConfiguration, - new NodeAttributesCollector(nodeAttributes, storageConfiguration) + new NodeAttributesCollector(nodeAttributes, storageConfiguration), + failureProcessor ); components.add(cmgManager); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 4ef7ae492c..10921b1a1b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.configuration.storage.DistributedConfiguration import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; @@ -131,6 +132,8 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract /** The future have to be complete after the node start and all Meta storage watches are deployd. */ private final CompletableFuture<Void> deployWatchesFut; + private final FailureProcessor failureProcessor; + /** Flag that disables storage updates. */ private volatile boolean receivesUpdates = true; @@ -173,6 +176,8 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract new TestConfigurationValidator() ); + this.failureProcessor = new FailureProcessor(clusterService.nodeName()); + cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -181,7 +186,8 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract clusterStateStorage, logicalTopology, clusterManagementConfiguration, - new NodeAttributesCollector(nodeAttributes, storageConfiguration) + new NodeAttributesCollector(nodeAttributes, storageConfiguration), + failureProcessor ); var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); @@ -236,7 +242,7 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract */ CompletableFuture<Void> start() { assertThat( - startAsync(vaultManager, clusterService, raftManager, cmgManager, metaStorageManager), + startAsync(vaultManager, clusterService, raftManager, failureProcessor, cmgManager, metaStorageManager), willCompleteSuccessfully() ); @@ -257,6 +263,7 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract var components = List.of( distributedCfgManager, cmgManager, + failureProcessor, metaStorageManager, raftManager, clusterService, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 17cf8b3f03..791692cefa 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; @@ -109,6 +110,8 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes /** The future have to be complete after the node start and all Meta storage watches are deployd. */ private final CompletableFuture<Void> deployWatchesFut; + private final FailureProcessor failureProcessor; + /** * Constructor that simply creates a subset of components of this node. */ @@ -145,6 +148,8 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes new TestConfigurationValidator() ); + this.failureProcessor = new FailureProcessor(clusterService.nodeName()); + cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -153,7 +158,8 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes clusterStateStorage, logicalTopology, clusterManagementConfiguration, - new NodeAttributesCollector(nodeAttributes, storageConfiguration) + new NodeAttributesCollector(nodeAttributes, storageConfiguration), + failureProcessor ); var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); @@ -187,7 +193,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes */ void start() { assertThat( - startAsync(vaultManager, clusterService, raftManager, cmgManager, metaStorageManager), + startAsync(vaultManager, clusterService, raftManager, failureProcessor, cmgManager, metaStorageManager), willCompleteSuccessfully() ); @@ -207,7 +213,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes */ void stop() { var components = - List.of(metaStorageManager, cmgManager, raftManager, clusterService, vaultManager); + List.of(metaStorageManager, cmgManager, failureProcessor, raftManager, clusterService, vaultManager); for (IgniteComponent igniteComponent : components) { igniteComponent.beforeNodeStop(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index d33ff17a4c..ac876f0bd5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -372,7 +372,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { logicalTopology, clusterManagementConfiguration, new NodeAttributesCollector(nodeAttributes, - nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY)) + nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY)), + failureProcessor ); LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 40f6d8194a..0e9c1fcf30 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -550,7 +550,8 @@ public class IgniteImpl implements Ignite { clusterStateStorage, logicalTopology, nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY), - nodeAttributesCollector + nodeAttributesCollector, + failureProcessor ); logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 8e963c6e34..ac7e294e7c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1056,6 +1056,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { new TestConfigurationValidator() ); + failureProcessor = new FailureProcessor(name); + cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, @@ -1064,7 +1066,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { clusterStateStorage, logicalTopology, clusterManagementConfiguration, - new NodeAttributesCollector(nodeAttributes, storageConfiguration) + new NodeAttributesCollector(nodeAttributes, storageConfiguration), + failureProcessor ); LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); @@ -1146,7 +1149,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { Path storagePath = dir.resolve("storage"); - failureProcessor = new FailureProcessor(name); dataStorageMgr = new DataStorageManager( dataStorageModules.createStorageEngines(