This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b6004047b3 IGNITE-19199 Propagate safe time when Meta Storage is idle (#2239)
b6004047b3 is described below

commit b6004047b3c3e9cd91b5ccf28c26ee206c1e3a7f
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Jun 29 17:53:44 2023 +0300

    IGNITE-19199 Propagate safe time when Meta Storage is idle (#2239)
---
 modules/cli/build.gradle                           |   2 +
 .../repl/executor/ItIgnitePicocliCommandsTest.java |  31 +++-
 modules/metastorage-api/build.gradle               |   4 +
 .../MetaStorageConfigurationModule.java}           |  31 ++--
 .../MetaStorageConfigurationSchema.java            |  40 +++++
 modules/metastorage/build.gradle                   |   5 +
 .../impl/ItMetaStorageManagerImplTest.java         |  32 +++-
 .../ItMetaStorageMultipleNodesAbstractTest.java    | 147 +++++++++++++---
 ...MetaStorageSafeTimePropagationAbstractTest.java |  11 +-
 .../impl/ItMetaStorageServicePersistenceTest.java  |   4 +-
 .../metastorage/impl/ItMetaStorageServiceTest.java |   2 +-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  27 ++-
 .../metastorage/command/SyncTimeCommand.java       |   3 +
 ...java => MetaStorageLeaderElectionListener.java} | 148 +++++++++-------
 .../metastorage/impl/MetaStorageManagerImpl.java   | 192 +++++++++++++++------
 .../metastorage/impl/MetaStorageServiceImpl.java   |   4 +-
 .../server/raft/MetaStorageWriteHandler.java       |  16 +-
 .../metastorage/server/time/ClusterTimeImpl.java   | 165 ++++++++++++------
 .../MetaStorageDeployWatchesCorrectnessTest.java   |  22 ++-
 .../impl/MetaStorageManagerRecoveryTest.java       |  23 ++-
 .../metastorage/server/time/ClusterTimeTest.java   | 132 ++++++++++++++
 .../impl/StandaloneMetaStorageManager.java         |  50 +++++-
 .../server/AbstractKeyValueStorageTest.java        |   4 +-
 .../internal/placementdriver/ActiveActorTest.java  |   4 +-
 .../MultiActorPlacementDriverTest.java             |  14 +-
 .../PlacementDriverManagerTest.java                |   8 +-
 .../placementdriver/PlacementDriverManager.java    |   2 +-
 .../internal/raft/LeaderElectionListener.java}     |  24 +--
 .../java/org/apache/ignite/internal/raft/Loza.java |   1 +
 .../rpc/impl/RaftGroupEventsClientListener.java    |  28 +--
 .../raft/client/TopologyAwareRaftGroupService.java |  44 +++--
 .../TopologyAwareRaftGroupServiceFactory.java      |  22 +--
 .../apache/ignite/internal/replicator/Replica.java |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |   2 +-
 .../client/TopologyAwareRaftGroupServiceTest.java  |   2 +-
 .../replicator/PlacementDriverReplicaSideTest.java |  10 +-
 .../ItDistributedConfigurationPropertiesTest.java  |  27 ++-
 .../ItDistributedConfigurationStorageTest.java     |  25 ++-
 .../storage/ItRebalanceDistributedTest.java        |  33 ++--
 .../runner/app/ItIgniteNodeRestartTest.java        |  32 ++--
 .../org/apache/ignite/internal/app/IgniteImpl.java |  25 +--
 41 files changed, 1019 insertions(+), 381 deletions(-)

diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle
index 9f0eddd54d..bf52713ad4 100644
--- a/modules/cli/build.gradle
+++ b/modules/cli/build.gradle
@@ -91,8 +91,10 @@ dependencies {
     integrationTestAnnotationProcessor libs.picocli.annotation.processor
     integrationTestAnnotationProcessor 
libs.micronaut.inject.annotation.processor
     integrationTestAnnotationProcessor 
libs.micronaut.validation.annotation.processor
+
     integrationTestImplementation testFixtures(project)
     integrationTestImplementation project(':ignite-api')
+    integrationTestImplementation project(':ignite-configuration-api')
     integrationTestImplementation project(':ignite-runner')
     integrationTestImplementation project(':ignite-schema')
     integrationTestImplementation project(':ignite-sql-engine')
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
index e6e8f4d392..4fa20d8f85 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.cli.core.repl.executor;
 
+import static java.util.stream.Collectors.flatMapping;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toUnmodifiableList;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -32,8 +35,12 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.configuration.ConfigurationModule;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
 import 
org.apache.ignite.internal.cli.commands.CliCommandTestInitializedIntegrationBase;
 import org.apache.ignite.internal.cli.commands.TopLevelCliReplCommand;
 import org.apache.ignite.internal.cli.core.repl.Session;
@@ -44,6 +51,7 @@ import 
org.apache.ignite.internal.cli.core.repl.completer.filter.CompleterFilter
 import 
org.apache.ignite.internal.cli.core.repl.completer.filter.DynamicCompleterFilter;
 import 
org.apache.ignite.internal.cli.core.repl.completer.filter.NonRepeatableOptionsFilter;
 import 
org.apache.ignite.internal.cli.core.repl.completer.filter.ShortOptionsFilter;
+import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
 import org.assertj.core.util.Files;
 import org.jline.reader.Candidate;
 import org.jline.reader.LineReader;
@@ -65,6 +73,23 @@ public class ItIgnitePicocliCommandsTest extends 
CliCommandTestInitializedIntegr
 
     private static final String DEFAULT_REST_URL = "http://localhost:10300";
 
+    private static final List<String> DISTRIBUTED_CONFIGURATION_KEYS;
+
+    private static final List<String> LOCAL_CONFIGURATION_KEYS;
+
+    static {
+        Map<ConfigurationType, List<String>> configKeysByType = new 
ServiceLoaderModulesProvider()
+                .modules(ItIgnitePicocliCommandsTest.class.getClassLoader())
+                .stream()
+                .collect(groupingBy(
+                        ConfigurationModule::type,
+                        flatMapping(module -> 
module.rootKeys().stream().map(RootKey::key), toUnmodifiableList())
+                ));
+
+        DISTRIBUTED_CONFIGURATION_KEYS = 
configKeysByType.get(ConfigurationType.DISTRIBUTED);
+        LOCAL_CONFIGURATION_KEYS = 
configKeysByType.get(ConfigurationType.LOCAL);
+    }
+
     @Inject
     DynamicCompleterRegistry dynamicCompleterRegistry;
 
@@ -215,7 +240,7 @@ public class ItIgnitePicocliCommandsTest extends 
CliCommandTestInitializedIntegr
         // wait for lazy init of node config completer
         await("For given parsed words: " + givenParsedLine.words()).until(
                 () -> complete(givenParsedLine),
-                containsInAnyOrder("rest", "compute", "clientConnector", 
"raft", "network", "cluster", "deployment", "nodeAttributes")
+                
containsInAnyOrder(LOCAL_CONFIGURATION_KEYS.toArray(String[]::new))
         );
     }
 
@@ -265,7 +290,7 @@ public class ItIgnitePicocliCommandsTest extends 
CliCommandTestInitializedIntegr
         // wait for lazy init of cluster config completer
         await("For given parsed words: " + givenParsedLine.words()).until(
                 () -> complete(givenParsedLine),
-                containsInAnyOrder("aimem", "aipersist", "metrics", "rocksDb", 
"table", "zone", "security", "schemaSync", "gc")
+                
containsInAnyOrder(DISTRIBUTED_CONFIGURATION_KEYS.toArray(String[]::new))
         );
     }
 
@@ -288,7 +313,7 @@ public class ItIgnitePicocliCommandsTest extends 
CliCommandTestInitializedIntegr
         // wait for lazy init of cluster config completer
         await("For given parsed words: " + givenParsedLine.words()).until(
                 () -> complete(givenParsedLine),
-                containsInAnyOrder("aimem", "aipersist", "metrics", "rocksDb", 
"table", "zone", "security", "schemaSync", "gc")
+                
containsInAnyOrder(DISTRIBUTED_CONFIGURATION_KEYS.toArray(String[]::new))
         );
     }
 
diff --git a/modules/metastorage-api/build.gradle 
b/modules/metastorage-api/build.gradle
index c5aefc2800..afb4c5d9a0 100644
--- a/modules/metastorage-api/build.gradle
+++ b/modules/metastorage-api/build.gradle
@@ -23,9 +23,13 @@ description = 'ignite-metastorage-api'
 
 dependencies {
     api project(':ignite-network-api')
+    api project(':ignite-configuration-api')
 
     implementation project(':ignite-core')
     implementation libs.jetbrains.annotations
+    implementation libs.auto.service.annotations
 
     annotationProcessor project(':ignite-network-annotation-processor')
+    annotationProcessor project(':ignite-configuration-annotation-processor')
+    annotationProcessor libs.auto.service
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java
similarity index 51%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
copy to 
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java
index 85059bbba0..646c64b1f4 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationModule.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.configuration;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.annotations.Transferable;
+import com.google.auto.service.AutoService;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.configuration.ConfigurationModule;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
 
 /**
- * Command that initiates idle safe time synchronization.
+ * {@link ConfigurationModule} for Meta Storage configuration.
  */
-@Transferable(MetastorageCommandsMessageGroup.SYNC_TIME)
-public interface SyncTimeCommand extends WriteCommand {
-    /** New safe time. */
-    long safeTimeLong();
+@AutoService(ConfigurationModule.class)
+public class MetaStorageConfigurationModule implements ConfigurationModule {
+    @Override
+    public ConfigurationType type() {
+        return ConfigurationType.DISTRIBUTED;
+    }
 
-    /** New safe time. */
-    default HybridTimestamp safeTime() {
-        return hybridTimestamp(safeTimeLong());
+    @Override
+    public Collection<RootKey<?, ?>> rootKeys() {
+        return Collections.singleton(MetaStorageConfiguration.KEY);
     }
 }
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
new file mode 100644
index 0000000000..a08b04e2d3
--- /dev/null
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.configuration;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Configuration schema for the Meta Storage module.
+ */
+@ConfigurationRoot(rootName = "metaStorage", type = 
ConfigurationType.DISTRIBUTED)
+public class MetaStorageConfigurationSchema {
+    /**
+     * Duration (in milliseconds) used to determine how often to issue time sync commands when the Meta Storage is idle
+     * (no writes are being issued).
+     *
+     * <p>Making this value too small increases the network load, while making this value too large can lead to increased latency of
+     * Meta Storage reads.
+     */
+    @Value(hasDefault = true)
+    @Range(min = 1)
+    public long idleSyncTimeInterval = 500;
+}
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 3ce2ee1fde..a7e9f66380 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -29,6 +29,7 @@ dependencies {
     implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
     implementation project(':ignite-raft-api')
+    implementation project(':ignite-replicator')
     implementation project(':ignite-api')
     implementation project(':ignite-core')
     implementation project(':ignite-rocksdb-common')
@@ -39,12 +40,15 @@ dependencies {
 
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-vault'))
+    testImplementation testFixtures(project(':ignite-configuration'))
+    testImplementation testFixtures(project(':ignite-replicator'))
     testImplementation libs.mockito.junit
     testImplementation libs.hamcrest.core
 
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation project(":ignite-cluster-management")
     integrationTestImplementation project(':ignite-network')
+    integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-rest')
     integrationTestImplementation project(':ignite-raft')
     integrationTestImplementation project(":ignite-raft-api")
@@ -61,6 +65,7 @@ dependencies {
     testFixturesImplementation project(':ignite-cluster-management')
     testFixturesImplementation project(':ignite-core')
     testFixturesImplementation project(':ignite-raft-api')
+    testFixturesImplementation project(':ignite-replicator')
     testFixturesImplementation project(':ignite-rocksdb-common')
     testFixturesImplementation project(':ignite-vault')
     testFixturesImplementation libs.jetbrains.annotations
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index c7773691ac..ff2a75c687 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -55,11 +55,13 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -70,6 +72,7 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -93,13 +96,29 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
     private MetaStorageManagerImpl metaStorageManager;
 
     @BeforeEach
-    void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration 
raftConfiguration) {
+    void setUp(
+            TestInfo testInfo,
+            @InjectConfiguration RaftConfiguration raftConfiguration,
+            @InjectConfiguration MetaStorageConfiguration 
metaStorageConfiguration
+    ) {
         var addr = new NetworkAddress("localhost", 10_000);
 
         clusterService = clusterService(testInfo, addr.port(), new 
StaticNodeFinder(List.of(addr)));
 
         HybridClock clock = new HybridClockImpl();
-        raftManager = new Loza(clusterService, raftConfiguration, 
workDir.resolve("loza"), clock);
+
+        var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
+        raftManager = new Loza(clusterService, raftConfiguration, 
workDir.resolve("loza"), clock, raftGroupEventsClientListener);
+
+        var logicalTopologyService = mock(LogicalTopologyService.class);
+
+        var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                clusterService,
+                logicalTopologyService,
+                Loza.FACTORY,
+                raftGroupEventsClientListener
+        );
 
         vaultManager = new VaultManager(new InMemoryVaultService());
 
@@ -113,10 +132,12 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 vaultManager,
                 clusterService,
                 cmgManager,
-                mock(LogicalTopologyService.class),
+                logicalTopologyService,
                 raftManager,
                 storage,
-                clock
+                clock,
+                topologyAwareRaftGroupServiceFactory,
+                metaStorageConfiguration
         );
 
         vaultManager.start();
@@ -279,7 +300,8 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 mock(LogicalTopologyService.class),
                 raftManager,
                 storage,
-                new HybridClockImpl()
+                new HybridClockImpl(),
+                mock(TopologyAwareRaftGroupServiceFactory.class)
         );
 
         metaStorageManager.stop();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 9103686e64..1e039b2bf8 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -29,6 +29,8 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -59,14 +61,15 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.RaftNodeId;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -77,7 +80,7 @@ import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -100,6 +103,12 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
+    /**
+     * Large interval to effectively disable idle safe time propagation.
+     */
+    @InjectConfiguration("mock.idleSyncTimeInterval=1000000")
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     public abstract KeyValueStorage createStorage(String nodeName, Path path);
 
     private class Node {
@@ -126,11 +135,15 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
             Path basePath = dataPath.resolve(name());
 
             HybridClock clock = new HybridClockImpl();
+
+            var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
             this.raftManager = new Loza(
                     clusterService,
                     raftConfiguration,
                     basePath.resolve("raft"),
-                    clock
+                    clock,
+                    raftGroupEventsClientListener
             );
 
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
@@ -145,20 +158,31 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
                     nodeAttributes,
                     new TestConfigurationValidator());
 
+            var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+            var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    raftGroupEventsClientListener
+            );
+
             this.metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
-                    new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
+                    logicalTopologyService,
                     raftManager,
                     createStorage(name(), basePath),
-                    clock
+                    clock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
         }
 
-        void start() throws NodeStoppingException {
+        void start() {
             List<IgniteComponent> components = List.of(
                     vaultManager,
                     clusterService,
@@ -492,36 +516,105 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
         }, TimeUnit.SECONDS.toMillis(1)));
     }
 
+    /**
+     * Tests that safe time is propagated from the leader even if the Meta 
Storage is idle.
+     */
+    @Test
+    void testIdleSafeTimePropagation(TestInfo testInfo) throws Exception {
+        // Enable idle safe time sync.
+        CompletableFuture<Void> updateIdleSyncTimeIntervalFuture = 
metaStorageConfiguration.idleSyncTimeInterval().update(100L);
+
+        assertThat(updateIdleSyncTimeIntervalFuture, 
willCompleteSuccessfully());
+
+        Node firstNode = startNode(testInfo);
+        Node secondNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name()), 
List.of(firstNode.name()), "test");
+
+        assertThat(firstNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+        assertThat(secondNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+
+        firstNode.waitWatches();
+        secondNode.waitWatches();
+
+        ClusterTime firstNodeTime = firstNode.metaStorageManager.clusterTime();
+        ClusterTime secondNodeTime = 
secondNode.metaStorageManager.clusterTime();
+
+        HybridTimestamp now = firstNodeTime.now();
+
+        assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully());
+        assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully());
+    }
+
+    /**
+     * Tests that safe time is propagated after leader was changed and the 
Meta Storage is idle.
+     */
+    @Test
+    void testIdleSafeTimePropagationLeaderTransferred(TestInfo testInfo) 
throws Exception {
+        // Enable idle safe time sync.
+        CompletableFuture<Void> updateIdleSyncTimeIntervalFuture = 
metaStorageConfiguration.idleSyncTimeInterval().update(100L);
+
+        assertThat(updateIdleSyncTimeIntervalFuture, 
willCompleteSuccessfully());
+
+        Node firstNode = startNode(testInfo);
+        Node secondNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name(), 
secondNode.name()), List.of(firstNode.name()), "test");
+
+        assertThat(firstNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+        assertThat(secondNode.cmgManager.onJoinReady(), 
willCompleteSuccessfully());
+
+        firstNode.waitWatches();
+        secondNode.waitWatches();
+
+        ClusterTime firstNodeTime = firstNode.metaStorageManager.clusterTime();
+        ClusterTime secondNodeTime = 
secondNode.metaStorageManager.clusterTime();
+
+        Node leader = transferLeadership(firstNode, secondNode);
+
+        HybridTimestamp now = leader.metaStorageManager.clusterTime().now();
+
+        assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully());
+        assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully());
+
+        leader = transferLeadership(firstNode, secondNode);
+
+        now = leader.metaStorageManager.clusterTime().now();
+
+        assertThat(firstNodeTime.waitFor(now), willCompleteSuccessfully());
+        assertThat(secondNodeTime.waitFor(now), willCompleteSuccessfully());
+    }
+
     private Node transferLeadership(Node firstNode, Node secondNode) {
-        RaftGroupService svc1 = getMetastorageService(firstNode);
-        RaftGroupService svc2 = getMetastorageService(secondNode);
+        RaftGroupService svc = getMetastorageService(firstNode);
 
-        boolean leaderFirst = false;
+        CompletableFuture<Node> future = svc.refreshLeader()
+                .thenCompose(v -> {
+                    Peer leader = svc.leader();
 
-        RaftGroupService leader;
-        RaftGroupService notLeader;
+                    assertThat(leader, is(notNullValue()));
 
-        if (svc1.getRaftNode().isLeader()) {
-            leader = svc1;
-            notLeader = svc2;
+                    Peer newLeader = svc.peers().stream()
+                            .filter(p -> !p.equals(leader))
+                            .findFirst()
+                            .orElseThrow();
 
-            leaderFirst = true;
-        } else {
-            leader = svc2;
-            notLeader = svc1;
-        }
+                    Node newLeaderNode = 
newLeader.consistentId().equals(firstNode.name()) ? firstNode : secondNode;
+
+                    return svc.transferLeadership(newLeader).thenApply(unused 
-> newLeaderNode);
+                });
 
-        leader.getRaftNode().transferLeadershipTo(notLeader.getServerId());
+        assertThat(future, willCompleteSuccessfully());
 
-        return leaderFirst ? secondNode : firstNode;
+        return future.join();
     }
 
     private RaftGroupService getMetastorageService(Node node) {
-        JraftServerImpl server1 = (JraftServerImpl) node.raftManager.server();
+        CompletableFuture<RaftGroupService> future = 
node.metaStorageManager.metaStorageServiceFuture()
+                .thenApply(MetaStorageServiceImpl::raftGroupService);
 
-        RaftNodeId nodeId = server1.localNodes().stream()
-                .filter(id -> 
MetastorageGroupId.INSTANCE.equals(id.groupId())).findFirst().get();
+        assertThat(future, willCompleteSuccessfully());
 
-        return server1.raftGroupService(nodeId);
+        return future.join();
     }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index c73c6ac301..068b2b2721 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -41,13 +41,10 @@ import org.junit.jupiter.api.Test;
 public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends 
AbstractKeyValueStorageTest {
     private final HybridClock clock = new HybridClockImpl();
 
-    private final ClusterTimeImpl time = new ClusterTimeImpl(new 
IgniteSpinBusyLock(), clock);
+    private final ClusterTimeImpl time = new ClusterTimeImpl("node", new 
IgniteSpinBusyLock(), clock);
 
     @BeforeEach
-    @Override
-    public void setUp() {
-        super.setUp();
-
+    public void startWatches() {
         storage.startWatches(0, (e, t) -> {
             time.updateSafeTime(t);
 
@@ -56,8 +53,8 @@ public abstract class 
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
     }
 
     @AfterEach
-    void tearDown() throws Exception {
-        storage.close();
+    public void stopTime() throws Exception {
+        time.close();
     }
 
     @Test
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 33575176b7..8fa5a64371 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -71,7 +71,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
         ClusterNode followerNode = getNode(server);
 
-        var clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), new 
HybridClockImpl());
+        var clusterTime = new ClusterTimeImpl(followerNode.name(), new 
IgniteSpinBusyLock(), new HybridClockImpl());
 
         metaStorage = new MetaStorageServiceImpl(followerNode.name(), service, 
new IgniteSpinBusyLock(), clusterTime);
 
@@ -149,7 +149,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
             return s;
         });
 
-        return new MetaStorageListener(storage, new ClusterTimeImpl(new 
IgniteSpinBusyLock(), new HybridClockImpl()));
+        return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, 
new IgniteSpinBusyLock(), new HybridClockImpl()));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index dfc5a3d3ab..dac71c9166 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -194,7 +194,7 @@ public class ItMetaStorageServiceTest {
                     clock
             );
 
-            this.clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), 
clock);
+            this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
new IgniteSpinBusyLock(), clock);
 
             this.mockStorage = mock(KeyValueStorage.class);
         }
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index eb7701110f..c0e2d93e9e 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -53,10 +53,12 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -67,6 +69,7 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -83,6 +86,9 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     private static class Node {
         private final List<IgniteComponent> components = new ArrayList<>();
 
@@ -104,11 +110,15 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
             Path basePath = dataPath.resolve(name());
 
             HybridClock clock = new HybridClockImpl();
+
+            var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
             var raftManager = new Loza(
                     clusterService,
                     raftConfiguration,
                     basePath.resolve("raft"),
-                    clock
+                    clock,
+                    raftGroupEventsClientListener
             );
 
             components.add(raftManager);
@@ -131,14 +141,25 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
 
             components.add(cmgManager);
 
+            var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+            var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    raftGroupEventsClientListener
+            );
+
             this.metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
-                    new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
+                    logicalTopologyService,
                     raftManager,
                     new RocksDbKeyValueStorage(name(), 
basePath.resolve("storage")),
-                    clock
+                    clock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             components.add(metaStorageManager);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
index 85059bbba0..4032f0f8f2 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
@@ -31,6 +31,9 @@ public interface SyncTimeCommand extends WriteCommand {
     /** New safe time. */
     long safeTimeLong();
 
+    /** Term of the initiator. */
+    long initiatorTerm();
+
     /** New safe time. */
     default HybridTimestamp safeTime() {
         return hybridTimestamp(safeTimeLong());
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
similarity index 62%
rename from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
rename to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index 36d571bf5b..518cf51338 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -31,20 +33,22 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Raft Group Events listener that registers Logical Topology listener for updating the list of Meta Storage Raft group listeners.
+ * Meta Storage leader election listener.
  */
-public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListener {
-    private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageManagerImpl.class);
+public class MetaStorageLeaderElectionListener implements 
LeaderElectionListener {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageLeaderElectionListener.class);
 
     private final IgniteSpinBusyLock busyLock;
 
@@ -62,87 +66,112 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
      *
      * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
      */
+    @Nullable
     private CompletableFuture<Void> serializationFuture = null;
 
     private final Object serializationFutureMux = new Object();
 
     private final ClusterTimeImpl clusterTime;
 
-    MetaStorageRaftGroupEventsListener(
+    private final LogicalTopologyEventListener logicalTopologyEventListener = 
new MetaStorageLogicalTopologyEventListener();
+
+    private final CompletableFuture<MetaStorageConfiguration> 
metaStorageConfigurationFuture;
+
+    /**
+     * Leader term if this node is a leader, {@code null} otherwise.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    @Nullable
+    private Long thisNodeTerm = null;
+
+    MetaStorageLeaderElectionListener(
             IgniteSpinBusyLock busyLock,
             ClusterService clusterService,
             LogicalTopologyService logicalTopologyService,
             CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut,
-            ClusterTimeImpl clusterTime
+            ClusterTimeImpl clusterTime,
+            CompletableFuture<MetaStorageConfiguration> 
metaStorageConfigurationFuture
     ) {
         this.busyLock = busyLock;
         this.nodeName = clusterService.nodeName();
         this.logicalTopologyService = logicalTopologyService;
         this.metaStorageSvcFut = metaStorageSvcFut;
         this.clusterTime = clusterTime;
+        this.metaStorageConfigurationFuture = metaStorageConfigurationFuture;
     }
 
     @Override
-    public void onLeaderElected(long term) {
+    public void onLeaderElected(ClusterNode node, long term) {
         synchronized (serializationFutureMux) {
-            registerTopologyEventListeners();
+            if (node.name().equals(nodeName) && serializationFuture == null) {
+                LOG.info("Node has been elected as the leader, starting Idle 
Safe Time scheduler");
 
-            // Update learner configuration (in case we missed some topology updates) and initialize the serialization future.
-            serializationFuture = executeWithStatus((service, term1, isLeader) 
-> {
-                CompletableFuture<Void> fut;
-                if (isLeader) {
-                    fut = this.resetLearners(service, term1);
+                thisNodeTerm = term;
 
-                    clusterTime.startLeaderTimer(service);
-                } else {
-                    fut = completedFuture(null);
+                
logicalTopologyService.addEventListener(logicalTopologyEventListener);
 
-                    clusterTime.stopLeaderTimer();
-                }
+                metaStorageSvcFut
+                        .thenAcceptBoth(metaStorageConfigurationFuture, 
(service, metaStorageConfiguration) ->
+                                clusterTime.startSafeTimeScheduler(
+                                        safeTime -> service.syncTime(safeTime, 
term),
+                                        metaStorageConfiguration
+                                ))
+                        .whenComplete((v, e) -> {
+                            if (e != null) {
+                                LOG.error("Unable to start Idle Safe Time 
scheduler", e);
+                            }
+                        });
 
-                return fut;
-            });
-        }
-    }
+                // Update learner configuration (in case we missed some topology updates between elections).
+                serializationFuture = metaStorageSvcFut.thenCompose(service -> 
resetLearners(service.raftGroupService(), term));
+            } else if (serializationFuture != null) {
+                LOG.info("Node has lost the leadership, stopping Idle Safe 
Time scheduler");
 
-    private void registerTopologyEventListeners() {
-        logicalTopologyService.addEventListener(new 
LogicalTopologyEventListener() {
-            @Override
-            public void onNodeValidated(LogicalNode validatedNode) {
-                executeIfLeader((service, term) -> 
addLearner(service.raftGroupService(), validatedNode));
-            }
+                thisNodeTerm = null;
 
-            @Override
-            public void onNodeInvalidated(LogicalNode invalidatedNode) {
-                executeIfLeader((service, term) -> 
removeLearner(service.raftGroupService(), invalidatedNode));
-            }
+                
logicalTopologyService.removeEventListener(logicalTopologyEventListener);
 
-            @Override
-            public void onNodeLeft(LogicalNode leftNode, 
LogicalTopologySnapshot newTopology) {
-                onNodeInvalidated(leftNode);
-            }
+                clusterTime.stopSafeTimeScheduler();
+
+                serializationFuture.cancel(false);
 
-            @Override
-            public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
-                
executeIfLeader(MetaStorageRaftGroupEventsListener.this::resetLearners);
+                serializationFuture = null;
             }
-        });
+        }
     }
 
-    @FunctionalInterface
-    private interface OnLeaderAction {
-        CompletableFuture<Void> apply(MetaStorageServiceImpl service, long 
term);
+    private class MetaStorageLogicalTopologyEventListener implements 
LogicalTopologyEventListener {
+        @Override
+        public void onNodeValidated(LogicalNode validatedNode) {
+            execute((raftService, term) -> addLearner(raftService, 
validatedNode));
+        }
+
+        @Override
+        public void onNodeInvalidated(LogicalNode invalidatedNode) {
+            execute((raftService, term) -> removeLearner(raftService, 
invalidatedNode));
+        }
+
+        @Override
+        public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
+            onNodeInvalidated(leftNode);
+        }
+
+        @Override
+        public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+            execute(MetaStorageLeaderElectionListener.this::resetLearners);
+        }
     }
 
     @FunctionalInterface
-    private interface OnStatusAction {
-        CompletableFuture<Void> apply(MetaStorageServiceImpl service, long 
term, boolean isLeader);
+    private interface Action {
+        CompletableFuture<Void> apply(RaftGroupService raftService, long term);
     }
 
     /**
      * Executes the given action if the current node is the Meta Storage leader.
      */
-    private void executeIfLeader(OnLeaderAction action) {
+    private void execute(Action action) {
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping Meta Storage configuration update because the 
node is stopping");
 
@@ -156,9 +185,14 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
                     return;
                 }
 
+                // Term and serialization future are initialized together.
+                assert thisNodeTerm != null;
+
+                long term = thisNodeTerm;
+
                 serializationFuture = serializationFuture
                         // we don't care about exceptions here, they should be logged independently
-                        .handle((v, e) -> executeIfLeaderImpl(action))
+                        .handle((v, e) -> 
metaStorageSvcFut.thenCompose(service -> 
action.apply(service.raftGroupService(), term)))
                         .thenCompose(Function.identity());
             }
         } finally {
@@ -166,20 +200,6 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
         }
     }
 
-    private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) 
{
-        return executeWithStatus((service, term, isLeader) -> isLeader ? 
action.apply(service, term) : completedFuture(null));
-    }
-
-    private CompletableFuture<Void> executeWithStatus(OnStatusAction action) {
-        return metaStorageSvcFut.thenCompose(service -> 
service.raftGroupService().refreshAndGetLeaderWithTerm()
-                .thenCompose(leaderWithTerm -> {
-                    String leaderName = leaderWithTerm.leader().consistentId();
-
-                    boolean isLeader = leaderName.equals(nodeName);
-                    return action.apply(service, leaderWithTerm.term(), 
isLeader);
-                }));
-    }
-
     private CompletableFuture<Void> addLearner(RaftGroupService raftService, 
ClusterNode learner) {
         return updateConfigUnderLock(() -> isPeer(raftService, learner)
                 ? completedFuture(null)
@@ -207,11 +227,9 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
                 })));
     }
 
-    private CompletableFuture<Void> resetLearners(MetaStorageServiceImpl 
service, long term) {
+    private CompletableFuture<Void> resetLearners(RaftGroupService 
raftService, long term) {
         return updateConfigUnderLock(() -> 
logicalTopologyService.validatedNodesOnLeader()
                 .thenCompose(validatedNodes -> updateConfigUnderLock(() -> {
-                    RaftGroupService raftService = service.raftGroupService();
-
                     Set<String> peers = 
raftService.peers().stream().map(Peer::consistentId).collect(toSet());
 
                     Set<String> learners = validatedNodes.stream()
@@ -236,7 +254,7 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
         try {
             return action.get()
                     .whenComplete((v, e) -> {
-                        if (e != null) {
+                        if (e != null && !(unwrapCause(e) instanceof 
CancellationException)) {
                             LOG.error("Unable to change peers on topology 
update", e);
                         }
                     });
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index fbac3f4cf5..5c9c72d8df 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -62,6 +63,8 @@ import 
org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
 import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -117,15 +120,24 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
     /**
-     * Future which completes when MetaStorage manager finished local recovery.
-     * The value of the future is the revision which must be used for state recovery by other components.
+     * Future which completes when MetaStorage manager finished local recovery. The value of the future is the revision which must be used
+     * for state recovery by other components.
      */
     private final CompletableFuture<Long> recoveryFinishedFuture = new 
CompletableFuture<>();
 
+    /**
+     * Future that gets completed after {@link #deployWatches} method has been called.
+     */
+    private final CompletableFuture<Void> deployWatchesFuture = new 
CompletableFuture<>();
+
     private final ClusterTimeImpl clusterTime;
 
+    private final TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory;
+
     private volatile long appliedRevision;
 
+    private volatile MetaStorageConfiguration metaStorageConfiguration;
+
     /**
      * The constructor.
      *
@@ -144,7 +156,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
@@ -152,7 +165,28 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = 
topologyAwareRaftGroupServiceFactory;
+    }
+
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, 
raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
     }
 
     private CompletableFuture<Long> recover(MetaStorageServiceImpl service) {
@@ -225,54 +259,97 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     }
 
     private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
-
-        CompletableFuture<RaftGroupService> raftServiceFuture;
-
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
-
-            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-                Peer localPeer = configuration.peer(thisNodeName);
-
-                assert localPeer != null;
-
-                raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+            String thisNodeName = clusterService.nodeName();
 
-                Peer localPeer = configuration.learner(thisNodeName);
+            var disruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                assert localPeer != null;
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = 
metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        RaftGroupEventsListener.noopLsnr,
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            }
+            return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes,
+            RaftNodeDisruptorConfiguration disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+        Peer localPeer = configuration.peer(thisNodeName);
+
+        assert localPeer != null;
+
+        MetaStorageConfiguration localMetaStorageConfiguration = 
metaStorageConfiguration;
+
+        assert localMetaStorageConfiguration != null : "Meta Storage 
configuration has not been set";
+
+        CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+                new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                configuration,
+                new MetaStorageListener(storage, clusterTime),
+                RaftGroupEventsListener.noopLsnr,
+                disruptorConfig,
+                topologyAwareRaftGroupServiceFactory
+        );
+
+        raftServiceFuture
+                .thenAccept(service -> service.subscribeLeader(new 
MetaStorageLeaderElectionListener(
+                        busyLock,
+                        clusterService,
+                        logicalTopologyService,
+                        metaStorageSvcFut,
+                        clusterTime,
+                        // We use the "deployWatchesFuture" to guarantee that the Configuration Manager will be started
+                        // when the underlying code tries to read Meta Storage configuration. This is a consequence of having a circular
+                        // dependency between these two components.
+                        deployWatchesFuture.thenApply(v -> 
localMetaStorageConfiguration)
+                )))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Unable to register 
MetaStorageLeaderElectionListener", e);
+                    }
+                });
+
+        return raftServiceFuture;
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration 
disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
 
-        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        Peer localPeer = configuration.learner(thisNodeName);
+
+        assert localPeer != null;
+
+        return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+                new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                configuration,
+                new MetaStorageListener(storage, clusterTime),
+                RaftGroupEventsListener.noopLsnr,
+                disruptorConfig
+        );
+    }
+
+    /**
+     * Sets the Meta Storage configuration.
+     *
+     * <p>This method is needed to avoid the cyclic dependency between the Meta Storage and distributed configuration (built on top of the
+     * Meta Storage).
+     *
+     * <p>This method <b>must</b> always be called <b>before</b> calling {@link #start}.
+     */
+    public final void configure(MetaStorageConfiguration 
metaStorageConfiguration) {
+        this.metaStorageConfiguration = metaStorageConfiguration;
     }
 
     @Override
@@ -297,6 +374,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                 .whenComplete((service, e) -> {
                     if (e != null) {
                         metaStorageSvcFut.completeExceptionally(e);
+                        recoveryFinishedFuture.completeExceptionally(e);
                     } else {
                         assert service != null;
 
@@ -323,13 +401,15 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         busyLock.block();
 
-        clusterTime.stopLeaderTimer();
+        deployWatchesFuture.cancel(true);
 
-        cancelOrConsume(metaStorageSvcFut, MetaStorageServiceImpl::close);
+        recoveryFinishedFuture.cancel(true);
 
-        IgniteUtils.closeAll(
+        IgniteUtils.closeAllManually(
+                clusterTime,
+                () -> cancelOrConsume(metaStorageSvcFut, 
MetaStorageServiceImpl::close),
                 () -> raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE),
-                storage::close
+                storage
         );
     }
 
@@ -365,10 +445,18 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         }
 
         try {
-            return recoveryFinishedFuture.thenAccept(revision -> 
inBusyLock(busyLock, () -> {
-                // Meta Storage contract states that all updated entries under 
a particular revision must be stored in the Vault.
-                storage.startWatches(revision + 1, this::onRevisionApplied);
-            }));
+            return recoveryFinishedFuture
+                    .thenAccept(revision -> inBusyLock(busyLock, () -> {
+                        // Meta Storage contract states that all updated 
entries under a particular revision must be stored in the Vault.
+                        storage.startWatches(revision + 1, 
this::onRevisionApplied);
+                    }))
+                    .whenComplete((v, e) -> {
+                        if (e == null) {
+                            deployWatchesFuture.complete(null);
+                        } else {
+                            deployWatchesFuture.completeExceptionally(e);
+                        }
+                    });
         } finally {
             busyLock.leaveBusy();
         }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 6d0c535bc2..4f32decdff 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -277,10 +277,10 @@ public class MetaStorageServiceImpl implements MetaStorageService {
      * @param safeTime New safe time.
      * @return Future that will be completed when message is sent.
      */
-    public CompletableFuture<Void> syncTime(HybridTimestamp safeTime) {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only 
propagate safe time when ms is idle
+    public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long 
term) {
         SyncTimeCommand syncTimeCommand = 
context.commandsFactory().syncTimeCommand()
                 .safeTimeLong(safeTime.longValue())
+                .initiatorTerm(term)
                 .build();
 
         return context.raftService().run(syncTimeCommand);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index d84c675aa4..8f836a1963 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -80,17 +80,21 @@ public class MetaStorageWriteHandler {
         WriteCommand command = clo.command();
 
         try {
-            HybridTimestamp safeTime;
-
             if (command instanceof MetaStorageWriteCommand) {
-                MetaStorageWriteCommand cmdWithTime = 
(MetaStorageWriteCommand) command;
+                var cmdWithTime = (MetaStorageWriteCommand) command;
 
-                safeTime = cmdWithTime.safeTime();
+                HybridTimestamp safeTime = cmdWithTime.safeTime();
 
                 handleWriteWithTime(clo, cmdWithTime, safeTime);
             } else if (command instanceof SyncTimeCommand) {
-                // TODO: IGNITE-19199 WatchProcessor must be notified of the 
new safe time.
-                throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199");
+                var syncTimeCommand = (SyncTimeCommand) command;
+
+                // Ignore the command if it has been sent by a stale leader.
+                if (clo.term() == syncTimeCommand.initiatorTerm()) {
+                    clusterTime.updateSafeTime(syncTimeCommand.safeTime());
+                }
+
+                clo.result(null);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
index b3aa7253b9..f1c953ca8a 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
@@ -17,26 +17,57 @@
 
 package org.apache.ignite.internal.metastorage.server.time;
 
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Cluster time implementation with additional methods to adjust time and 
update safe time.
  */
-public class ClusterTimeImpl implements ClusterTime {
-    private final IgniteSpinBusyLock busyLock;
+public class ClusterTimeImpl implements ClusterTime, ManuallyCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ClusterTimeImpl.class);
 
-    private volatile @Nullable LeaderTimer leaderTimer;
+    private final String nodeName;
+
+    private final IgniteSpinBusyLock busyLock;
 
     private final HybridClock clock;
 
-    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime =
+            new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
+
+    /**
+     * Scheduler for sending safe time periodically when Meta Storage is idle.
+     *
+     * <p>Scheduler is only created if this node has been elected as the Meta 
Storage leader.
+     *
+     * <p>Concurrent access is guarded by {@code this}.
+     */
+    private @Nullable SafeTimeScheduler safeTimeScheduler;
+
+    /** Action that issues a time sync command. */
+    @FunctionalInterface
+    public interface SyncTimeAction {
+        CompletableFuture<Void> syncTime(HybridTimestamp time);
+    }
 
     /**
      * Constructor.
@@ -44,30 +75,30 @@ public class ClusterTimeImpl implements ClusterTime {
      * @param busyLock Busy lock.
      * @param clock Node's hybrid clock.
      */
-    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+    public ClusterTimeImpl(String nodeName, IgniteSpinBusyLock busyLock, 
HybridClock clock) {
+        this.nodeName = nodeName;
         this.busyLock = busyLock;
         this.clock = clock;
-        this.safeTime = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
     }
 
     /**
      * Starts sync time scheduler.
      *
-     * @param service MetaStorage service that is used by scheduler to sync 
time.
+     * @param syncTimeAction Action that performs the time sync operation.
      */
-    public void startLeaderTimer(MetaStorageServiceImpl service) {
+    public void startSafeTimeScheduler(SyncTimeAction syncTimeAction, 
MetaStorageConfiguration configuration) {
         if (!busyLock.enterBusy()) {
             return;
         }
 
         try {
-            assert leaderTimer == null;
+            synchronized (this) {
+                assert safeTimeScheduler == null;
 
-            LeaderTimer newTimer = new LeaderTimer(service);
+                safeTimeScheduler = new SafeTimeScheduler(syncTimeAction, 
configuration);
 
-            leaderTimer = newTimer;
-
-            newTimer.start();
+                safeTimeScheduler.start();
+            }
         } finally {
             busyLock.leaveBusy();
         }
@@ -76,22 +107,19 @@ public class ClusterTimeImpl implements ClusterTime {
     /**
      * Stops sync time scheduler if it exists.
      */
-    public void stopLeaderTimer() {
-        if (!busyLock.enterBusy()) {
-            return;
-        }
+    public synchronized void stopSafeTimeScheduler() {
+        if (safeTimeScheduler != null) {
+            safeTimeScheduler.stop();
 
-        try {
-            LeaderTimer timer = leaderTimer;
+            safeTimeScheduler = null;
+        }
+    }
 
-            if (timer != null) {
-                timer.stop();
+    @Override
+    public void close() throws Exception {
+        stopSafeTimeScheduler();
 
-                leaderTimer = null;
-            }
-        } finally {
-            busyLock.leaveBusy();
-        }
+        safeTime.close();
     }
 
     @Override
@@ -119,48 +147,85 @@ public class ClusterTimeImpl implements ClusterTime {
     }
 
     /**
-     * Updates hybrid logical clock using {@code ts}. Selects the maximum 
between current system time,
-     * hybrid clock's latest time and {@code ts} adding 1 logical tick to the 
result.
+     * Updates hybrid logical clock using {@code ts}. Selects the maximum 
between current system time, hybrid clock's latest time and
+     * {@code ts} adding 1 logical tick to the result.
      *
      * @param ts Timestamp.
      */
-    public void adjust(HybridTimestamp ts) {
+    public synchronized void adjust(HybridTimestamp ts) {
         this.clock.update(ts);
+
+        // Since this method is called when a write command is being processed 
and safe time is also updated by write commands,
+        // we need to re-schedule the idle time scheduler.
+        if (safeTimeScheduler != null) {
+            safeTimeScheduler.schedule();
+        }
     }
 
-    private class LeaderTimer {
+    private class SafeTimeScheduler {
+        private final SyncTimeAction syncTimeAction;
 
-        private final MetaStorageServiceImpl service;
+        private final MetaStorageConfiguration configuration;
 
-        private LeaderTimer(MetaStorageServiceImpl service) {
-            this.service = service;
+        private final ScheduledExecutorService executorService =
+                
Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, 
"meta-storage-safe-time", LOG));
+
+        /**
+         * Current scheduled task.
+         *
+         * <p>Concurrent access is guarded by {@code this}.
+         */
+        @Nullable
+        private ScheduledFuture<?> currentTask;
+
+        SafeTimeScheduler(SyncTimeAction syncTimeAction, 
MetaStorageConfiguration configuration) {
+            this.syncTimeAction = syncTimeAction;
+            this.configuration = configuration;
         }
 
         void start() {
             schedule();
         }
 
-        private void schedule() {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only 
propagate safe time when ms is idle
-        }
-
-        void disseminateTime() {
-            if (!busyLock.enterBusy()) {
-                // Shutting down.
-                return;
+        synchronized void schedule() {
+            // Cancel the previous task if we were re-scheduled because Meta 
Storage was not actually idle.
+            if (currentTask != null) {
+                currentTask.cancel(false);
             }
 
-            try {
-                HybridTimestamp now = clock.now();
-
-                service.syncTime(now);
-            } finally {
-                busyLock.leaveBusy();
-            }
+            currentTask = executorService.schedule(() -> {
+                if (!busyLock.enterBusy()) {
+                    return;
+                }
+
+                try {
+                    syncTimeAction.syncTime(clock.now())
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    Throwable cause = unwrapCause(e);
+
+                                    if (!(cause instanceof 
CancellationException) && !(cause instanceof NodeStoppingException)) {
+                                        LOG.error("Unable to perform idle time 
sync", e);
+                                    }
+                                }
+                            });
+
+                    // Re-schedule the task again.
+                    schedule();
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }, configuration.idleSyncTimeInterval().value(), 
TimeUnit.MILLISECONDS);
         }
 
         void stop() {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Stop 
safe time propagation
+            synchronized (this) {
+                if (currentTask != null) {
+                    currentTask.cancel(false);
+                }
+            }
+
+            IgniteUtils.shutdownAndAwaitTermination(executorService, 10, 
TimeUnit.SECONDS);
         }
     }
 
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index a3aeb4e824..570295b0fa 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -29,29 +29,38 @@ import java.util.Set;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.network.ClusterService;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Tests that check correctness of an invocation {@link 
MetaStorageManager#deployWatches()}.
  */
+@ExtendWith(ConfigurationExtension.class)
 public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest {
     /** Vault manager. */
     private static VaultManager vaultManager;
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     @BeforeAll
     public static void init() {
         vaultManager = new VaultManager(new InMemoryVaultService());
@@ -78,13 +87,12 @@ public class MetaStorageDeployWatchesCorrectnessTest 
extends IgniteAbstractTest
 
         ClusterManagementGroupManager cmgManager = 
mock(ClusterManagementGroupManager.class);
         ClusterService clusterService = mock(ClusterService.class);
-        LogicalTopologyService logicalTopologyService = 
mock(LogicalTopologyService.class);
         RaftManager raftManager = mock(RaftManager.class);
-        RaftGroupService raftGroupService = mock(RaftGroupService.class);
+        TopologyAwareRaftGroupService raftGroupService = 
mock(TopologyAwareRaftGroupService.class);
 
         
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(mcNodeName)));
         when(clusterService.nodeName()).thenReturn(mcNodeName);
-        when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(), 
any(), any(), any(), any()))
+        when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(), 
any(), any(), any(), any(), any()))
                 .thenReturn(completedFuture(raftGroupService));
         
when(raftGroupService.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation
 -> completedFuture(0L));
 
@@ -93,10 +101,12 @@ public class MetaStorageDeployWatchesCorrectnessTest 
extends IgniteAbstractTest
                         vaultManager,
                         clusterService,
                         cmgManager,
-                        logicalTopologyService,
+                        mock(LogicalTopologyService.class),
                         raftManager,
                         new SimpleInMemoryKeyValueStorage(mcNodeName),
-                        clock
+                        clock,
+                        mock(TopologyAwareRaftGroupServiceFactory.class),
+                        metaStorageConfiguration
                 ),
                 StandaloneMetaStorageManager.create(vaultManager)
         );
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index 7993242b54..e9155fbb84 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -30,12 +31,16 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.raft.RaftManager;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
@@ -45,13 +50,18 @@ import org.apache.ignite.network.NodeMetadata;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /** Tests MetaStorage manager recovery basics. */
+@ExtendWith(ConfigurationExtension.class)
 public class MetaStorageManagerRecoveryTest {
     private static final String NODE_NAME = "node";
 
     private static final String LEADER_NAME = "ms-leader";
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     private MetaStorageManagerImpl metaStorageManager;
 
     private KeyValueStorage kvs;
@@ -75,7 +85,9 @@ public class MetaStorageManagerRecoveryTest {
                 topologyService,
                 raftManager,
                 kvs,
-                clock
+                clock,
+                mock(TopologyAwareRaftGroupServiceFactory.class),
+                metaStorageConfiguration
         );
     }
 
@@ -84,12 +96,11 @@ public class MetaStorageManagerRecoveryTest {
 
         RaftGroupService service = mock(RaftGroupService.class);
 
-        
when(service.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation 
-> {
-            return CompletableFuture.completedFuture(remoteRevision);
-        });
+        when(service.run(any(GetCurrentRevisionCommand.class)))
+                .thenAnswer(invocation -> completedFuture(remoteRevision));
 
         when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), 
any(), any(), any()))
-                .thenAnswer(invocation -> 
CompletableFuture.completedFuture(service));
+                .thenAnswer(invocation -> completedFuture(service));
 
         return raft;
     }
@@ -135,7 +146,7 @@ public class MetaStorageManagerRecoveryTest {
         ClusterManagementGroupManager mock = 
mock(ClusterManagementGroupManager.class);
 
         when(mock.metaStorageNodes())
-                .thenAnswer(invocation -> 
CompletableFuture.completedFuture(Set.of(LEADER_NAME)));
+                .thenAnswer(invocation -> 
completedFuture(Set.of(LEADER_NAME)));
 
         return mock;
     }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java
new file mode 100644
index 0000000000..cc3c13cb52
--- /dev/null
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import 
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl.SyncTimeAction;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link ClusterTimeImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ClusterTimeTest {
+    private final ClusterTimeImpl clusterTime = new ClusterTimeImpl("foo", new 
IgniteSpinBusyLock(), new HybridClockImpl());
+
+    @AfterEach
+    void tearDown() {
+        // Stop the time and verify that all internal scheduled tasks do not 
impede the stop process.
+        assertTimeout(Duration.ofSeconds(1), clusterTime::close);
+    }
+
+    @Test
+    void testWaitFor() {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.updateSafeTime(now);
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testWaitForCancellation() throws Exception {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.close();
+
+        assertThat(future, willThrow(TrackerClosedException.class));
+    }
+
+    @Test
+    void 
testIdleSafeTimeScheduler(@InjectConfiguration("mock.idleSyncTimeInterval=1") 
MetaStorageConfiguration config) {
+        SyncTimeAction action = mock(SyncTimeAction.class);
+
+        when(action.syncTime(any())).thenReturn(completedFuture(null));
+
+        clusterTime.startSafeTimeScheduler(action, config);
+
+        verify(action, timeout(100).atLeast(3)).syncTime(any());
+    }
+
+    @Test
+    void 
testIdleSafeTimeSchedulerStop(@InjectConfiguration("mock.idleSyncTimeInterval=1")
 MetaStorageConfiguration config) {
+        SyncTimeAction action = mock(SyncTimeAction.class);
+
+        when(action.syncTime(any())).thenReturn(completedFuture(null));
+
+        clusterTime.startSafeTimeScheduler(action, config);
+
+        verify(action, timeout(100).atLeast(1)).syncTime(any());
+
+        clusterTime.stopSafeTimeScheduler();
+
+        clearInvocations(action);
+
+        verify(action, after(100).never()).syncTime(any());
+    }
+
+    /**
+     * Tests that {@link ClusterTimeImpl#adjust} re-schedules the idle time 
sync timer.
+     */
+    @Test
+    void 
testSchedulerProlongation(@InjectConfiguration("mock.idleSyncTimeInterval=50") 
MetaStorageConfiguration config) {
+        assertDoesNotThrow(() -> clusterTime.adjust(clusterTime.now()));
+
+        SyncTimeAction action = mock(SyncTimeAction.class);
+
+        when(action.syncTime(any())).thenReturn(completedFuture(null));
+
+        clusterTime.startSafeTimeScheduler(action, config);
+
+        verify(action, after(30).never()).syncTime(any());
+
+        clusterTime.adjust(clusterTime.now());
+
+        verify(action, after(30).never()).syncTime(any());
+
+        verify(action, after(50)).syncTime(any());
+    }
+}
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 4f3770c00e..2801ff77cf 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -26,19 +26,21 @@ import static org.mockito.Mockito.when;
 import java.io.Serializable;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
@@ -78,7 +80,9 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                 mockClusterGroupManager(),
                 mock(LogicalTopologyService.class),
                 mockRaftManager(),
-                keyValueStorage
+                keyValueStorage,
+                mock(TopologyAwareRaftGroupServiceFactory.class),
+                mockConfiguration()
         );
     }
 
@@ -92,9 +96,27 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
      * @param raftMgr Raft manager.
      * @param storage Storage. This component owns this resource and will 
manage its lifecycle.
      */
-    private StandaloneMetaStorageManager(VaultManager vaultMgr, ClusterService 
clusterService, ClusterManagementGroupManager cmgMgr,
-            LogicalTopologyService logicalTopologyService, RaftManager 
raftMgr, KeyValueStorage storage) {
-        super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, 
raftMgr, storage, new HybridClockImpl());
+    private StandaloneMetaStorageManager(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            TopologyAwareRaftGroupServiceFactory raftServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        super(
+                vaultMgr,
+                clusterService,
+                cmgMgr,
+                logicalTopologyService,
+                raftMgr,
+                storage,
+                new HybridClockImpl(),
+                raftServiceFactory,
+                configuration
+        );
     }
 
     private static ClusterService mockClusterService() {
@@ -115,13 +137,14 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
     private static RaftManager mockRaftManager() {
         ArgumentCaptor<RaftGroupListener> listenerCaptor = 
ArgumentCaptor.forClass(RaftGroupListener.class);
         RaftManager raftManager = mock(RaftManager.class);
-        RaftGroupService raftGroupService = mock(RaftGroupService.class);
+        TopologyAwareRaftGroupService raftGroupService = 
mock(TopologyAwareRaftGroupService.class);
 
         try {
             when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
                     any(),
                     any(),
                     listenerCaptor.capture(),
+                    any(),
                     any()
             )).thenReturn(completedFuture(raftGroupService));
 
@@ -130,7 +153,8 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                     any(),
                     listenerCaptor.capture(),
                     any(),
-                    any(RaftNodeDisruptorConfiguration.class)
+                    any(),
+                    any()
             )).thenReturn(completedFuture(raftGroupService));
         } catch (NodeStoppingException e) {
             throw new RuntimeException(e);
@@ -148,6 +172,16 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
         return raftManager;
     }
 
+    private static MetaStorageConfiguration mockConfiguration() {
+        MetaStorageConfiguration configuration = 
mock(MetaStorageConfiguration.class);
+        ConfigurationValue<Long> value = mock(ConfigurationValue.class);
+
+        when(configuration.idleSyncTimeInterval()).thenReturn(value);
+        when(value.value()).thenReturn(1000L);
+
+        return configuration;
+    }
+
     private static CompletableFuture<Serializable> runCommand(Command command, 
RaftGroupListener listener) {
         CompletableFuture<Serializable> future = new CompletableFuture<>();
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index ce7a4a5b53..166889fad9 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -36,7 +36,7 @@ public abstract class AbstractKeyValueStorageTest {
     }
 
     @AfterEach
-    void tearDown() throws Exception {
+    public void tearDown() throws Exception {
         storage.close();
     }
 
@@ -52,4 +52,4 @@ public abstract class AbstractKeyValueStorageTest {
     protected static byte[] keyValue(int k, int v) {
         return ("key" + k + '_' + "val" + v).getBytes(UTF_8);
     }
-}
\ No newline at end of file
+}
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 472263fb94..6ef7b70be1 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -150,7 +150,7 @@ public class ActiveActorTest extends IgniteAbstractTest {
             Set<String> placementDriverNodesNames,
             RaftGroupEventsClientListener eventsClientListener
     ) {
-        var raftManager = new Loza(clusterService, raftConfiguration, 
dataPath, new HybridClockImpl());
+        var raftManager = new Loza(clusterService, raftConfiguration, 
dataPath, new HybridClockImpl(), eventsClientListener);
 
         LogicalTopologyService logicalTopologyService = new 
LogicalTopologyServiceTestImpl(clusterService);
 
@@ -524,7 +524,7 @@ public class ActiveActorTest extends IgniteAbstractTest {
             });
         }
 
-        return (TopologyAwareRaftGroupService) 
TopologyAwareRaftGroupService.start(
+        return TopologyAwareRaftGroupService.start(
                 GROUP_ID,
                 localClusterService,
                 FACTORY,
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index a5938a5163..3144068e70 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -96,7 +97,7 @@ public class MultiActorPlacementDriverTest extends 
IgniteAbstractTest {
 
     private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
-    private HybridClock clock = new HybridClockImpl();
+    private final HybridClock clock = new HybridClockImpl();
 
     @InjectConfiguration
     private RaftConfiguration raftConfiguration;
@@ -107,6 +108,9 @@ public class MultiActorPlacementDriverTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private DistributionZonesConfiguration dstZnsCfg;
 
+    @InjectConfiguration
+    private MetaStorageConfiguration metaStorageConfiguration;
+
     private List<String> placementDriverNodeNames;
 
     private List<String> nodeNames;
@@ -119,8 +123,6 @@ public class MultiActorPlacementDriverTest extends 
IgniteAbstractTest {
     /** Cluster service by node name. */
     private Map<String, ClusterService> clusterServices;
 
-    private TestInfo testInfo;
-
     /** This closure handles {@link LeaseGrantedMessage} to check the 
placement driver manager behavior. */
     private IgniteTriFunction<LeaseGrantedMessage, String, String, 
LeaseGrantedMessageResponse> leaseGrantHandler;
 
@@ -133,8 +135,6 @@ public class MultiActorPlacementDriverTest extends 
IgniteAbstractTest {
         this.nodeNames = IntStream.range(BASE_PORT, BASE_PORT + 
5).mapToObj(port -> testNodeName(testInfo, port))
                 .collect(Collectors.toList());
 
-        this.testInfo = testInfo;
-
         this.clusterServices = startNodes();
 
         List<LogicalTopologyServiceTestImpl> logicalTopManagers = new 
ArrayList<>();
@@ -283,7 +283,9 @@ public class MultiActorPlacementDriverTest extends 
IgniteAbstractTest {
                     logicalTopologyService,
                     raftManager,
                     storage,
-                    nodeClock
+                    nodeClock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             if (this.metaStorageManager == null) {
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 592a2e67b9..e5c75c5fad 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -128,6 +129,9 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private DistributionZonesConfiguration dstZnsCfg;
 
+    @InjectConfiguration
+    private MetaStorageConfiguration metaStorageConfiguration;
+
     private MetaStorageManagerImpl metaStorageManager;
 
     private PlacementDriverManager placementDriverManager;
@@ -199,7 +203,9 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
                 logicalTopologyService,
                 raftManager,
                 storage,
-                nodeClock
+                nodeClock,
+                topologyAwareRaftGroupServiceFactory,
+                metaStorageConfiguration
         );
 
         placementDriverManager = new PlacementDriverManager(
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 60cc1e0e75..d164cb64e0 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -208,7 +208,7 @@ public class PlacementDriverManager implements 
IgniteComponent {
         });
     }
 
-    private void onLeaderChange(ClusterNode leader, Long term) {
+    private void onLeaderChange(ClusterNode leader, long term) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
diff --git a/modules/metastorage-api/build.gradle 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java
similarity index 64%
copy from modules/metastorage-api/build.gradle
copy to 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java
index c5aefc2800..1dd1f95a96 100644
--- a/modules/metastorage-api/build.gradle
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/LeaderElectionListener.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.raft;
 
-description = 'ignite-metastorage-api'
+import org.apache.ignite.network.ClusterNode;
 
-dependencies {
-    api project(':ignite-network-api')
-
-    implementation project(':ignite-core')
-    implementation libs.jetbrains.annotations
-
-    annotationProcessor project(':ignite-network-annotation-processor')
+/**
+ * Listener that gets called after a new Raft group leader has been elected.
+ */
+public interface LeaderElectionListener {
+    /**
+     * Callback that gets called after a new Raft group leader has been 
elected.
+     *
+     * @param leader New leader node.
+     * @param term New leader term.
+     */
+    void onLeaderElected(ClusterNode leader, long term);
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 9e4ea288cf..5e9fe64fb5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -130,6 +130,7 @@ public class Loza implements RaftManager {
      * @param dataPath Data path.
      * @param clock A hybrid logical clock.
      */
+    @TestOnly
     public Loza(
             ClusterService clusterNetSvc,
             RaftConfiguration raftConfiguration,
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
index 077421ba56..e3685b8085 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.network.ClusterNode;
 
@@ -33,7 +33,7 @@ import org.apache.ignite.network.ClusterNode;
 public class RaftGroupEventsClientListener {
     private static final IgniteLogger LOG = 
Loggers.forClass(RaftGroupEventsClientListener.class);
 
-    private final Map<ReplicationGroupId, List<BiConsumer<ClusterNode, Long>>> 
leaderElectionListeners = new ConcurrentHashMap<>();
+    private final Map<ReplicationGroupId, List<LeaderElectionListener>> 
leaderElectionListeners = new ConcurrentHashMap<>();
 
     /**
      * Register leader election listener for client.
@@ -41,7 +41,7 @@ public class RaftGroupEventsClientListener {
      * @param groupId Group id.
      * @param listener Listener.
     */
-    public void addLeaderElectionListener(ReplicationGroupId groupId, 
BiConsumer<ClusterNode, Long> listener) {
+    public void addLeaderElectionListener(ReplicationGroupId groupId, 
LeaderElectionListener listener) {
         leaderElectionListeners.compute(groupId, (k, listeners) -> {
             if (listeners == null) {
                 listeners = new ArrayList<>();
@@ -53,13 +53,13 @@ public class RaftGroupEventsClientListener {
         });
     }
 
-        /**
-         * Unregister leader election listener for client.
-         *
-         * @param groupId Group id.
-         * @param listener Listener.
-        */
-    public void removeLeaderElectionListener(ReplicationGroupId groupId, 
BiConsumer<ClusterNode, Long> listener) {
+    /**
+     * Unregister leader election listener for client.
+     *
+     * @param groupId Group id.
+     * @param listener Listener.
+    */
+    public void removeLeaderElectionListener(ReplicationGroupId groupId, 
LeaderElectionListener listener) {
         leaderElectionListeners.compute(groupId, (k, listeners) -> {
             if (listeners == null) {
                 return null;
@@ -79,12 +79,12 @@ public class RaftGroupEventsClientListener {
      * @param term Election term.
     */
     public void onLeaderElected(ReplicationGroupId groupId, ClusterNode 
leader, long term) {
-        List<BiConsumer<ClusterNode, Long>> listeners = 
leaderElectionListeners.get(groupId);
+        List<LeaderElectionListener> listeners = 
leaderElectionListeners.get(groupId);
 
         if (listeners != null) {
-            for (BiConsumer<ClusterNode, Long> listener : listeners) {
+            for (LeaderElectionListener listener : listeners) {
                 try {
-                    listener.accept(leader, term);
+                    listener.onLeaderElected(leader, term);
                 } catch (Exception e) {
                     LOG.warn("Failed to notify leader election listener for 
group=" + groupId, e);
                 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index cc5a82c84f..6fc49e2422 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -34,6 +33,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
@@ -83,8 +83,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
     private final RaftConfiguration raftConfiguration;
 
     /**
-     * Whether to notify callback after subscription to pass the current 
leader and term into it, even if the leader
-     * did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
+     * Whether to notify callback after subscription to pass the current 
leader and term into it, even if the leader did not change in that
+     * moment (see {@link #subscribeLeader}).
      */
     private final boolean notifyOnSubscription;
 
@@ -96,8 +96,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param executor RPC executor.
      * @param raftClient RPC RAFT client.
      * @param logicalTopologyService Logical topology.
-     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it,
-     *        even if the leader did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
+     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it, even
+     *         if the leader did not change in that moment (see {@link 
#subscribeLeader}).
      */
     private TopologyAwareRaftGroupService(
             ClusterService cluster,
@@ -163,11 +163,11 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param getLeader True to get the group's leader upon service creation.
      * @param executor RPC executor.
      * @param logicalTopologyService Logical topology service.
-     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it,
-     *        even if the leader did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
+     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it, even
+     *         if the leader did not change in that moment (see {@link 
#subscribeLeader}).
      * @return Future to create a raft client.
      */
-    public static CompletableFuture<RaftGroupService> start(
+    public static CompletableFuture<TopologyAwareRaftGroupService> start(
             ReplicationGroupId groupId,
             ClusterService cluster,
             RaftMessagesFactory factory,
@@ -188,7 +188,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * Sends a subscribe message to a specific node of the cluster.
      *
      * @param node Node.
-     * @param msg  Subscribe message.
+     * @param msg Subscribe message.
      * @return A future that completes with true when the message sent and 
false value when the node left the cluster.
      */
     private CompletableFuture<Boolean> sendSubscribeMessage(ClusterNode node, 
SubscriptionLeaderChangeRequest msg) {
@@ -204,8 +204,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      *
      * @param node Node.
      * @param msg Subscribe message to send.
-     * @param msgSendFut Future that completes with true when the message sent 
and with false when the node left topology and cannot get a
-     *     cluster.
+     * @param msgSendFut Future that completes with true when the message sent 
and with false when the node left topology and cannot
+     *         get a cluster.
      */
     private void sendWithRetry(ClusterNode node, 
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
         clusterService.messagingService().invoke(node, msg, 
raftConfiguration.responseTimeout().value()).whenCompleteAsync((unused, th) -> {
@@ -264,7 +264,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param callback Callback closure.
      * @return Future that is completed when all subscription messages to 
peers are sent.
      */
-    public CompletableFuture<Void> subscribeLeader(BiConsumer<ClusterNode, 
Long> callback) {
+    public CompletableFuture<Void> subscribeLeader(LeaderElectionListener 
callback) {
         assert !serverEventHandler.isSubscribed() : "The node already 
subscribed";
 
         int peers = peers().size();
@@ -443,15 +443,15 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
     /**
      * Leader election handler.
      */
-    private static class ServerEventHandler implements BiConsumer<ClusterNode, 
Long> {
+    private static class ServerEventHandler implements LeaderElectionListener {
         /** A term of last elected leader. */
         private long term = 0;
 
         /** Last elected leader. */
-        private Peer leaderPeer;
+        private volatile Peer leaderPeer;
 
         /** A leader elected callback. */
-        private BiConsumer<ClusterNode, Long> onLeaderElectedCallback;
+        private LeaderElectionListener onLeaderElectedCallback;
 
         /**
          * Notifies about a new leader elected, if it did not make before.
@@ -459,12 +459,13 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
          * @param node Node.
          * @param term Term.
          */
-        private synchronized void onLeaderElected(ClusterNode node, long term) 
{
+        @Override
+        public synchronized void onLeaderElected(ClusterNode node, long term) {
             if (onLeaderElectedCallback != null && term > this.term) {
                 this.term = term;
                 this.leaderPeer = new Peer(node.name());
 
-                onLeaderElectedCallback.accept(node, term);
+                onLeaderElectedCallback.onLeaderElected(node, term);
             }
         }
 
@@ -473,7 +474,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
          *
          * @param onLeaderElectedCallback A callback closure.
          */
-        public synchronized void 
setOnLeaderElectedCallback(BiConsumer<ClusterNode, Long> 
onLeaderElectedCallback) {
+        synchronized void setOnLeaderElectedCallback(LeaderElectionListener 
onLeaderElectedCallback) {
             this.onLeaderElectedCallback = onLeaderElectedCallback;
         }
 
@@ -482,15 +483,10 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
          *
          * @return True if notification required, false otherwise.
          */
-        public synchronized boolean isSubscribed() {
+        synchronized boolean isSubscribed() {
             return onLeaderElectedCallback != null;
         }
 
-        @Override
-        public void accept(ClusterNode clusterNode, Long term) {
-            onLeaderElected(clusterNode, term);
-        }
-
         Peer leader() {
             return leaderPeer;
         }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
index 8bb79db8a9..959f21f8ea 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
@@ -69,16 +69,16 @@ public class TopologyAwareRaftGroupServiceFactory 
implements RaftServiceFactory<
             ScheduledExecutorService raftClientExecutor
     ) {
         return TopologyAwareRaftGroupService.start(
-                    groupId,
-                    clusterService,
-                    raftMessagesFactory,
-                    raftConfiguration,
-                    peersAndLearners,
-                    true,
-                    raftClientExecutor,
-                    logicalTopologyService,
-                    eventsClientListener,
-                    true
-            ).thenApply(TopologyAwareRaftGroupService.class::cast);
+                groupId,
+                clusterService,
+                raftMessagesFactory,
+                raftConfiguration,
+                peersAndLearners,
+                true,
+                raftClientExecutor,
+                logicalTopologyService,
+                eventsClientListener,
+                true
+        );
     }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 5aaca91e2a..669f388dbb 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -139,7 +139,7 @@ public class Replica {
         return whenReplicaReady;
     }
 
-    private void onLeaderElected(ClusterNode clusterNode, Long term) {
+    private void onLeaderElected(ClusterNode clusterNode, long term) {
         leaderRef.set(clusterNode);
 
         if (!leaderFuture.isDone()) {
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c2b1008f20..868fd17acb 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -412,7 +412,7 @@ public class ReplicaManager implements IgniteComponent {
                             if (throwable == null) {
                                 return true;
                             } else {
-                                LOG.error("Failed to stop replica 
[replicaGrpId={}]", replicaGrpId, throwable);
+                                LOG.error("Failed to stop replica 
[replicaGrpId={}]", throwable, replicaGrpId);
 
                                 return false;
                             }
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index d9bd5bfd11..e0039f1f63 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -391,7 +391,7 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
             });
         }
 
-        return (TopologyAwareRaftGroupService) 
TopologyAwareRaftGroupService.start(
+        return TopologyAwareRaftGroupService.start(
                 GROUP_ID,
                 localClusterService,
                 FACTORY,
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index d5ffb24812..83957a180d 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -35,11 +35,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -62,11 +62,11 @@ public class PlacementDriverReplicaSideTest {
 
     private Replica replica;
 
-    private AtomicReference<BiConsumer<ClusterNode, Long>> callbackHolder = 
new AtomicReference<>();
+    private final AtomicReference<LeaderElectionListener> callbackHolder = new 
AtomicReference<>();
 
     private PendingComparableValuesTracker<Long, Void> storageIndexTracker;
 
-    private AtomicLong indexOnLeader = new AtomicLong(0);
+    private final AtomicLong indexOnLeader = new AtomicLong(0);
 
     private Peer currentLeader = null;
 
@@ -76,7 +76,7 @@ public class PlacementDriverReplicaSideTest {
         TopologyAwareRaftGroupService raftClient = 
mock(TopologyAwareRaftGroupService.class);
 
         when(raftClient.subscribeLeader(any())).thenAnswer(invocationOnMock -> 
{
-            BiConsumer<ClusterNode, Long> callback = 
invocationOnMock.getArgument(0);
+            LeaderElectionListener callback = invocationOnMock.getArgument(0);
             callbackHolder.set(callback);
 
             return completedFuture(null);
@@ -126,7 +126,7 @@ public class PlacementDriverReplicaSideTest {
      */
     private void leaderElection(ClusterNode leader) {
         if (callbackHolder.get() != null) {
-            callbackHolder.get().accept(leader, 1L);
+            callbackHolder.get().onLeaderElected(leader, 1L);
         }
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 54881775cb..9a98a90d9d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -52,9 +52,11 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -64,6 +66,7 @@ import 
org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -91,10 +94,10 @@ public class ItDistributedConfigurationPropertiesTest {
     private static ClusterManagementConfiguration 
clusterManagementConfiguration;
 
     @InjectConfiguration
-    private static SecurityConfiguration securityConfiguration;
+    private static NodeAttributesConfiguration nodeAttributes;
 
     @InjectConfiguration
-    private static NodeAttributesConfiguration nodeAttributes;
+    private static MetaStorageConfiguration metaStorageConfiguration;
 
     /**
      * An emulation of an Ignite node, that only contains components necessary 
for tests.
@@ -139,7 +142,10 @@ public class ItDistributedConfigurationPropertiesTest {
             );
 
             HybridClock clock = new HybridClockImpl();
-            raftManager = new Loza(clusterService, raftConfiguration, workDir, 
clock);
+
+            var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
+            raftManager = new Loza(clusterService, raftConfiguration, workDir, 
clock, raftGroupEventsClientListener);
 
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
@@ -154,14 +160,25 @@ public class ItDistributedConfigurationPropertiesTest {
                     nodeAttributes,
                     new TestConfigurationValidator());
 
+            var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+            var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    raftGroupEventsClientListener
+            );
+
             metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
-                    new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
+                    logicalTopologyService,
                     raftManager,
                     new SimpleInMemoryKeyValueStorage(name()),
-                    clock
+                    clock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 86a8c015b5..0a88f8fd22 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -45,9 +45,11 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -56,6 +58,7 @@ import 
org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -76,6 +79,9 @@ public class ItDistributedConfigurationStorageTest {
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     /**
      * An emulation of an Ignite node, that only contains components necessary 
for tests.
      */
@@ -111,7 +117,9 @@ public class ItDistributedConfigurationStorageTest {
 
             HybridClock clock = new HybridClockImpl();
 
-            raftManager = new Loza(clusterService, raftConfiguration, workDir, 
clock);
+            var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
+            raftManager = new Loza(clusterService, raftConfiguration, workDir, 
clock, raftGroupEventsClientListener);
 
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
@@ -126,14 +134,25 @@ public class ItDistributedConfigurationStorageTest {
                     nodeAttributes,
                     new TestConfigurationValidator());
 
+            var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+            var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    raftGroupEventsClientListener
+            );
+
             metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
-                    new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
+                    logicalTopologyService,
                     raftManager,
                     new SimpleInMemoryKeyValueStorage(name()),
-                    clock
+                    clock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index ac1e021304..23fff6513d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -96,6 +96,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -198,6 +199,9 @@ public class ItRebalanceDistributedTest {
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     @Target(ElementType.METHOD)
     @Retention(RetentionPolicy.RUNTIME)
     private @interface UseTestTxStateStorage {
@@ -647,7 +651,11 @@ public class ItRebalanceDistributedTest {
 
             lockManager = new HeapLockManager();
 
-            raftManager = new Loza(clusterService, raftConfiguration, dir, new 
HybridClockImpl());
+            HybridClock hybridClock = new HybridClockImpl();
+
+            var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
+            raftManager = new Loza(clusterService, raftConfiguration, dir, 
hybridClock, raftGroupEventsClientListener);
 
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
@@ -665,12 +673,10 @@ public class ItRebalanceDistributedTest {
             replicaManager = new ReplicaManager(
                     clusterService,
                     cmgManager,
-                    new HybridClockImpl(),
+                    hybridClock,
                     Set.of(TableMessageGroup.class, TxMessageGroup.class)
             );
 
-            HybridClock hybridClock = new HybridClockImpl();
-
             ReplicaService replicaSvc = new ReplicaService(
                     clusterService.messagingService(),
                     hybridClock
@@ -682,9 +688,17 @@ public class ItRebalanceDistributedTest {
 
             LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
 
+            var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    raftGroupEventsClientListener
+            );
+
             KeyValueStorage keyValueStorage = 
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
                     ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, 
"metaStorage"))
                     : new SimpleInMemoryKeyValueStorage(nodeName);
+
             metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
@@ -692,7 +706,9 @@ public class ItRebalanceDistributedTest {
                     logicalTopologyService,
                     raftManager,
                     keyValueStorage,
-                    hybridClock
+                    hybridClock,
+                    topologyAwareRaftGroupServiceFactory,
+                    metaStorageConfiguration
             );
 
             cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
@@ -771,13 +787,6 @@ public class ItRebalanceDistributedTest {
 
             schemaManager = new SchemaManager(registry, tablesCfg, 
metaStorageManager);
 
-            TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
-                    clusterService,
-                    logicalTopologyService,
-                    Loza.FACTORY,
-                    new RaftGroupEventsClientListener()
-            );
-
             distributionZoneManager = new DistributionZoneManager(
                     zonesCfg,
                     tablesCfg,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f57fae0c9e..5d3923ef7d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -170,6 +171,9 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
+    @InjectConfiguration
+    private static MetaStorageConfiguration metaStorageConfiguration;
+
     /**
      * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
      *
@@ -239,7 +243,9 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         HybridClock hybridClock = new HybridClockImpl();
 
-        var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, 
hybridClock);
+        var raftGroupEventsClientListener = new 
RaftGroupEventsClientListener();
+
+        var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, 
hybridClock, raftGroupEventsClientListener);
 
         var clusterStateStorage = new 
RocksDbClusterStateStorage(dir.resolve("cmg"));
 
@@ -270,14 +276,25 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var txManager = new TxManagerImpl(replicaService, lockManager, 
hybridClock, new TransactionIdGenerator(idx));
 
+        var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                clusterSvc,
+                logicalTopologyService,
+                Loza.FACTORY,
+                raftGroupEventsClientListener
+        );
+
         var metaStorageMgr = new MetaStorageManagerImpl(
                 vault,
                 clusterSvc,
                 cmgManager,
-                new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+                logicalTopologyService,
                 raftMgr,
                 new RocksDbKeyValueStorage(name, dir.resolve("metastorage")),
-                hybridClock
+                hybridClock,
+                topologyAwareRaftGroupServiceFactory,
+                metaStorageConfiguration
         );
 
         var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
@@ -321,8 +338,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         SchemaManager schemaManager = new SchemaManager(registry, 
tablesConfig, metaStorageMgr);
 
-        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
-
         DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
                 zonesConfig,
                 tablesConfig,
@@ -332,13 +347,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 name
         );
 
-        TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
-                clusterSvc,
-                new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
-                Loza.FACTORY,
-                new RaftGroupEventsClientListener()
-        );
-
         var clockWaiter = new ClockWaiter("test", hybridClock);
 
         var catalogManager = new CatalogServiceImpl(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1811fd2310..0b015acd6a 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -88,6 +88,7 @@ import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metrics.MetricManager;
@@ -217,7 +218,7 @@ public class IgniteImpl implements Ignite {
     private final Loza raftMgr;
 
     /** Meta storage manager. */
-    private final MetaStorageManager metaStorageMgr;
+    private final MetaStorageManagerImpl metaStorageMgr;
 
     /** Distributed configuration validator. */
     private final ConfigurationValidator distributedConfigurationValidator;
@@ -406,6 +407,13 @@ public class IgniteImpl implements Ignite {
 
         logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgMgr);
 
+        var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
+                clusterSvc,
+                logicalTopologyService,
+                Loza.FACTORY,
+                raftGroupEventsClientListener
+        );
+
         metaStorageMgr = new MetaStorageManagerImpl(
                 vaultMgr,
                 clusterSvc,
@@ -413,7 +421,8 @@ public class IgniteImpl implements Ignite {
                 logicalTopologyService,
                 raftMgr,
                 new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH)),
-                clock
+                clock,
+                topologyAwareRaftGroupServiceFactory
         );
 
         this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vaultMgr);
@@ -434,15 +443,7 @@ public class IgniteImpl implements Ignite {
 
         TablesConfiguration tablesConfig = 
clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
-                clusterSvc,
-                logicalTopologyService,
-                Loza.FACTORY,
-                raftGroupEventsClientListener
-        );
-
-        DistributionZonesConfiguration zonesConfiguration = 
clusterConfigRegistry
-                .getConfiguration(DistributionZonesConfiguration.KEY);
+        
metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY));
 
         
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
 
@@ -479,7 +480,7 @@ public class IgniteImpl implements Ignite {
         schemaManager = new SchemaManager(registry, tablesConfig, 
metaStorageMgr);
 
         distributionZoneManager = new DistributionZoneManager(
-                zonesConfiguration,
+                
clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY),
                 tablesConfig,
                 metaStorageMgr,
                 logicalTopologyService,

Reply via email to