This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new b6004047b3 IGNITE-19199 Propagate safe time when Meta Storage is idle (#2239) b6004047b3 is described below commit b6004047b3c3e9cd91b5ccf28c26ee206c1e3a7f Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Thu Jun 29 17:53:44 2023 +0300 IGNITE-19199 Propagate safe time when Meta Storage is idle (#2239) --- modules/cli/build.gradle | 2 + .../repl/executor/ItIgnitePicocliCommandsTest.java | 31 +++- modules/metastorage-api/build.gradle | 4 + .../MetaStorageConfigurationModule.java} | 31 ++-- .../MetaStorageConfigurationSchema.java | 40 +++++ modules/metastorage/build.gradle | 5 + .../impl/ItMetaStorageManagerImplTest.java | 32 +++- .../ItMetaStorageMultipleNodesAbstractTest.java | 147 +++++++++++++--- ...MetaStorageSafeTimePropagationAbstractTest.java | 11 +- .../impl/ItMetaStorageServicePersistenceTest.java | 4 +- .../metastorage/impl/ItMetaStorageServiceTest.java | 2 +- .../metastorage/impl/ItMetaStorageWatchTest.java | 27 ++- .../metastorage/command/SyncTimeCommand.java | 3 + ...java => MetaStorageLeaderElectionListener.java} | 148 +++++++++------- .../metastorage/impl/MetaStorageManagerImpl.java | 192 +++++++++++++++------ .../metastorage/impl/MetaStorageServiceImpl.java | 4 +- .../server/raft/MetaStorageWriteHandler.java | 16 +- .../metastorage/server/time/ClusterTimeImpl.java | 165 ++++++++++++------ .../MetaStorageDeployWatchesCorrectnessTest.java | 22 ++- .../impl/MetaStorageManagerRecoveryTest.java | 23 ++- .../metastorage/server/time/ClusterTimeTest.java | 132 ++++++++++++++ .../impl/StandaloneMetaStorageManager.java | 50 +++++- .../server/AbstractKeyValueStorageTest.java | 4 +- .../internal/placementdriver/ActiveActorTest.java | 4 +- .../MultiActorPlacementDriverTest.java | 14 +- .../PlacementDriverManagerTest.java | 8 +- .../placementdriver/PlacementDriverManager.java | 2 +- .../internal/raft/LeaderElectionListener.java} | 24 +-- .../java/org/apache/ignite/internal/raft/Loza.java | 1 + .../rpc/impl/RaftGroupEventsClientListener.java | 28 +-- .../raft/client/TopologyAwareRaftGroupService.java | 44 +++-- .../TopologyAwareRaftGroupServiceFactory.java | 22 +-- .../apache/ignite/internal/replicator/Replica.java | 2 +- .../ignite/internal/replicator/ReplicaManager.java | 2 +- .../client/TopologyAwareRaftGroupServiceTest.java | 2 +- .../replicator/PlacementDriverReplicaSideTest.java | 10 +- .../ItDistributedConfigurationPropertiesTest.java | 27 ++- .../ItDistributedConfigurationStorageTest.java | 25 ++- .../storage/ItRebalanceDistributedTest.java | 33 ++-- .../runner/app/ItIgniteNodeRestartTest.java | 32 ++-- .../org/apache/ignite/internal/app/IgniteImpl.java | 25 +-- 41 files changed, 1019 insertions(+), 381 deletions(-) diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle index 9f0eddd54d..bf52713ad4 100644 --- a/modules/cli/build.gradle +++ b/modules/cli/build.gradle @@ -91,8 +91,10 @@ dependencies { integrationTestAnnotationProcessor libs.picocli.annotation.processor integrationTestAnnotationProcessor libs.micronaut.inject.annotation.processor integrationTestAnnotationProcessor libs.micronaut.validation.annotation.processor + integrationTestImplementation testFixtures(project) integrationTestImplementation project(':ignite-api') + integrationTestImplementation project(':ignite-configuration-api') integrationTestImplementation project(':ignite-runner') integrationTestImplementation project(':ignite-schema') integrationTestImplementation project(':ignite-sql-engine') diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java index e6e8f4d392..4fa20d8f85 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.cli.core.repl.executor; +import static java.util.stream.Collectors.flatMapping; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toUnmodifiableList; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -32,8 +35,12 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.ignite.configuration.ConfigurationModule; +import org.apache.ignite.configuration.RootKey; +import org.apache.ignite.configuration.annotation.ConfigurationType; import org.apache.ignite.internal.cli.commands.CliCommandTestInitializedIntegrationBase; import org.apache.ignite.internal.cli.commands.TopLevelCliReplCommand; import org.apache.ignite.internal.cli.core.repl.Session; @@ -44,6 +51,7 @@ import org.apache.ignite.internal.cli.core.repl.completer.filter.CompleterFilter import org.apache.ignite.internal.cli.core.repl.completer.filter.DynamicCompleterFilter; import org.apache.ignite.internal.cli.core.repl.completer.filter.NonRepeatableOptionsFilter; import org.apache.ignite.internal.cli.core.repl.completer.filter.ShortOptionsFilter; +import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; import org.assertj.core.util.Files; import org.jline.reader.Candidate; import org.jline.reader.LineReader; @@ -65,6 +73,23 @@ public class ItIgnitePicocliCommandsTest extends CliCommandTestInitializedIntegr private static final String DEFAULT_REST_URL = "http://localhost:10300"; + private static final List<String> DISTRIBUTED_CONFIGURATION_KEYS; + + private static final List<String> LOCAL_CONFIGURATION_KEYS; + + static { + Map<ConfigurationType, List<String>> configKeysByType = new ServiceLoaderModulesProvider() + .modules(ItIgnitePicocliCommandsTest.class.getClassLoader()) + .stream() + .collect(groupingBy( + ConfigurationModule::type, + flatMapping(module -> module.rootKeys().stream().map(RootKey::key), toUnmodifiableList()) + )); + + DISTRIBUTED_CONFIGURATION_KEYS = configKeysByType.get(ConfigurationType.DISTRIBUTED); + LOCAL_CONFIGURATION_KEYS = configKeysByType.get(ConfigurationType.LOCAL); + } + @Inject DynamicCompleterRegistry dynamicCompleterRegistry; @@ -215,7 +240,7 @@ public class ItIgnitePicocliCommandsTest extends CliCommandTestInitializedIntegr // wait for lazy init of node config completer await("For given parsed words: " + givenParsedLine.words()).until( () -> complete(givenParsedLine), - containsInAnyOrder("rest", "compute", "clientConnector", "raft", "network", "cluster", "deployment", "nodeAttributes") + containsInAnyOrder(LOCAL_CONFIGURATION_KEYS.toArray(String[]::new)) ); } @@ -265,7 +290,7 @@ public class ItIgnitePicocliCommandsTest extends CliCommandTestInitializedIntegr // wait for lazy init of cluster config completer await("For given parsed words: " + givenParsedLine.words()).until( () -> complete(givenParsedLine), - containsInAnyOrder("aimem", "aipersist", "metrics", "rocksDb", "table", "zone", "security", "schemaSync", "gc") + containsInAnyOrder(DISTRIBUTED_CONFIGURATION_KEYS.toArray(String[]::new)) ); } @@ -288,7 +313,7 @@ public class ItIgnitePicocliCommandsTest extends CliCommandTestInitializedIntegr // wait for lazy init of cluster config completer await("For given parsed words: " + givenParsedLine.words()).until( () -> complete(givenParsedLine), - containsInAnyOrder("aimem", "aipersist", "metrics", "rocksDb", "table", "zone", "security", "schemaSync", "gc") + containsInAnyOrder(DISTRIBUTED_CONFIGURATION_KEYS.toArray(String[]::new)) ); } diff --git a/modules/metastorage-api/build.gradle b/modules/metastorage-api/build.gradle index c5aefc2800..afb4c5d9a0 100644 --- a/modules/metastorage-api/build.gradle +++ b/modules/metastorage-api/build.gradle @@ -23,9 +23,13 @@ description = 'ignite-metastorage-api' dependencies { api project(':ignite-network-api') + api project(':ignite-configuration-api') implementation project(':ignite-core') implementation libs.jetbrains.annotations + implementation libs.auto.service.annotations annotationProcessor project(':ignite-network-annotation-processor') + annotationProcessor project(':ignite-configuration-annotation-processor') + annotationProcessor libs.auto.service } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java similarity index 51% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java copy to modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java index 85059bbba0..646c64b1f4 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java @@ -15,24 +15,27 @@ * limitations under the License. */ -package org.apache.ignite.internal.metastorage.command; +package org.apache.ignite.internal.metastorage.configuration; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.annotations.Transferable; +import com.google.auto.service.AutoService; +import java.util.Collection; +import java.util.Collections; +import org.apache.ignite.configuration.ConfigurationModule; +import org.apache.ignite.configuration.RootKey; +import org.apache.ignite.configuration.annotation.ConfigurationType; /** - * Command that initiates idle safe time synchronization. + * {@link ConfigurationModule} for Meta Storage configuration. */ -@Transferable(MetastorageCommandsMessageGroup.SYNC_TIME) -public interface SyncTimeCommand extends WriteCommand { - /** New safe time. */ - long safeTimeLong(); +@AutoService(ConfigurationModule.class) +public class MetaStorageConfigurationModule implements ConfigurationModule { + @Override + public ConfigurationType type() { + return ConfigurationType.DISTRIBUTED; + } - /** New safe time. */ - default HybridTimestamp safeTime() { - return hybridTimestamp(safeTimeLong()); + @Override + public Collection<RootKey<?, ?>> rootKeys() { + return Collections.singleton(MetaStorageConfiguration.KEY); } } diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java new file mode 100644 index 0000000000..a08b04e2d3 --- /dev/null +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.configuration; + +import org.apache.ignite.configuration.annotation.ConfigurationRoot; +import org.apache.ignite.configuration.annotation.ConfigurationType; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** + * Configuration schema for the Meta Storage module. + */ +@ConfigurationRoot(rootName = "metaStorage", type = ConfigurationType.DISTRIBUTED) +public class MetaStorageConfigurationSchema { + /** + * Duration (in milliseconds) used to determine how often to issue time sync commands when the Meta Storage is idle + * (no writes are being issued). + * + * <p>Making this value too small increases the network load, while making this value too large can lead to increased latency of + * Meta Storage reads. + */ + @Value(hasDefault = true) + @Range(min = 1) + public long idleSyncTimeInterval = 500; +} diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle index 3ce2ee1fde..a7e9f66380 100644 --- a/modules/metastorage/build.gradle +++ b/modules/metastorage/build.gradle @@ -29,6 +29,7 @@ dependencies { implementation project(':ignite-network-api') implementation project(':ignite-vault') implementation project(':ignite-raft-api') + implementation project(':ignite-replicator') implementation project(':ignite-api') implementation project(':ignite-core') implementation project(':ignite-rocksdb-common') @@ -39,12 +40,15 @@ dependencies { testImplementation testFixtures(project(':ignite-core')) testImplementation testFixtures(project(':ignite-vault')) + testImplementation testFixtures(project(':ignite-configuration')) + testImplementation testFixtures(project(':ignite-replicator')) testImplementation libs.mockito.junit testImplementation libs.hamcrest.core integrationTestImplementation libs.jetbrains.annotations integrationTestImplementation project(":ignite-cluster-management") integrationTestImplementation project(':ignite-network') + integrationTestImplementation project(':ignite-replicator') integrationTestImplementation project(':ignite-rest') integrationTestImplementation project(':ignite-raft') integrationTestImplementation project(":ignite-raft-api") @@ -61,6 +65,7 @@ dependencies { testFixturesImplementation project(':ignite-cluster-management') testFixturesImplementation project(':ignite-core') testFixturesImplementation project(':ignite-raft-api') + testFixturesImplementation project(':ignite-replicator') testFixturesImplementation project(':ignite-rocksdb-common') testFixturesImplementation project(':ignite-vault') testFixturesImplementation libs.jetbrains.annotations diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index c7773691ac..ff2a75c687 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -55,11 +55,13 @@ import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.RevisionUpdateListener; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; @@ -70,6 +72,7 @@ import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -93,13 +96,29 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { private MetaStorageManagerImpl metaStorageManager; @BeforeEach - void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) { + void setUp( + TestInfo testInfo, + @InjectConfiguration RaftConfiguration raftConfiguration, + @InjectConfiguration MetaStorageConfiguration metaStorageConfiguration + ) { var addr = new NetworkAddress("localhost", 10_000); clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr))); HybridClock clock = new HybridClockImpl(); - raftManager = new Loza(clusterService, raftConfiguration, workDir.resolve("loza"), clock); + + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + + raftManager = new Loza(clusterService, raftConfiguration, workDir.resolve("loza"), clock, raftGroupEventsClientListener); + + var logicalTopologyService = mock(LogicalTopologyService.class); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); vaultManager = new VaultManager(new InMemoryVaultService()); @@ -113,10 +132,12 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { vaultManager, clusterService, cmgManager, - mock(LogicalTopologyService.class), + logicalTopologyService, raftManager, storage, - clock + clock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); vaultManager.start(); @@ -279,7 +300,8 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { mock(LogicalTopologyService.class), raftManager, storage, - new HybridClockImpl() + new HybridClockImpl(), + mock(TopologyAwareRaftGroupServiceFactory.class) ); metaStorageManager.stop(); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 9103686e64..1e039b2bf8 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -29,6 +29,8 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -59,14 +61,15 @@ import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.EntryEvent; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; -import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; -import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; @@ -77,7 +80,7 @@ import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; -import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -100,6 +103,12 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr @InjectConfiguration private static NodeAttributesConfiguration nodeAttributes; + /** + * Large interval to effectively disable idle safe time propagation. + */ + @InjectConfiguration("mock.idleSyncTimeInterval=1000000") + private static MetaStorageConfiguration metaStorageConfiguration; + public abstract KeyValueStorage createStorage(String nodeName, Path path); private class Node { @@ -126,11 +135,15 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr Path basePath = dataPath.resolve(name()); HybridClock clock = new HybridClockImpl(); + + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + this.raftManager = new Loza( clusterService, raftConfiguration, basePath.resolve("raft"), - clock + clock, + raftGroupEventsClientListener ); var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); @@ -145,20 +158,31 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr nodeAttributes, new TestConfigurationValidator()); + var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + this.metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, cmgManager, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + logicalTopologyService, raftManager, createStorage(name(), basePath), - clock + clock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); } - void start() throws NodeStoppingException { + void start() { List<IgniteComponent> components = List.of( vaultManager, clusterService, @@ -492,36 +516,105 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr }, TimeUnit.SECONDS.toMillis(1))); } + /** + * Tests that safe time is propagated from the leader even if the Meta Storage is idle. + */ + @Test + void testIdleSafeTimePropagation(TestInfo testInfo) throws Exception { + // Enable idle safe time sync. + CompletableFuture<Void> updateIdleSyncTimeIntervalFuture = metaStorageConfiguration.idleSyncTimeInterval().update(100L); + + assertThat(updateIdleSyncTimeIntervalFuture, willCompleteSuccessfully()); + + Node firstNode = startNode(testInfo); + Node secondNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + + assertThat(firstNode.cmgManager.onJoinReady(), willCompleteSuccessfully()); + assertThat(secondNode.cmgManager.onJoinReady(), willCompleteSuccessfully()); + + firstNode.waitWatches(); + secondNode.waitWatches(); + + ClusterTime firstNodeTime = firstNode.metaStorageManager.clusterTime(); + ClusterTime secondNodeTime = secondNode.metaStorageManager.clusterTime(); + + HybridTimestamp now = firstNodeTime.now(); + + assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully()); + assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully()); + } + + /** + * Tests that safe time is propagated after leader was changed and the Meta Storage is idle. + */ + @Test + void testIdleSafeTimePropagationLeaderTransferred(TestInfo testInfo) throws Exception { + // Enable idle safe time sync. + CompletableFuture<Void> updateIdleSyncTimeIntervalFuture = metaStorageConfiguration.idleSyncTimeInterval().update(100L); + + assertThat(updateIdleSyncTimeIntervalFuture, willCompleteSuccessfully()); + + Node firstNode = startNode(testInfo); + Node secondNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name(), secondNode.name()), List.of(firstNode.name()), "test"); + + assertThat(firstNode.cmgManager.onJoinReady(), willCompleteSuccessfully()); + assertThat(secondNode.cmgManager.onJoinReady(), willCompleteSuccessfully()); + + firstNode.waitWatches(); + secondNode.waitWatches(); + + ClusterTime firstNodeTime = firstNode.metaStorageManager.clusterTime(); + ClusterTime secondNodeTime = secondNode.metaStorageManager.clusterTime(); + + Node leader = transferLeadership(firstNode, secondNode); + + HybridTimestamp now = leader.metaStorageManager.clusterTime().now(); + + assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully()); + assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully()); + + leader = transferLeadership(firstNode, secondNode); + + now = leader.metaStorageManager.clusterTime().now(); + + assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully()); + assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully()); + } + private Node transferLeadership(Node firstNode, Node secondNode) { - RaftGroupService svc1 = getMetastorageService(firstNode); - RaftGroupService svc2 = getMetastorageService(secondNode); + RaftGroupService svc = getMetastorageService(firstNode); - boolean leaderFirst = false; + CompletableFuture<Node> future = svc.refreshLeader() + .thenCompose(v -> { + Peer leader = svc.leader(); - RaftGroupService leader; - RaftGroupService notLeader; + assertThat(leader, is(notNullValue())); - if (svc1.getRaftNode().isLeader()) { - leader = svc1; - notLeader = svc2; + Peer newLeader = svc.peers().stream() + .filter(p -> !p.equals(leader)) + .findFirst() + .orElseThrow(); - leaderFirst = true; - } else { - leader = svc2; - notLeader = svc1; - } + Node newLeaderNode = newLeader.consistentId().equals(firstNode.name()) ? firstNode : secondNode; + + return svc.transferLeadership(newLeader).thenApply(unused -> newLeaderNode); + }); - leader.getRaftNode().transferLeadershipTo(notLeader.getServerId()); + assertThat(future, willCompleteSuccessfully()); - return leaderFirst ? secondNode : firstNode; + return future.join(); } private RaftGroupService getMetastorageService(Node node) { - JraftServerImpl server1 = (JraftServerImpl) node.raftManager.server(); + CompletableFuture<RaftGroupService> future = node.metaStorageManager.metaStorageServiceFuture() + .thenApply(MetaStorageServiceImpl::raftGroupService); - RaftNodeId nodeId = server1.localNodes().stream() - .filter(id -> MetastorageGroupId.INSTANCE.equals(id.groupId())).findFirst().get(); + assertThat(future, willCompleteSuccessfully()); - return server1.raftGroupService(nodeId); + return future.join(); } } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java index c73c6ac301..068b2b2721 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java @@ -41,13 +41,10 @@ import org.junit.jupiter.api.Test; public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends AbstractKeyValueStorageTest { private final HybridClock clock = new HybridClockImpl(); - private final ClusterTimeImpl time = new ClusterTimeImpl(new IgniteSpinBusyLock(), clock); + private final ClusterTimeImpl time = new ClusterTimeImpl("node", new IgniteSpinBusyLock(), clock); @BeforeEach - @Override - public void setUp() { - super.setUp(); - + public void startWatches() { storage.startWatches(0, (e, t) -> { time.updateSafeTime(t); @@ -56,8 +53,8 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr } @AfterEach - void tearDown() throws Exception { - storage.close(); + public void stopTime() throws Exception { + time.close(); } @Test diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index 33575176b7..8fa5a64371 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -71,7 +71,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception { ClusterNode followerNode = getNode(server); - var clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), new HybridClockImpl()); + var clusterTime = new ClusterTimeImpl(followerNode.name(), new IgniteSpinBusyLock(), new HybridClockImpl()); metaStorage = new MetaStorageServiceImpl(followerNode.name(), service, new IgniteSpinBusyLock(), clusterTime); @@ -149,7 +149,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps return s; }); - return new MetaStorageListener(storage, new ClusterTimeImpl(new IgniteSpinBusyLock(), new HybridClockImpl())); + return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new HybridClockImpl())); } /** {@inheritDoc} */ diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index dfc5a3d3ab..dac71c9166 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -194,7 +194,7 @@ public class ItMetaStorageServiceTest { clock ); - this.clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), clock); + this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), new IgniteSpinBusyLock(), clock); this.mockStorage = mock(KeyValueStorage.class); } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index eb7701110f..c0e2d93e9e 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -53,10 +53,12 @@ import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; @@ -67,6 +69,7 @@ import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -83,6 +86,9 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { @InjectConfiguration private static NodeAttributesConfiguration nodeAttributes; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + private static class Node { private final List<IgniteComponent> components = new ArrayList<>(); @@ -104,11 +110,15 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { Path basePath = dataPath.resolve(name()); HybridClock clock = new HybridClockImpl(); + + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + var raftManager = new Loza( clusterService, raftConfiguration, basePath.resolve("raft"), - clock + clock, + raftGroupEventsClientListener ); components.add(raftManager); @@ -131,14 +141,25 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { components.add(cmgManager); + var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + this.metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, cmgManager, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + logicalTopologyService, raftManager, new RocksDbKeyValueStorage(name(), basePath.resolve("storage")), - clock + clock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); components.add(metaStorageManager); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java index 85059bbba0..4032f0f8f2 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java @@ -31,6 +31,9 @@ public interface SyncTimeCommand extends WriteCommand { /** New safe time. */ long safeTimeLong(); + /** Term of the initiator. */ + long initiatorTerm(); + /** New safe time. */ default HybridTimestamp safeTime() { return hybridTimestamp(safeTimeLong()); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java similarity index 62% rename from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java rename to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java index 36d571bf5b..518cf51338 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import java.util.List; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; @@ -31,20 +33,22 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.raft.LeaderElectionListener; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; -import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; +import org.jetbrains.annotations.Nullable; /** - * Raft Group Events listener that registers Logical Topology listener for updating the list of Meta Storage Raft group listeners. + * Meta Storage leader election listener. */ -public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListener { - private static final IgniteLogger LOG = Loggers.forClass(MetaStorageManagerImpl.class); +public class MetaStorageLeaderElectionListener implements LeaderElectionListener { + private static final IgniteLogger LOG = Loggers.forClass(MetaStorageLeaderElectionListener.class); private final IgniteSpinBusyLock busyLock; @@ -62,87 +66,112 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen * * <p>Multi-threaded access is guarded by {@code serializationFutureMux}. */ + @Nullable private CompletableFuture<Void> serializationFuture = null; private final Object serializationFutureMux = new Object(); private final ClusterTimeImpl clusterTime; - MetaStorageRaftGroupEventsListener( + private final LogicalTopologyEventListener logicalTopologyEventListener = new MetaStorageLogicalTopologyEventListener(); + + private final CompletableFuture<MetaStorageConfiguration> metaStorageConfigurationFuture; + + /** + * Leader term if this node is a leader, {@code null} otherwise. + * + * <p>Multi-threaded access is guarded by {@code serializationFutureMux}. + */ + @Nullable + private Long thisNodeTerm = null; + + MetaStorageLeaderElectionListener( IgniteSpinBusyLock busyLock, ClusterService clusterService, LogicalTopologyService logicalTopologyService, CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut, - ClusterTimeImpl clusterTime + ClusterTimeImpl clusterTime, + CompletableFuture<MetaStorageConfiguration> metaStorageConfigurationFuture ) { this.busyLock = busyLock; this.nodeName = clusterService.nodeName(); this.logicalTopologyService = logicalTopologyService; this.metaStorageSvcFut = metaStorageSvcFut; this.clusterTime = clusterTime; + this.metaStorageConfigurationFuture = metaStorageConfigurationFuture; } @Override - public void onLeaderElected(long term) { + public void onLeaderElected(ClusterNode node, long term) { synchronized (serializationFutureMux) { - registerTopologyEventListeners(); + if (node.name().equals(nodeName) && serializationFuture == null) { + LOG.info("Node has been elected as the leader, starting Idle Safe Time scheduler"); - // Update learner configuration (in case we missed some topology updates) and initialize the serialization future. - serializationFuture = executeWithStatus((service, term1, isLeader) -> { - CompletableFuture<Void> fut; - if (isLeader) { - fut = this.resetLearners(service, term1); + thisNodeTerm = term; - clusterTime.startLeaderTimer(service); - } else { - fut = completedFuture(null); + logicalTopologyService.addEventListener(logicalTopologyEventListener); - clusterTime.stopLeaderTimer(); - } + metaStorageSvcFut + .thenAcceptBoth(metaStorageConfigurationFuture, (service, metaStorageConfiguration) -> + clusterTime.startSafeTimeScheduler( + safeTime -> service.syncTime(safeTime, term), + metaStorageConfiguration + )) + .whenComplete((v, e) -> { + if (e != null) { + LOG.error("Unable to start Idle Safe Time scheduler", e); + } + }); - return fut; - }); - } - } + // Update learner configuration (in case we missed some topology updates between elections). + serializationFuture = metaStorageSvcFut.thenCompose(service -> resetLearners(service.raftGroupService(), term)); + } else if (serializationFuture != null) { + LOG.info("Node has lost the leadership, stopping Idle Safe Time scheduler"); - private void registerTopologyEventListeners() { - logicalTopologyService.addEventListener(new LogicalTopologyEventListener() { - @Override - public void onNodeValidated(LogicalNode validatedNode) { - executeIfLeader((service, term) -> addLearner(service.raftGroupService(), validatedNode)); - } + thisNodeTerm = null; - @Override - public void onNodeInvalidated(LogicalNode invalidatedNode) { - executeIfLeader((service, term) -> removeLearner(service.raftGroupService(), invalidatedNode)); - } + logicalTopologyService.removeEventListener(logicalTopologyEventListener); - @Override - public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { - onNodeInvalidated(leftNode); - } + clusterTime.stopSafeTimeScheduler(); + + serializationFuture.cancel(false); - @Override - public void onTopologyLeap(LogicalTopologySnapshot newTopology) { - executeIfLeader(MetaStorageRaftGroupEventsListener.this::resetLearners); + serializationFuture = null; } - }); + } } - @FunctionalInterface - private interface OnLeaderAction { - CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term); + private class MetaStorageLogicalTopologyEventListener implements LogicalTopologyEventListener { + @Override + public void onNodeValidated(LogicalNode validatedNode) { + execute((raftService, term) -> addLearner(raftService, validatedNode)); + } + + @Override + public void onNodeInvalidated(LogicalNode invalidatedNode) { + execute((raftService, term) -> removeLearner(raftService, invalidatedNode)); + } + + @Override + public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { + onNodeInvalidated(leftNode); + } + + @Override + public void onTopologyLeap(LogicalTopologySnapshot newTopology) { + execute(MetaStorageLeaderElectionListener.this::resetLearners); + } } @FunctionalInterface - private interface OnStatusAction { - CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term, boolean isLeader); + private interface Action { + CompletableFuture<Void> apply(RaftGroupService raftService, long term); } /** * Executes the given action if the current node is the Meta Storage leader. */ - private void executeIfLeader(OnLeaderAction action) { + private void execute(Action action) { if (!busyLock.enterBusy()) { LOG.info("Skipping Meta Storage configuration update because the node is stopping"); @@ -156,9 +185,14 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen return; } + // Term and serialization future are initialized together. + assert thisNodeTerm != null; + + long term = thisNodeTerm; + serializationFuture = serializationFuture // we don't care about exceptions here, they should be logged independently - .handle((v, e) -> executeIfLeaderImpl(action)) + .handle((v, e) -> metaStorageSvcFut.thenCompose(service -> action.apply(service.raftGroupService(), term))) .thenCompose(Function.identity()); } } finally { @@ -166,20 +200,6 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen } } - private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) { - return executeWithStatus((service, term, isLeader) -> isLeader ? action.apply(service, term) : completedFuture(null)); - } - - private CompletableFuture<Void> executeWithStatus(OnStatusAction action) { - return metaStorageSvcFut.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm() - .thenCompose(leaderWithTerm -> { - String leaderName = leaderWithTerm.leader().consistentId(); - - boolean isLeader = leaderName.equals(nodeName); - return action.apply(service, leaderWithTerm.term(), isLeader); - })); - } - private CompletableFuture<Void> addLearner(RaftGroupService raftService, ClusterNode learner) { return updateConfigUnderLock(() -> isPeer(raftService, learner) ? completedFuture(null) @@ -207,11 +227,9 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen }))); } - private CompletableFuture<Void> resetLearners(MetaStorageServiceImpl service, long term) { + private CompletableFuture<Void> resetLearners(RaftGroupService raftService, long term) { return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader() .thenCompose(validatedNodes -> updateConfigUnderLock(() -> { - RaftGroupService raftService = service.raftGroupService(); - Set<String> peers = raftService.peers().stream().map(Peer::consistentId).collect(toSet()); Set<String> learners = validatedNodes.stream() @@ -236,7 +254,7 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen try { return action.get() .whenComplete((v, e) -> { - if (e != null) { + if (e != null && !(unwrapCause(e) instanceof CancellationException)) { LOG.error("Unable to change peers on topology update", e); } }); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index fbac3f4cf5..5c9c72d8df 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.RevisionUpdateListener; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.dsl.Operation; @@ -62,6 +63,8 @@ import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -117,15 +120,24 @@ public class MetaStorageManagerImpl implements MetaStorageManager { private final AtomicBoolean isStopped = new AtomicBoolean(); /** - * Future which completes when MetaStorage manager finished local recovery. - * The value of the future is the revision which must be used for state recovery by other components. + * Future which completes when MetaStorage manager finished local recovery. The value of the future is the revision which must be used + * for state recovery by other components. */ private final CompletableFuture<Long> recoveryFinishedFuture = new CompletableFuture<>(); + /** + * Future that gets completed after {@link #deployWatches} method has been called. + */ + private final CompletableFuture<Void> deployWatchesFuture = new CompletableFuture<>(); + private final ClusterTimeImpl clusterTime; + private final TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory; + private volatile long appliedRevision; + private volatile MetaStorageConfiguration metaStorageConfiguration; + /** * The constructor. * @@ -144,7 +156,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { LogicalTopologyService logicalTopologyService, RaftManager raftMgr, KeyValueStorage storage, - HybridClock clock + HybridClock clock, + TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory ) { this.vaultMgr = vaultMgr; this.clusterService = clusterService; @@ -152,7 +165,28 @@ public class MetaStorageManagerImpl implements MetaStorageManager { this.cmgMgr = cmgMgr; this.logicalTopologyService = logicalTopologyService; this.storage = storage; - this.clusterTime = new ClusterTimeImpl(busyLock, clock); + this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock); + this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory; + } + + /** + * Constructor for tests, that allows to pass Meta Storage configuration. + */ + @TestOnly + public MetaStorageManagerImpl( + VaultManager vaultMgr, + ClusterService clusterService, + ClusterManagementGroupManager cmgMgr, + LogicalTopologyService logicalTopologyService, + RaftManager raftMgr, + KeyValueStorage storage, + HybridClock clock, + TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, + MetaStorageConfiguration configuration + ) { + this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory); + + configure(configuration); } private CompletableFuture<Long> recover(MetaStorageServiceImpl service) { @@ -225,54 +259,97 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) { - String thisNodeName = clusterService.nodeName(); - - CompletableFuture<RaftGroupService> raftServiceFuture; - try { - var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1); - - // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica. - if (metaStorageNodes.contains(thisNodeName)) { - PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes); - - Peer localPeer = configuration.peer(thisNodeName); - - assert localPeer != null; - - raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( - new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), - configuration, - new MetaStorageListener(storage, clusterTime), - new MetaStorageRaftGroupEventsListener( - busyLock, - clusterService, - logicalTopologyService, - metaStorageSvcFut, - clusterTime - ), - ownFsmCallerExecutorDisruptorConfig - ); - } else { - PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName)); + String thisNodeName = clusterService.nodeName(); - Peer localPeer = configuration.learner(thisNodeName); + var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1); - assert localPeer != null; + CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName) + ? startFollowerNode(metaStorageNodes, disruptorConfig) + : startLearnerNode(metaStorageNodes, disruptorConfig); - raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( - new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), - configuration, - new MetaStorageListener(storage, clusterTime), - RaftGroupEventsListener.noopLsnr, - ownFsmCallerExecutorDisruptorConfig - ); - } + return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime)); } catch (NodeStoppingException e) { return CompletableFuture.failedFuture(e); } + } + + private CompletableFuture<? extends RaftGroupService> startFollowerNode( + Set<String> metaStorageNodes, + RaftNodeDisruptorConfiguration disruptorConfig + ) throws NodeStoppingException { + String thisNodeName = clusterService.nodeName(); + + PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes); + + Peer localPeer = configuration.peer(thisNodeName); + + assert localPeer != null; + + MetaStorageConfiguration localMetaStorageConfiguration = metaStorageConfiguration; + + assert localMetaStorageConfiguration != null : "Meta Storage configuration has not been set"; + + CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( + new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + configuration, + new MetaStorageListener(storage, clusterTime), + RaftGroupEventsListener.noopLsnr, + disruptorConfig, + topologyAwareRaftGroupServiceFactory + ); + + raftServiceFuture + .thenAccept(service -> service.subscribeLeader(new MetaStorageLeaderElectionListener( + busyLock, + clusterService, + logicalTopologyService, + metaStorageSvcFut, + clusterTime, + // We use the "deployWatchesFuture" to guarantee that the Configuration Manager will be started + // when the underlying code tries to read Meta Storage configuration. This is a consequence of having a circular + // dependency between these two components. + deployWatchesFuture.thenApply(v -> localMetaStorageConfiguration) + ))) + .whenComplete((v, e) -> { + if (e != null) { + LOG.error("Unable to register MetaStorageLeaderElectionListener", e); + } + }); + + return raftServiceFuture; + } + + private CompletableFuture<? extends RaftGroupService> startLearnerNode( + Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig + ) throws NodeStoppingException { + String thisNodeName = clusterService.nodeName(); + + PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName)); - return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime)); + Peer localPeer = configuration.learner(thisNodeName); + + assert localPeer != null; + + return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( + new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + configuration, + new MetaStorageListener(storage, clusterTime), + RaftGroupEventsListener.noopLsnr, + disruptorConfig + ); + } + + /** + * Sets the Meta Storage configuration. + * + * <p>This method is needed to avoid the cyclic dependency between the Meta Storage and distributed configuration (built on top of the + * Meta Storage). + * + * <p>This method <b>must</b> always be called <b>before</b> calling {@link #start}. + */ + public final void configure(MetaStorageConfiguration metaStorageConfiguration) { + this.metaStorageConfiguration = metaStorageConfiguration; } @Override @@ -297,6 +374,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { .whenComplete((service, e) -> { if (e != null) { metaStorageSvcFut.completeExceptionally(e); + recoveryFinishedFuture.completeExceptionally(e); } else { assert service != null; @@ -323,13 +401,15 @@ public class MetaStorageManagerImpl implements MetaStorageManager { busyLock.block(); - clusterTime.stopLeaderTimer(); + deployWatchesFuture.cancel(true); - cancelOrConsume(metaStorageSvcFut, MetaStorageServiceImpl::close); + recoveryFinishedFuture.cancel(true); - IgniteUtils.closeAll( + IgniteUtils.closeAllManually( + clusterTime, + () -> cancelOrConsume(metaStorageSvcFut, MetaStorageServiceImpl::close), () -> raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE), - storage::close + storage ); } @@ -365,10 +445,18 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } try { - return recoveryFinishedFuture.thenAccept(revision -> inBusyLock(busyLock, () -> { - // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. - storage.startWatches(revision + 1, this::onRevisionApplied); - })); + return recoveryFinishedFuture + .thenAccept(revision -> inBusyLock(busyLock, () -> { + // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. + storage.startWatches(revision + 1, this::onRevisionApplied); + })) + .whenComplete((v, e) -> { + if (e == null) { + deployWatchesFuture.complete(null); + } else { + deployWatchesFuture.completeExceptionally(e); + } + }); } finally { busyLock.leaveBusy(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index 6d0c535bc2..4f32decdff 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -277,10 +277,10 @@ public class MetaStorageServiceImpl implements MetaStorageService { * @param safeTime New safe time. * @return Future that will be completed when message is sent. */ - public CompletableFuture<Void> syncTime(HybridTimestamp safeTime) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only propagate safe time when ms is idle + public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) { SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand() .safeTimeLong(safeTime.longValue()) + .initiatorTerm(term) .build(); return context.raftService().run(syncTimeCommand); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index d84c675aa4..8f836a1963 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -80,17 +80,21 @@ public class MetaStorageWriteHandler { WriteCommand command = clo.command(); try { - HybridTimestamp safeTime; - if (command instanceof MetaStorageWriteCommand) { - MetaStorageWriteCommand cmdWithTime = (MetaStorageWriteCommand) command; + var cmdWithTime = (MetaStorageWriteCommand) command; - safeTime = cmdWithTime.safeTime(); + HybridTimestamp safeTime = cmdWithTime.safeTime(); handleWriteWithTime(clo, cmdWithTime, safeTime); } else if (command instanceof SyncTimeCommand) { - // TODO: IGNITE-19199 WatchProcessor must be notified of the new safe time. - throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199"); + var syncTimeCommand = (SyncTimeCommand) command; + + // Ignore the command if it has been sent by a stale leader. + if (clo.term() == syncTimeCommand.initiatorTerm()) { + clusterTime.updateSafeTime(syncTimeCommand.safeTime()); + } + + clo.result(null); } else { assert false : "Command was not found [cmd=" + command + ']'; } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java index b3aa7253b9..f1c953ca8a 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java @@ -17,26 +17,57 @@ package org.apache.ignite.internal.metastorage.server.time; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; + +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; +import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.lang.NodeStoppingException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; /** * Cluster time implementation with additional methods to adjust time and update safe time. */ -public class ClusterTimeImpl implements ClusterTime { - private final IgniteSpinBusyLock busyLock; +public class ClusterTimeImpl implements ClusterTime, ManuallyCloseable { + private static final IgniteLogger LOG = Loggers.forClass(ClusterTimeImpl.class); - private volatile @Nullable LeaderTimer leaderTimer; + private final String nodeName; + + private final IgniteSpinBusyLock busyLock; private final HybridClock clock; - private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime; + private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime = + new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE); + + /** + * Scheduler for sending safe time periodically when Meta Storage is idle. + * + * <p>Scheduler is only created if this node has been elected as the Meta Storage leader. + * + * <p>Concurrent access is guarded by {@code this}. + */ + private @Nullable SafeTimeScheduler safeTimeScheduler; + + /** Action that issues a time sync command. */ + @FunctionalInterface + public interface SyncTimeAction { + CompletableFuture<Void> syncTime(HybridTimestamp time); + } /** * Constructor. @@ -44,30 +75,30 @@ public class ClusterTimeImpl implements ClusterTime { * @param busyLock Busy lock. * @param clock Node's hybrid clock. */ - public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) { + public ClusterTimeImpl(String nodeName, IgniteSpinBusyLock busyLock, HybridClock clock) { + this.nodeName = nodeName; this.busyLock = busyLock; this.clock = clock; - this.safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); } /** * Starts sync time scheduler. * - * @param service MetaStorage service that is used by scheduler to sync time. + * @param syncTimeAction Action that performs the time sync operation. */ - public void startLeaderTimer(MetaStorageServiceImpl service) { + public void startSafeTimeScheduler(SyncTimeAction syncTimeAction, MetaStorageConfiguration configuration) { if (!busyLock.enterBusy()) { return; } try { - assert leaderTimer == null; + synchronized (this) { + assert safeTimeScheduler == null; - LeaderTimer newTimer = new LeaderTimer(service); + safeTimeScheduler = new SafeTimeScheduler(syncTimeAction, configuration); - leaderTimer = newTimer; - - newTimer.start(); + safeTimeScheduler.start(); + } } finally { busyLock.leaveBusy(); } @@ -76,22 +107,19 @@ public class ClusterTimeImpl implements ClusterTime { /** * Stops sync time scheduler if it exists. */ - public void stopLeaderTimer() { - if (!busyLock.enterBusy()) { - return; - } + public synchronized void stopSafeTimeScheduler() { + if (safeTimeScheduler != null) { + safeTimeScheduler.stop(); - try { - LeaderTimer timer = leaderTimer; + safeTimeScheduler = null; + } + } - if (timer != null) { - timer.stop(); + @Override + public void close() throws Exception { + stopSafeTimeScheduler(); - leaderTimer = null; - } - } finally { - busyLock.leaveBusy(); - } + safeTime.close(); } @Override @@ -119,48 +147,85 @@ public class ClusterTimeImpl implements ClusterTime { } /** - * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time, - * hybrid clock's latest time and {@code ts} adding 1 logical tick to the result. + * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time, hybrid clock's latest time and + * {@code ts} adding 1 logical tick to the result. * * @param ts Timestamp. */ - public void adjust(HybridTimestamp ts) { + public synchronized void adjust(HybridTimestamp ts) { this.clock.update(ts); + + // Since this method is called when a write command is being processed and safe time is also updated by write commands, + // we need to re-schedule the idle time scheduler. + if (safeTimeScheduler != null) { + safeTimeScheduler.schedule(); + } } - private class LeaderTimer { + private class SafeTimeScheduler { + private final SyncTimeAction syncTimeAction; - private final MetaStorageServiceImpl service; + private final MetaStorageConfiguration configuration; - private LeaderTimer(MetaStorageServiceImpl service) { - this.service = service; + private final ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "meta-storage-safe-time", LOG)); + + /** + * Current scheduled task. + * + * <p>Concurrent access is guarded by {@code this}. + */ + @Nullable + private ScheduledFuture<?> currentTask; + + SafeTimeScheduler(SyncTimeAction syncTimeAction, MetaStorageConfiguration configuration) { + this.syncTimeAction = syncTimeAction; + this.configuration = configuration; } void start() { schedule(); } - private void schedule() { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only propagate safe time when ms is idle - } - - void disseminateTime() { - if (!busyLock.enterBusy()) { - // Shutting down. - return; + synchronized void schedule() { + // Cancel the previous task if we were re-scheduled because Meta Storage was not actually idle. + if (currentTask != null) { + currentTask.cancel(false); } - try { - HybridTimestamp now = clock.now(); - - service.syncTime(now); - } finally { - busyLock.leaveBusy(); - } + currentTask = executorService.schedule(() -> { + if (!busyLock.enterBusy()) { + return; + } + + try { + syncTimeAction.syncTime(clock.now()) + .whenComplete((v, e) -> { + if (e != null) { + Throwable cause = unwrapCause(e); + + if (!(cause instanceof CancellationException) && !(cause instanceof NodeStoppingException)) { + LOG.error("Unable to perform idle time sync", e); + } + } + }); + + // Re-schedule the task again. + schedule(); + } finally { + busyLock.leaveBusy(); + } + }, configuration.idleSyncTimeInterval().value(), TimeUnit.MILLISECONDS); } void stop() { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Stop safe time propagation + synchronized (this) { + if (currentTask != null) { + currentTask.cancel(false); + } + } + + IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS); } } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java index a3aeb4e824..570295b0fa 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -29,29 +29,38 @@ import java.util.Set; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.raft.RaftManager; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.network.ClusterService; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; /** * Tests that check correctness of an invocation {@link MetaStorageManager#deployWatches()}. */ +@ExtendWith(ConfigurationExtension.class) public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest { /** Vault manager. */ private static VaultManager vaultManager; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + @BeforeAll public static void init() { vaultManager = new VaultManager(new InMemoryVaultService()); @@ -78,13 +87,12 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); ClusterService clusterService = mock(ClusterService.class); - LogicalTopologyService logicalTopologyService = mock(LogicalTopologyService.class); RaftManager raftManager = mock(RaftManager.class); - RaftGroupService raftGroupService = mock(RaftGroupService.class); + TopologyAwareRaftGroupService raftGroupService = mock(TopologyAwareRaftGroupService.class); when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(mcNodeName))); when(clusterService.nodeName()).thenReturn(mcNodeName); - when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any())) + when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any(), any())) .thenReturn(completedFuture(raftGroupService)); when(raftGroupService.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation -> completedFuture(0L)); @@ -93,10 +101,12 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest vaultManager, clusterService, cmgManager, - logicalTopologyService, + mock(LogicalTopologyService.class), raftManager, new SimpleInMemoryKeyValueStorage(mcNodeName), - clock + clock, + mock(TopologyAwareRaftGroupServiceFactory.class), + metaStorageConfiguration ), StandaloneMetaStorageManager.create(vaultManager) ); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java index 7993242b54..e9155fbb84 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -30,12 +31,16 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; @@ -45,13 +50,18 @@ import org.apache.ignite.network.NodeMetadata; import org.apache.ignite.network.TopologyService; import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; /** Tests MetaStorage manager recovery basics. */ +@ExtendWith(ConfigurationExtension.class) public class MetaStorageManagerRecoveryTest { private static final String NODE_NAME = "node"; private static final String LEADER_NAME = "ms-leader"; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + private MetaStorageManagerImpl metaStorageManager; private KeyValueStorage kvs; @@ -75,7 +85,9 @@ public class MetaStorageManagerRecoveryTest { topologyService, raftManager, kvs, - clock + clock, + mock(TopologyAwareRaftGroupServiceFactory.class), + metaStorageConfiguration ); } @@ -84,12 +96,11 @@ public class MetaStorageManagerRecoveryTest { RaftGroupService service = mock(RaftGroupService.class); - when(service.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation -> { - return CompletableFuture.completedFuture(remoteRevision); - }); + when(service.run(any(GetCurrentRevisionCommand.class))) + .thenAnswer(invocation -> completedFuture(remoteRevision)); when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any())) - .thenAnswer(invocation -> CompletableFuture.completedFuture(service)); + .thenAnswer(invocation -> completedFuture(service)); return raft; } @@ -135,7 +146,7 @@ public class MetaStorageManagerRecoveryTest { ClusterManagementGroupManager mock = mock(ClusterManagementGroupManager.class); when(mock.metaStorageNodes()) - .thenAnswer(invocation -> CompletableFuture.completedFuture(Set.of(LEADER_NAME))); + .thenAnswer(invocation -> completedFuture(Set.of(LEADER_NAME))); return mock; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java new file mode 100644 index 0000000000..cc3c13cb52 --- /dev/null +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server.time; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertTimeout; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl.SyncTimeAction; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.TrackerClosedException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for {@link ClusterTimeImpl}. + */ +@ExtendWith(ConfigurationExtension.class) +public class ClusterTimeTest { + private final ClusterTimeImpl clusterTime = new ClusterTimeImpl("foo", new IgniteSpinBusyLock(), new HybridClockImpl()); + + @AfterEach + void tearDown() { + // Stop the time and verify that all internal scheduled tasks do not impede the stop process. + assertTimeout(Duration.ofSeconds(1), clusterTime::close); + } + + @Test + void testWaitFor() { + HybridTimestamp now = clusterTime.now(); + + CompletableFuture<Void> future = clusterTime.waitFor(now); + + clusterTime.updateSafeTime(now); + + assertThat(future, willCompleteSuccessfully()); + } + + @Test + void testWaitForCancellation() throws Exception { + HybridTimestamp now = clusterTime.now(); + + CompletableFuture<Void> future = clusterTime.waitFor(now); + + clusterTime.close(); + + assertThat(future, willThrow(TrackerClosedException.class)); + } + + @Test + void testIdleSafeTimeScheduler(@InjectConfiguration("mock.idleSyncTimeInterval=1") MetaStorageConfiguration config) { + SyncTimeAction action = mock(SyncTimeAction.class); + + when(action.syncTime(any())).thenReturn(completedFuture(null)); + + clusterTime.startSafeTimeScheduler(action, config); + + verify(action, timeout(100).atLeast(3)).syncTime(any()); + } + + @Test + void testIdleSafeTimeSchedulerStop(@InjectConfiguration("mock.idleSyncTimeInterval=1") MetaStorageConfiguration config) { + SyncTimeAction action = mock(SyncTimeAction.class); + + when(action.syncTime(any())).thenReturn(completedFuture(null)); + + clusterTime.startSafeTimeScheduler(action, config); + + verify(action, timeout(100).atLeast(1)).syncTime(any()); + + clusterTime.stopSafeTimeScheduler(); + + clearInvocations(action); + + verify(action, after(100).never()).syncTime(any()); + } + + /** + * Tests that {@link ClusterTimeImpl#adjust} re-schedules the idle time sync timer. + */ + @Test + void testSchedulerProlongation(@InjectConfiguration("mock.idleSyncTimeInterval=50") MetaStorageConfiguration config) { + assertDoesNotThrow(() -> clusterTime.adjust(clusterTime.now())); + + SyncTimeAction action = mock(SyncTimeAction.class); + + when(action.syncTime(any())).thenReturn(completedFuture(null)); + + clusterTime.startSafeTimeScheduler(action, config); + + verify(action, after(30).never()).syncTime(any()); + + clusterTime.adjust(clusterTime.now()); + + verify(action, after(30).never()).syncTime(any()); + + verify(action, after(50)).syncTime(any()); + } +} diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java index 4f3770c00e..2801ff77cf 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java @@ -26,19 +26,21 @@ import static org.mockito.Mockito.when; import java.io.Serializable; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.RaftManager; -import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; -import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; @@ -78,7 +80,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { mockClusterGroupManager(), mock(LogicalTopologyService.class), mockRaftManager(), - keyValueStorage + keyValueStorage, + mock(TopologyAwareRaftGroupServiceFactory.class), + mockConfiguration() ); } @@ -92,9 +96,27 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { * @param raftMgr Raft manager. * @param storage Storage. This component owns this resource and will manage its lifecycle. */ - private StandaloneMetaStorageManager(VaultManager vaultMgr, ClusterService clusterService, ClusterManagementGroupManager cmgMgr, - LogicalTopologyService logicalTopologyService, RaftManager raftMgr, KeyValueStorage storage) { - super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, new HybridClockImpl()); + private StandaloneMetaStorageManager( + VaultManager vaultMgr, + ClusterService clusterService, + ClusterManagementGroupManager cmgMgr, + LogicalTopologyService logicalTopologyService, + RaftManager raftMgr, + KeyValueStorage storage, + TopologyAwareRaftGroupServiceFactory raftServiceFactory, + MetaStorageConfiguration configuration + ) { + super( + vaultMgr, + clusterService, + cmgMgr, + logicalTopologyService, + raftMgr, + storage, + new HybridClockImpl(), + raftServiceFactory, + configuration + ); } private static ClusterService mockClusterService() { @@ -115,13 +137,14 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { private static RaftManager mockRaftManager() { ArgumentCaptor<RaftGroupListener> listenerCaptor = ArgumentCaptor.forClass(RaftGroupListener.class); RaftManager raftManager = mock(RaftManager.class); - RaftGroupService raftGroupService = mock(RaftGroupService.class); + TopologyAwareRaftGroupService raftGroupService = mock(TopologyAwareRaftGroupService.class); try { when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture( any(), any(), listenerCaptor.capture(), + any(), any() )).thenReturn(completedFuture(raftGroupService)); @@ -130,7 +153,8 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { any(), listenerCaptor.capture(), any(), - any(RaftNodeDisruptorConfiguration.class) + any(), + any() )).thenReturn(completedFuture(raftGroupService)); } catch (NodeStoppingException e) { throw new RuntimeException(e); @@ -148,6 +172,16 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { return raftManager; } + private static MetaStorageConfiguration mockConfiguration() { + MetaStorageConfiguration configuration = mock(MetaStorageConfiguration.class); + ConfigurationValue<Long> value = mock(ConfigurationValue.class); + + when(configuration.idleSyncTimeInterval()).thenReturn(value); + when(value.value()).thenReturn(1000L); + + return configuration; + } + private static CompletableFuture<Serializable> runCommand(Command command, RaftGroupListener listener) { CompletableFuture<Serializable> future = new CompletableFuture<>(); diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java index ce7a4a5b53..166889fad9 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java @@ -36,7 +36,7 @@ public abstract class AbstractKeyValueStorageTest { } @AfterEach - void tearDown() throws Exception { + public void tearDown() throws Exception { storage.close(); } @@ -52,4 +52,4 @@ public abstract class AbstractKeyValueStorageTest { protected static byte[] keyValue(int k, int v) { return ("key" + k + '_' + "val" + v).getBytes(UTF_8); } -} \ No newline at end of file +} diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java index 472263fb94..6ef7b70be1 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java @@ -150,7 +150,7 @@ public class ActiveActorTest extends IgniteAbstractTest { Set<String> placementDriverNodesNames, RaftGroupEventsClientListener eventsClientListener ) { - var raftManager = new Loza(clusterService, raftConfiguration, dataPath, new HybridClockImpl()); + var raftManager = new Loza(clusterService, raftConfiguration, dataPath, new HybridClockImpl(), eventsClientListener); LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService); @@ -524,7 +524,7 @@ public class ActiveActorTest extends IgniteAbstractTest { }); } - return (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start( + return TopologyAwareRaftGroupService.start( GROUP_ID, localClusterService, FACTORY, diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index a5938a5163..3144068e70 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; @@ -96,7 +97,7 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory(); - private HybridClock clock = new HybridClockImpl(); + private final HybridClock clock = new HybridClockImpl(); @InjectConfiguration private RaftConfiguration raftConfiguration; @@ -107,6 +108,9 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { @InjectConfiguration private DistributionZonesConfiguration dstZnsCfg; + @InjectConfiguration + private MetaStorageConfiguration metaStorageConfiguration; + private List<String> placementDriverNodeNames; private List<String> nodeNames; @@ -119,8 +123,6 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { /** Cluster service by node name. */ private Map<String, ClusterService> clusterServices; - private TestInfo testInfo; - /** This closure handles {@link LeaseGrantedMessage} to check the placement driver manager behavior. */ private IgniteTriFunction<LeaseGrantedMessage, String, String, LeaseGrantedMessageResponse> leaseGrantHandler; @@ -133,8 +135,6 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { this.nodeNames = IntStream.range(BASE_PORT, BASE_PORT + 5).mapToObj(port -> testNodeName(testInfo, port)) .collect(Collectors.toList()); - this.testInfo = testInfo; - this.clusterServices = startNodes(); List<LogicalTopologyServiceTestImpl> logicalTopManagers = new ArrayList<>(); @@ -283,7 +283,9 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { logicalTopologyService, raftManager, storage, - nodeClock + nodeClock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); if (this.metaStorageManager == null) { diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index 592a2e67b9..e5c75c5fad 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; @@ -128,6 +129,9 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest { @InjectConfiguration private DistributionZonesConfiguration dstZnsCfg; + @InjectConfiguration + private MetaStorageConfiguration metaStorageConfiguration; + private MetaStorageManagerImpl metaStorageManager; private PlacementDriverManager placementDriverManager; @@ -199,7 +203,9 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest { logicalTopologyService, raftManager, storage, - nodeClock + nodeClock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); placementDriverManager = new PlacementDriverManager( diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index 60cc1e0e75..d164cb64e0 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -208,7 +208,7 @@ public class PlacementDriverManager implements IgniteComponent { }); } - private void onLeaderChange(ClusterNode leader, Long term) { + private void onLeaderChange(ClusterNode leader, long term) { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); } diff --git a/modules/metastorage-api/build.gradle b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java similarity index 64% copy from modules/metastorage-api/build.gradle copy to modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java index c5aefc2800..1dd1f95a96 100644 --- a/modules/metastorage-api/build.gradle +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java @@ -15,17 +15,19 @@ * limitations under the License. */ -apply from: "$rootDir/buildscripts/java-core.gradle" -apply from: "$rootDir/buildscripts/publishing.gradle" -apply from: "$rootDir/buildscripts/java-junit5.gradle" +package org.apache.ignite.internal.raft; -description = 'ignite-metastorage-api' +import org.apache.ignite.network.ClusterNode; -dependencies { - api project(':ignite-network-api') - - implementation project(':ignite-core') - implementation libs.jetbrains.annotations - - annotationProcessor project(':ignite-network-annotation-processor') +/** + * Listener that gets called after a new Raft group leader has been elected. + */ +public interface LeaderElectionListener { + /** + * Callback that gets called after a new Raft group leader has been elected. + * + * @param leader New leader node. + * @param term New leader term. + */ + void onLeaderElected(ClusterNode leader, long term); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 9e4ea288cf..5e9fe64fb5 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -130,6 +130,7 @@ public class Loza implements RaftManager { * @param dataPath Data path. * @param clock A hybrid logical clock. */ + @TestOnly public Loza( ClusterService clusterNetSvc, RaftConfiguration raftConfiguration, diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java index 077421ba56..e3685b8085 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java @@ -18,12 +18,12 @@ package org.apache.ignite.raft.jraft.rpc.impl; import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.LeaderElectionListener; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.network.ClusterNode; @@ -33,7 +33,7 @@ import org.apache.ignite.network.ClusterNode; public class RaftGroupEventsClientListener { private static final IgniteLogger LOG = Loggers.forClass(RaftGroupEventsClientListener.class); - private final Map<ReplicationGroupId, List<BiConsumer<ClusterNode, Long>>> leaderElectionListeners = new ConcurrentHashMap<>(); + private final Map<ReplicationGroupId, List<LeaderElectionListener>> leaderElectionListeners = new ConcurrentHashMap<>(); /** * Register leader election listener for client. @@ -41,7 +41,7 @@ public class RaftGroupEventsClientListener { * @param groupId Group id. * @param listener Listener. */ - public void addLeaderElectionListener(ReplicationGroupId groupId, BiConsumer<ClusterNode, Long> listener) { + public void addLeaderElectionListener(ReplicationGroupId groupId, LeaderElectionListener listener) { leaderElectionListeners.compute(groupId, (k, listeners) -> { if (listeners == null) { listeners = new ArrayList<>(); @@ -53,13 +53,13 @@ public class RaftGroupEventsClientListener { }); } - /** - * Unregister leader election listener for client. - * - * @param groupId Group id. - * @param listener Listener. - */ - public void removeLeaderElectionListener(ReplicationGroupId groupId, BiConsumer<ClusterNode, Long> listener) { + /** + * Unregister leader election listener for client. + * + * @param groupId Group id. + * @param listener Listener. + */ + public void removeLeaderElectionListener(ReplicationGroupId groupId, LeaderElectionListener listener) { leaderElectionListeners.compute(groupId, (k, listeners) -> { if (listeners == null) { return null; @@ -79,12 +79,12 @@ public class RaftGroupEventsClientListener { * @param term Election term. */ public void onLeaderElected(ReplicationGroupId groupId, ClusterNode leader, long term) { - List<BiConsumer<ClusterNode, Long>> listeners = leaderElectionListeners.get(groupId); + List<LeaderElectionListener> listeners = leaderElectionListeners.get(groupId); if (listeners != null) { - for (BiConsumer<ClusterNode, Long> listener : listeners) { + for (LeaderElectionListener listener : listeners) { try { - listener.accept(leader, term); + listener.onLeaderElected(leader, term); } catch (Exception e) { LOG.warn("Failed to notify leader election listener for group=" + groupId, e); } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java index cc5a82c84f..6fc49e2422 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java @@ -26,7 +26,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -34,6 +33,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.LeaderElectionListener; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; @@ -83,8 +83,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { private final RaftConfiguration raftConfiguration; /** - * Whether to notify callback after subscription to pass the current leader and term into it, even if the leader - * did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). + * Whether to notify callback after subscription to pass the current leader and term into it, even if the leader did not change in that + * moment (see {@link #subscribeLeader}). */ private final boolean notifyOnSubscription; @@ -96,8 +96,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param executor RPC executor. * @param raftClient RPC RAFT client. * @param logicalTopologyService Logical topology. - * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, - * even if the leader did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). + * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, even + * if the leader did not change in that moment (see {@link #subscribeLeader}). */ private TopologyAwareRaftGroupService( ClusterService cluster, @@ -163,11 +163,11 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param getLeader True to get the group's leader upon service creation. * @param executor RPC executor. * @param logicalTopologyService Logical topology service. - * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, - * even if the leader did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). + * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, even + * if the leader did not change in that moment (see {@link #subscribeLeader}). * @return Future to create a raft client. */ - public static CompletableFuture<RaftGroupService> start( + public static CompletableFuture<TopologyAwareRaftGroupService> start( ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, @@ -188,7 +188,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * Sends a subscribe message to a specific node of the cluster. * * @param node Node. - * @param msg Subscribe message. + * @param msg Subscribe message. * @return A future that completes with true when the message sent and false value when the node left the cluster. */ private CompletableFuture<Boolean> sendSubscribeMessage(ClusterNode node, SubscriptionLeaderChangeRequest msg) { @@ -204,8 +204,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * * @param node Node. * @param msg Subscribe message to send. - * @param msgSendFut Future that completes with true when the message sent and with false when the node left topology and cannot get a - * cluster. + * @param msgSendFut Future that completes with true when the message sent and with false when the node left topology and cannot + * get a cluster. */ private void sendWithRetry(ClusterNode node, SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) { clusterService.messagingService().invoke(node, msg, raftConfiguration.responseTimeout().value()).whenCompleteAsync((unused, th) -> { @@ -264,7 +264,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param callback Callback closure. * @return Future that is completed when all subscription messages to peers are sent. */ - public CompletableFuture<Void> subscribeLeader(BiConsumer<ClusterNode, Long> callback) { + public CompletableFuture<Void> subscribeLeader(LeaderElectionListener callback) { assert !serverEventHandler.isSubscribed() : "The node already subscribed"; int peers = peers().size(); @@ -443,15 +443,15 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { /** * Leader election handler. */ - private static class ServerEventHandler implements BiConsumer<ClusterNode, Long> { + private static class ServerEventHandler implements LeaderElectionListener { /** A term of last elected leader. */ private long term = 0; /** Last elected leader. */ - private Peer leaderPeer; + private volatile Peer leaderPeer; /** A leader elected callback. */ - private BiConsumer<ClusterNode, Long> onLeaderElectedCallback; + private LeaderElectionListener onLeaderElectedCallback; /** * Notifies about a new leader elected, if it did not make before. @@ -459,12 +459,13 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param node Node. * @param term Term. */ - private synchronized void onLeaderElected(ClusterNode node, long term) { + @Override + public synchronized void onLeaderElected(ClusterNode node, long term) { if (onLeaderElectedCallback != null && term > this.term) { this.term = term; this.leaderPeer = new Peer(node.name()); - onLeaderElectedCallback.accept(node, term); + onLeaderElectedCallback.onLeaderElected(node, term); } } @@ -473,7 +474,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * * @param onLeaderElectedCallback A callback closure. */ - public synchronized void setOnLeaderElectedCallback(BiConsumer<ClusterNode, Long> onLeaderElectedCallback) { + synchronized void setOnLeaderElectedCallback(LeaderElectionListener onLeaderElectedCallback) { this.onLeaderElectedCallback = onLeaderElectedCallback; } @@ -482,15 +483,10 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * * @return True if notification required, false otherwise. */ - public synchronized boolean isSubscribed() { + synchronized boolean isSubscribed() { return onLeaderElectedCallback != null; } - @Override - public void accept(ClusterNode clusterNode, Long term) { - onLeaderElected(clusterNode, term); - } - Peer leader() { return leaderPeer; } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java index 8bb79db8a9..959f21f8ea 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java @@ -69,16 +69,16 @@ public class TopologyAwareRaftGroupServiceFactory implements RaftServiceFactory< ScheduledExecutorService raftClientExecutor ) { return TopologyAwareRaftGroupService.start( - groupId, - clusterService, - raftMessagesFactory, - raftConfiguration, - peersAndLearners, - true, - raftClientExecutor, - logicalTopologyService, - eventsClientListener, - true - ).thenApply(TopologyAwareRaftGroupService.class::cast); + groupId, + clusterService, + raftMessagesFactory, + raftConfiguration, + peersAndLearners, + true, + raftClientExecutor, + logicalTopologyService, + eventsClientListener, + true + ); } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java index 5aaca91e2a..669f388dbb 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java @@ -139,7 +139,7 @@ public class Replica { return whenReplicaReady; } - private void onLeaderElected(ClusterNode clusterNode, Long term) { + private void onLeaderElected(ClusterNode clusterNode, long term) { leaderRef.set(clusterNode); if (!leaderFuture.isDone()) { diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index c2b1008f20..868fd17acb 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -412,7 +412,7 @@ public class ReplicaManager implements IgniteComponent { if (throwable == null) { return true; } else { - LOG.error("Failed to stop replica [replicaGrpId={}]", replicaGrpId, throwable); + LOG.error("Failed to stop replica [replicaGrpId={}]", throwable, replicaGrpId); return false; } diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java index d9bd5bfd11..e0039f1f63 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java @@ -391,7 +391,7 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { }); } - return (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start( + return TopologyAwareRaftGroupService.start( GROUP_ID, localClusterService, FACTORY, diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java index d5ffb24812..83957a180d 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java @@ -35,11 +35,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse; import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage; +import org.apache.ignite.internal.raft.LeaderElectionListener; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.replicator.listener.ReplicaListener; @@ -62,11 +62,11 @@ public class PlacementDriverReplicaSideTest { private Replica replica; - private AtomicReference<BiConsumer<ClusterNode, Long>> callbackHolder = new AtomicReference<>(); + private final AtomicReference<LeaderElectionListener> callbackHolder = new AtomicReference<>(); private PendingComparableValuesTracker<Long, Void> storageIndexTracker; - private AtomicLong indexOnLeader = new AtomicLong(0); + private final AtomicLong indexOnLeader = new AtomicLong(0); private Peer currentLeader = null; @@ -76,7 +76,7 @@ public class PlacementDriverReplicaSideTest { TopologyAwareRaftGroupService raftClient = mock(TopologyAwareRaftGroupService.class); when(raftClient.subscribeLeader(any())).thenAnswer(invocationOnMock -> { - BiConsumer<ClusterNode, Long> callback = invocationOnMock.getArgument(0); + LeaderElectionListener callback = invocationOnMock.getArgument(0); callbackHolder.set(callback); return completedFuture(null); @@ -126,7 +126,7 @@ public class PlacementDriverReplicaSideTest { */ private void leaderElection(ClusterNode leader) { if (callbackHolder.get() != null) { - callbackHolder.get().accept(leader, 1L); + callbackHolder.get().onLeaderElected(leader, 1L); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 54881775cb..9a98a90d9d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -52,9 +52,11 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -64,6 +66,7 @@ import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -91,10 +94,10 @@ public class ItDistributedConfigurationPropertiesTest { private static ClusterManagementConfiguration clusterManagementConfiguration; @InjectConfiguration - private static SecurityConfiguration securityConfiguration; + private static NodeAttributesConfiguration nodeAttributes; @InjectConfiguration - private static NodeAttributesConfiguration nodeAttributes; + private static MetaStorageConfiguration metaStorageConfiguration; /** * An emulation of an Ignite node, that only contains components necessary for tests. @@ -139,7 +142,10 @@ public class ItDistributedConfigurationPropertiesTest { ); HybridClock clock = new HybridClockImpl(); - raftManager = new Loza(clusterService, raftConfiguration, workDir, clock); + + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + + raftManager = new Loza(clusterService, raftConfiguration, workDir, clock, raftGroupEventsClientListener); var clusterStateStorage = new TestClusterStateStorage(); var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); @@ -154,14 +160,25 @@ public class ItDistributedConfigurationPropertiesTest { nodeAttributes, new TestConfigurationValidator()); + var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, cmgManager, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + logicalTopologyService, raftManager, new SimpleInMemoryKeyValueStorage(name()), - clock + clock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 86a8c015b5..0a88f8fd22 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -45,9 +45,11 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -56,6 +58,7 @@ import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -76,6 +79,9 @@ public class ItDistributedConfigurationStorageTest { @InjectConfiguration private static NodeAttributesConfiguration nodeAttributes; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + /** * An emulation of an Ignite node, that only contains components necessary for tests. */ @@ -111,7 +117,9 @@ public class ItDistributedConfigurationStorageTest { HybridClock clock = new HybridClockImpl(); - raftManager = new Loza(clusterService, raftConfiguration, workDir, clock); + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + + raftManager = new Loza(clusterService, raftConfiguration, workDir, clock, raftGroupEventsClientListener); var clusterStateStorage = new TestClusterStateStorage(); var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); @@ -126,14 +134,25 @@ public class ItDistributedConfigurationStorageTest { nodeAttributes, new TestConfigurationValidator()); + var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, cmgManager, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + logicalTopologyService, raftManager, new SimpleInMemoryKeyValueStorage(name()), - clock + clock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index ac1e021304..23fff6513d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; @@ -198,6 +199,9 @@ public class ItRebalanceDistributedTest { @InjectConfiguration private static NodeAttributesConfiguration nodeAttributes; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) private @interface UseTestTxStateStorage { @@ -647,7 +651,11 @@ public class ItRebalanceDistributedTest { lockManager = new HeapLockManager(); - raftManager = new Loza(clusterService, raftConfiguration, dir, new HybridClockImpl()); + HybridClock hybridClock = new HybridClockImpl(); + + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + + raftManager = new Loza(clusterService, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener); var clusterStateStorage = new TestClusterStateStorage(); var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); @@ -665,12 +673,10 @@ public class ItRebalanceDistributedTest { replicaManager = new ReplicaManager( clusterService, cmgManager, - new HybridClockImpl(), + hybridClock, Set.of(TableMessageGroup.class, TxMessageGroup.class) ); - HybridClock hybridClock = new HybridClockImpl(); - ReplicaService replicaSvc = new ReplicaService( clusterService.messagingService(), hybridClock @@ -682,9 +688,17 @@ public class ItRebalanceDistributedTest { LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterService, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + KeyValueStorage keyValueStorage = testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class) ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, "metaStorage")) : new SimpleInMemoryKeyValueStorage(nodeName); + metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, @@ -692,7 +706,9 @@ public class ItRebalanceDistributedTest { logicalTopologyService, raftManager, keyValueStorage, - hybridClock + hybridClock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); @@ -771,13 +787,6 @@ public class ItRebalanceDistributedTest { schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager); - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( - clusterService, - logicalTopologyService, - Loza.FACTORY, - new RaftGroupEventsClientListener() - ); - distributionZoneManager = new DistributionZoneManager( zonesCfg, tablesCfg, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index f57fae0c9e..5d3923ef7d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; @@ -170,6 +171,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { @InjectConfiguration private static NodeAttributesConfiguration nodeAttributes; + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + /** * Start some of Ignite components that are able to serve as Ignite node for test purposes. * @@ -239,7 +243,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { HybridClock hybridClock = new HybridClockImpl(); - var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock); + var raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + + var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener); var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg")); @@ -270,14 +276,25 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var txManager = new TxManagerImpl(replicaService, lockManager, hybridClock, new TransactionIdGenerator(idx)); + var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); + + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterSvc, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + var metaStorageMgr = new MetaStorageManagerImpl( vault, clusterSvc, cmgManager, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + logicalTopologyService, raftMgr, new RocksDbKeyValueStorage(name, dir.resolve("metastorage")), - hybridClock + hybridClock, + topologyAwareRaftGroupServiceFactory, + metaStorageConfiguration ); var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vault); @@ -321,8 +338,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr); - LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); - DistributionZoneManager distributionZoneManager = new DistributionZoneManager( zonesConfig, tablesConfig, @@ -332,13 +347,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { name ); - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( - clusterSvc, - new LogicalTopologyServiceImpl(logicalTopology, cmgManager), - Loza.FACTORY, - new RaftGroupEventsClientListener() - ); - var clockWaiter = new ClockWaiter("test", hybridClock); var catalogManager = new CatalogServiceImpl( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 1811fd2310..0b015acd6a 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metrics.MetricManager; @@ -217,7 +218,7 @@ public class IgniteImpl implements Ignite { private final Loza raftMgr; /** Meta storage manager. */ - private final MetaStorageManager metaStorageMgr; + private final MetaStorageManagerImpl metaStorageMgr; /** Distributed configuration validator. */ private final ConfigurationValidator distributedConfigurationValidator; @@ -406,6 +407,13 @@ public class IgniteImpl implements Ignite { logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr); + var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + clusterSvc, + logicalTopologyService, + Loza.FACTORY, + raftGroupEventsClientListener + ); + metaStorageMgr = new MetaStorageManagerImpl( vaultMgr, clusterSvc, @@ -413,7 +421,8 @@ public class IgniteImpl implements Ignite { logicalTopologyService, raftMgr, new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)), - clock + clock, + topologyAwareRaftGroupServiceFactory ); this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr); @@ -434,15 +443,7 @@ public class IgniteImpl implements Ignite { TablesConfiguration tablesConfig = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY); - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( - clusterSvc, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener - ); - - DistributionZonesConfiguration zonesConfiguration = clusterConfigRegistry - .getConfiguration(DistributionZonesConfiguration.KEY); + metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY)); metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY)); @@ -479,7 +480,7 @@ public class IgniteImpl implements Ignite { schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr); distributionZoneManager = new DistributionZoneManager( - zonesConfiguration, + clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY), tablesConfig, metaStorageMgr, logicalTopologyService,