This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7de32e8c60 IGNITE-23292 Local startup of metastorage compaction on 
node startup after recovery (#4663)
7de32e8c60 is described below

commit 7de32e8c60cf2bb31e3d950ec7b0c195e29a1b5c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 5 09:08:54 2024 +0300

    IGNITE-23292 Local startup of metastorage compaction on node startup after 
recovery (#4663)
---
 .../internal/catalog/storage/UpdateLogImpl.java    |   9 +-
 .../catalog/storage/UpdateLogImplTest.java         |   3 +-
 .../distributionzones/DistributionZoneManager.java |   5 +-
 .../rebalance/DistributionZoneRebalanceEngine.java |  15 ++-
 .../DistributionZoneRebalanceEngineV2.java         |   5 +-
 .../DistributionZoneRebalanceEngineTest.java       |   3 +-
 .../internal/index/IndexBuildingManager.java       |   5 +-
 .../IndexAvailabilityControllerRestorerTest.java   |   7 +-
 .../index/IndexAvailabilityControllerTest.java     |   2 +-
 .../internal/index/TestIndexManagementUtils.java   |   9 +-
 .../internal/metastorage/MetaStorageManager.java   |   4 +-
 .../ignite/internal/metastorage/Revisions.java     |  57 +++++++++
 modules/metastorage/build.gradle                   |   2 +
 .../metastorage/TestMetasStorageUtils.java         |  67 +++++++++++
 .../ItMetaStorageCompactionTriggerOneNodeTest.java | 128 +++++++++++++++++++++
 .../impl/ItMetaStorageCompactionTriggerTest.java   |  61 ++--------
 ...ommand.java => GetCurrentRevisionsCommand.java} |  11 +-
 .../command/MetastorageCommandsMessageGroup.java   |   4 +-
 .../command/response/RevisionsInfo.java            |  71 ++++++++++++
 .../impl/MetaStorageCompactionTrigger.java         |  48 +++++++-
 .../metastorage/impl/MetaStorageManagerImpl.java   | 116 +++++++------------
 .../metastorage/impl/MetaStorageService.java       |   7 +-
 .../metastorage/impl/MetaStorageServiceImpl.java   |   7 +-
 .../impl/RecoveryRevisionsListenerImpl.java        | 101 ++++++++++++++++
 .../server/AbstractKeyValueStorage.java            |  89 ++++++++++++--
 .../metastorage/server/KeyValueStorage.java        |  32 +++---
 .../NotifyWatchProcessorEvent.java}                |  20 +++-
 .../RecoveryRevisionsListener.java}                |  18 ++-
 .../server/UpdateCompactionRevisionEvent.java      |  51 ++++++++
 .../metastorage/server/UpdateEntriesEvent.java     |  54 +++++++++
 .../server/persistence/RocksDbKeyValueStorage.java | 112 +++++++-----------
 .../server/raft/MetaStorageListener.java           |  10 +-
 .../MetaStorageDeployWatchesCorrectnessTest.java   |   6 +-
 .../impl/MetaStorageManagerRecoveryTest.java       |   9 +-
 .../AbstractCompactionKeyValueStorageTest.java     |  23 +++-
 .../server/SimpleInMemoryKeyValueStorage.java      |  60 +++++-----
 .../PartitionReplicaLifecycleManager.java          |   5 +-
 .../internal/placementdriver/ActiveActorTest.java  |   3 +-
 .../placementdriver/AssignmentsTracker.java        |   4 +-
 .../placementdriver/PlacementDriverManager.java    |   5 +-
 .../internal/placementdriver/LeaseUpdaterTest.java |   3 +-
 .../placementdriver/PlacementDriverTest.java       |   5 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../storage/DistributedConfigurationStorage.java   |   4 +-
 .../ignite/internal/BaseIgniteRestartTest.java     |   7 +-
 .../ignite/internal/schema/SchemaManagerTest.java  |   4 +-
 .../internal/table/distributed/TableManager.java   |   5 +-
 .../table/distributed/index/IndexMetaStorage.java  |   5 +-
 .../table/distributed/TableManagerTest.java        |   5 +-
 50 files changed, 960 insertions(+), 331 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 3e083a5a64..681c08686b 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -113,7 +114,7 @@ public class UpdateLogImpl implements UpdateLog {
                 );
             }
 
-            recoveryStateFromMetastore(handler);
+            recoverStateFromMetastore(handler);
 
             UpdateListener listener = new UpdateListener(handler, marshaller);
             this.listener = listener;
@@ -236,12 +237,12 @@ public class UpdateLogImpl implements UpdateLog {
         }
     }
 
-    private void recoveryStateFromMetastore(OnUpdateHandler handler) {
-        CompletableFuture<Long> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
+    private void recoverStateFromMetastore(OnUpdateHandler handler) {
+        CompletableFuture<Revisions> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
 
         assert recoveryFinishedFuture.isDone();
 
-        long recoveryRevision = recoveryFinishedFuture.join();
+        long recoveryRevision = recoveryFinishedFuture.join().revision();
 
         Entry earliestVersion = 
metastore.getLocally(CatalogKey.snapshotVersion(), recoveryRevision);
 
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 7c111cda1f..b6d7998a88 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -51,6 +51,7 @@ import 
org.apache.ignite.internal.catalog.storage.serialization.UpdateLogMarshal
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -231,7 +232,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
         metastore = StandaloneMetaStorageManager.create(keyValueStorage, 
readOperationForCompactionTracker);
         assertThat(metastore.startAsync(componentContext), 
willCompleteSuccessfully());
 
-        assertThat(metastore.recoveryFinishedFuture(), 
willBe(recoverRevision));
+        
assertThat(metastore.recoveryFinishedFuture().thenApply(Revisions::revision), 
willBe(recoverRevision));
     }
 
     @Test
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index bf285938f1..e7f4693a76 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -114,6 +114,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
@@ -271,12 +272,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             
metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(), 
topologyWatchListener);
 
-            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
+            CompletableFuture<Revisions> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
 
             // At the moment of the start of this manager, it is guaranteed 
that Meta Storage has been recovered.
             assert recoveryFinishFuture.isDone();
 
-            long recoveryRevision = recoveryFinishFuture.join();
+            long recoveryRevision = recoveryFinishFuture.join().revision();
 
             restoreGlobalStateFromLocalMetastorage(recoveryRevision);
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 07405ac3f8..777afd919b 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -48,11 +48,13 @@ import 
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventL
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Zone rebalance manager.
@@ -89,6 +91,11 @@ public class DistributionZoneRebalanceEngine {
     // TODO IGNITE-22115 remove it
     public static final boolean ENABLED = getBoolean(FEATURE_FLAG_NAME, false);
 
+    /** Special flag to skip rebalance on node recovery for tests. */
+    // TODO: IGNITE-23466 Remove it
+    @TestOnly
+    public static final String SKIP_REBALANCE_TRIGGERS_RECOVERY = 
"IGNITE_SKIP_REBALANCE_TRIGGERS_RECOVERY";
+
     /**
      * Constructor.
      *
@@ -132,12 +139,16 @@ public class DistributionZoneRebalanceEngine {
             // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
             metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), 
dataNodesListener);
 
-            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
+            CompletableFuture<Revisions> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
 
             // At the moment of the start of this manager, it is guaranteed 
that Meta Storage has been recovered.
             assert recoveryFinishFuture.isDone();
 
-            long recoveryRevision = recoveryFinishFuture.join();
+            long recoveryRevision = recoveryFinishFuture.join().revision();
+
+            if (getBoolean(SKIP_REBALANCE_TRIGGERS_RECOVERY, false)) {
+                return nullCompletedFuture();
+            }
 
             if (ENABLED) {
                 return rebalanceTriggersRecovery(recoveryRevision, 
catalogVersion)
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index 1fcd523022..26b7e14f9f 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventL
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -113,12 +114,12 @@ public class DistributionZoneRebalanceEngineV2 {
             // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
             metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), 
dataNodesListener);
 
-            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
+            CompletableFuture<Revisions> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
 
             // At the moment of the start of this manager, it is guaranteed 
that Meta Storage has been recovered.
             assert recoveryFinishFuture.isDone();
 
-            long recoveryRevision = recoveryFinishFuture.join();
+            long recoveryRevision = recoveryFinishFuture.join().revision();
 
             return rebalanceTriggersRecovery(recoveryRevision);
         });
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index a1653e7927..02b788ac3a 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
@@ -173,7 +174,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
             return null;
         }).when(metaStorageManager).registerPrefixWatch(any(), any());
 
-        
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+        
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
 Revisions(1, -1)));
 
         AtomicLong raftIndex = new AtomicLong();
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 8c76986d75..a998694e95 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -134,11 +135,11 @@ public class IndexBuildingManager implements 
IgniteComponent {
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         return inBusyLockAsync(busyLock, () -> {
-            CompletableFuture<Long> recoveryFinishedFuture = 
metaStorageManager.recoveryFinishedFuture();
+            CompletableFuture<Revisions> recoveryFinishedFuture = 
metaStorageManager.recoveryFinishedFuture();
 
             assert recoveryFinishedFuture.isDone();
 
-            long recoveryRevision = recoveryFinishedFuture.join();
+            long recoveryRevision = recoveryFinishedFuture.join().revision();
 
             indexAvailabilityController.start(recoveryRevision);
 
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
index a99a3c48fe..6bd3231b5c 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -232,11 +233,11 @@ public class IndexAvailabilityControllerRestorerTest 
extends BaseIgniteAbstractT
 
         controller = new IndexAvailabilityController(catalogManager, 
metaStorageManager, mock(IndexBuilder.class));
 
-        CompletableFuture<Long> metastoreRecoveryFuture = 
metaStorageManager.recoveryFinishedFuture();
+        CompletableFuture<Revisions> metastoreRecoveryFuture = 
metaStorageManager.recoveryFinishedFuture();
 
-        assertThat(metastoreRecoveryFuture, willBe(greaterThan(0L)));
+        assertThat(metastoreRecoveryFuture.thenApply(Revisions::revision), 
willBe(greaterThan(0L)));
 
-        controller.start(metastoreRecoveryFuture.join());
+        controller.start(metastoreRecoveryFuture.join().revision());
     }
 
     private void setLocalNodeToClusterService(ClusterNode clusterNode) {
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index c1ddbc2fd3..49933fd47b 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -102,7 +102,7 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
 
         assertThat(metaStorageManager.recoveryFinishedFuture(), 
willCompleteSuccessfully());
 
-        
indexAvailabilityController.start(metaStorageManager.recoveryFinishedFuture().join());
+        
indexAvailabilityController.start(metaStorageManager.recoveryFinishedFuture().join().revision());
 
         assertThat(metaStorageManager.deployWatches(), 
willCompleteSuccessfully());
 
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
index cd5a7a10bf..dc1db53e93 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -110,12 +111,12 @@ class TestIndexManagementUtils {
     static void 
awaitTillGlobalMetastoreRevisionIsApplied(MetaStorageManagerImpl 
metaStorageManager) throws Exception {
         assertTrue(
                 waitForCondition(() -> {
-                    CompletableFuture<Long> currentRevisionFuture = 
metaStorageManager.metaStorageService()
-                            .thenCompose(MetaStorageService::currentRevision);
+                    CompletableFuture<RevisionsInfo> currentRevisionsFuture = 
metaStorageManager.metaStorageService()
+                            .thenCompose(MetaStorageService::currentRevisions);
 
-                    assertThat(currentRevisionFuture, 
willCompleteSuccessfully());
+                    assertThat(currentRevisionsFuture, 
willCompleteSuccessfully());
 
-                    return currentRevisionFuture.join() == 
metaStorageManager.appliedRevision();
+                    return currentRevisionsFuture.join().revision() == 
metaStorageManager.appliedRevision();
                 }, 1_000)
         );
     }
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index dd7ba5efa5..ddabb27d85 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -414,9 +414,9 @@ public interface MetaStorageManager extends IgniteComponent 
{
 
     /**
      * Returns a future which completes when MetaStorage manager finished 
local recovery.
-     * The value of the future is the revision which must be used for state 
recovery by other components.
+     * The value of the future is the revisions which must be used for state 
recovery by other components.
      */
-    CompletableFuture<Long> recoveryFinishedFuture();
+    CompletableFuture<Revisions> recoveryFinishedFuture();
 
     /** Registers a Meta Storage revision update listener. */
     void registerRevisionUpdateListener(RevisionUpdateListener listener);
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
new file mode 100644
index 0000000000..595acbf89f
--- /dev/null
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage;
+
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.tostring.S;
+
+/** Information about metastorage revisions. */
+public class Revisions {
+    private final long revision;
+
+    private final long compactionRevision;
+
+    /**
+     * Constructor.
+     *
+     * @param revision Metastorage revision.
+     * @param compactionRevision Metastorage compaction revision.
+     */
+    public Revisions(long revision, long compactionRevision) {
+        this.revision = revision;
+        this.compactionRevision = compactionRevision;
+    }
+
+    /** Returns metastorage revision. */
+    public long revision() {
+        return revision;
+    }
+
+    /**
+     * Returns the metastorage compaction revision. Key versions up to and including this revision
+     * will be deleted, and an attempt to read them will fail with {@link CompactedException}.
+     */
+    public long compactionRevision() {
+        return compactionRevision;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index d0c167f8b0..a6da29290f 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -68,6 +68,8 @@ dependencies {
     integrationTestImplementation project(':ignite-runner')
     integrationTestImplementation project(':ignite-system-disaster-recovery')
     integrationTestImplementation project(':ignite-configuration-system')
+    integrationTestImplementation project(':ignite-configuration-root')
+    integrationTestImplementation project(':ignite-distribution-zones')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-network'))
     integrationTestImplementation testFixtures(project(':ignite-raft'))
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
index bb55ddc9d5..67deee8707 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
@@ -17,12 +17,24 @@
 
 package org.apache.ignite.internal.metastorage;
 
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.jetbrains.annotations.Nullable;
 
 /** Helper class for use in integration tests that may contain useful methods 
and constants. */
@@ -30,6 +42,15 @@ public class TestMetasStorageUtils {
     /** Special value representing any random timestamp. */
     public static final HybridTimestamp ANY_TIMESTAMP = new 
HybridTimestamp(1L, 0);
 
+    /** Foo key. */
+    public static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
+
+    /** Bar key. */
+    public static final ByteArray BAR_KEY = ByteArray.fromString("bar_key");
+
+    /** Random value. */
+    public static final byte[] VALUE = ByteArray.fromString("value").bytes();
+
     /** Checks the metastore entry. */
     public static void checkEntry(Entry actEntry, byte[] expKey, byte 
@Nullable [] expValue, long expRevision) {
         assertEquals(expRevision, actEntry.revision(), () -> "entry=" + 
actEntry);
@@ -55,4 +76,50 @@ public class TestMetasStorageUtils {
 
         return Arrays.equals(act.value(), exp.value());
     }
+
+    /** Creates a cluster configuration with metastorage compaction 
properties. */
+    public static String createClusterConfigWithCompactionProperties(long 
interval, long dataAvailabilityTime) {
+        return String.format(
+                "ignite.system.properties: {"
+                        + "%s.propertyValue= \"%s\", "
+                        + "%s.propertyValue= \"%s\""
+                        + "}",
+                INTERVAL_SYSTEM_PROPERTY_NAME, interval, 
DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME, dataAvailabilityTime
+        );
+    }
+
+    /** Returns the latest revision for the key from the leader. */
+    public static long latestKeyRevision(MetaStorageManager 
metaStorageManager, ByteArray key) {
+        CompletableFuture<Entry> latestEntryFuture = 
metaStorageManager.get(key);
+        assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
+
+        return latestEntryFuture.join().revision();
+    }
+
+    /** Returns {@code true} if every running node locally holds exactly one revision of the key, equal to {@code revision}. */
+    public static boolean allNodesContainSingleRevisionForKeyLocally(Cluster 
cluster, ByteArray key, long revision) {
+        return cluster.runningNodes()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .map(IgniteImpl::metaStorageManager)
+                .map(metaStorageManager -> 
collectRevisionsLocally(metaStorageManager, key))
+                .allMatch(keyRevisions -> keyRevisions.size() == 1 && 
keyRevisions.contains(revision));
+    }
+
+    private static Set<Long> collectRevisionsLocally(MetaStorageManager 
metaStorageManager, ByteArray key) {
+        var res = new HashSet<Long>();
+
+        for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
+            try {
+                Entry entry = metaStorageManager.getLocally(key, i);
+
+                if (!entry.empty()) {
+                    res.add(entry.revision());
+                }
+            } catch (CompactedException ignore) {
+                // Do nothing.
+            }
+        }
+
+        return res;
+    }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
new file mode 100644
index 0000000000..02dce5c715
--- /dev/null
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.BAR_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
+import 
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for {@link MetaStorageCompactionTrigger} on a cluster that consists of a single node.
+ *
+ * <p>The cluster is initialized with the compaction properties produced by {@code createClusterConfigWithCompactionProperties(10, 10)},
+ * and the {@link DistributionZoneRebalanceEngine#SKIP_REBALANCE_TRIGGERS_RECOVERY} system property is set to {@code "true"} for the
+ * whole class.
+ */
+@WithSystemProperty(key = 
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value = 
"true")
+public class ItMetaStorageCompactionTriggerOneNodeTest extends 
ClusterPerTestIntegrationTest {
+    /** The scenario of this class needs exactly one node. */
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    /** Adds the compaction trigger properties to the cluster configuration used at cluster init. */
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        
builder.clusterConfiguration(createClusterConfigWithCompactionProperties(10, 
10));
+    }
+
+    /**
+     * Checks that a restarted node restores the compaction revision it had before the restart and then compacts locally up to it.
+     *
+     * <p>Steps: write {@code FOO_KEY} and {@code BAR_KEY}, wait until the local compaction revision reaches the revision of
+     * {@code FOO_KEY}, start dropping {@link CompactionCommand}s, write {@code FOO_KEY} once more, change the compaction properties to
+     * {@link Long#MAX_VALUE}, restart the node and verify the restored compaction revision and that only the latest revision of
+     * {@code FOO_KEY} eventually remains readable locally.
+     */
+    @Test
+    void testCompactionAfterRestartNode() throws Exception {
+        IgniteImpl node = aliveNode();
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(BAR_KEY, VALUE), 
willCompleteSuccessfully());
+
+        // Let's wait until the compaction on revision of FOO_KEY creation 
happens.
+        long fooRevision = latestKeyRevision(metaStorageManager, FOO_KEY);
+        assertTrue(waitForCondition(() -> 
metaStorageManager.getCompactionRevisionLocally() >= fooRevision, 10, 1_000));
+
+        log.info("Latest revision for key: [key={}, revision={}]", FOO_KEY, 
fooRevision);
+
+        // Let's cancel new compactions to create a new version for the key 
and not compact it until we restart the node.
+        startDroppingCompactionCommand(node);
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        long latestFooRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
+
+        // Remember the compaction revision before the restart: it is compared with the one restored by the restarted node below.
+        long latestCompactionRevision = 
metaStorageManager.getCompactionRevisionLocally();
+        // Let's change the properties before restarting so that a new 
scheduled compaction does not start after the node starts.
+        changeCompactionProperties(node, Long.MAX_VALUE, Long.MAX_VALUE);
+
+        IgniteImpl restartedNode = restartNode();
+
+        MetaStorageManager restartedMetaStorageManager = 
restartedNode.metaStorageManager();
+
+        // Let's make sure that after the restart the correct revision of the 
compaction is restored and the compaction itself will be at
+        // the latest compaction revision.
+        assertEquals(latestCompactionRevision, 
restartedMetaStorageManager.getCompactionRevisionLocally());
+        assertTrue(waitForCondition(() -> 
allNodesContainSingleRevisionForKeyLocally(cluster, FOO_KEY, 
latestFooRevision), 10, 1_000));
+    }
+
+    /** Returns the unwrapped implementation of the node with index {@code 0}. */
+    private IgniteImpl aliveNode() {
+        return unwrapIgniteImpl(node(0));
+    }
+
+    /** Restarts the node with index {@code 0} and returns the unwrapped implementation of the restarted node. */
+    private IgniteImpl restartNode() {
+        return unwrapIgniteImpl(restartNode(0));
+    }
+
+    /**
+     * Makes the node drop every {@link WriteActionRequest} whose deserialized command is a {@link CompactionCommand}; all other
+     * messages do not match the predicate.
+     */
+    private static void startDroppingCompactionCommand(IgniteImpl node) {
+        node.dropMessages((s, message) -> message instanceof WriteActionRequest
+                && ((WriteActionRequest) message).deserializedCommand() 
instanceof CompactionCommand);
+    }
+
+    /**
+     * Updates the compaction interval and data availability time system properties in the cluster configuration and asserts that the
+     * configuration change completes successfully.
+     *
+     * @param node Node through which the cluster configuration is changed.
+     * @param interval New value for {@code INTERVAL_SYSTEM_PROPERTY_NAME}, written as a decimal string.
+     * @param dataAvailabilityTime New value for {@code DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME}, written as a decimal string.
+     */
+    private static void changeCompactionProperties(IgniteImpl node, long 
interval, long dataAvailabilityTime) {
+        CompletableFuture<Void> changeFuture = node
+                .clusterConfiguration()
+                .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+                .system()
+                .properties()
+                .change(systemPropertyViews -> systemPropertyViews
+                        .update(
+                                INTERVAL_SYSTEM_PROPERTY_NAME,
+                                systemPropertyChange -> 
systemPropertyChange.changePropertyValue(Long.toString(interval))
+                        ).update(
+                                DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME,
+                                systemPropertyChange -> 
systemPropertyChange.changePropertyValue(Long.toString(dataAvailabilityTime))
+                        )
+                );
+
+        assertThat(changeFuture, willCompleteSuccessfully());
+    }
+}
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index 4700050e23..3b4e5860c5 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
-import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,20 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -51,10 +48,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 /** Integration test for {@link MetaStorageCompactionTrigger}. */
 public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrationTest {
-    private static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
-
-    private static final byte[] VALUE = ByteArray.fromString("value").bytes();
-
     @Override
     protected int initialNodes() {
         return 2;
@@ -69,12 +62,7 @@ public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrati
 
     @Override
     protected void configureInitParameters(InitParametersBuilder builder) {
-        String clusterConfig = "ignite.system.properties: {"
-                + INTERVAL_SYSTEM_PROPERTY_NAME + ".propertyValue= \"10\", "
-                + DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME + 
".propertyValue= \"10\""
-                + "}";
-
-        builder.clusterConfiguration(clusterConfig);
+        
builder.clusterConfiguration(createClusterConfigWithCompactionProperties(10, 
10));
     }
 
     @ParameterizedTest
@@ -97,21 +85,15 @@ public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrati
 
         long latestFooEntryRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
 
-        assertTrue(waitForCondition(() -> 
allNodesContainsSingleRevisionForKeyLocally(FOO_KEY, latestFooEntryRevision), 
10, 1_000));
+        assertTrue(
+                waitForCondition(() -> 
allNodesContainSingleRevisionForKeyLocally(CLUSTER, FOO_KEY, 
latestFooEntryRevision), 10, 1_000)
+        );
     }
 
     private static IgniteImpl aliveNode() {
         return unwrapIgniteImpl(CLUSTER.aliveNode());
     }
 
-    private static boolean 
allNodesContainsSingleRevisionForKeyLocally(ByteArray key, long revision) {
-        return CLUSTER.runningNodes()
-                .map(TestWrappers::unwrapIgniteImpl)
-                .map(IgniteImpl::metaStorageManager)
-                .map(metaStorageManager -> 
collectRevisionsLocally(metaStorageManager, key))
-                .allMatch(keyRevisions -> keyRevisions.size() == 1 && 
keyRevisions.contains(revision));
-    }
-
     private static void watchExact(MetaStorageManager metaStorageManager, 
ByteArray key, CountDownLatch latch) {
         metaStorageManager.registerExactWatch(key, new WatchListener() {
             @Override
@@ -127,31 +109,6 @@ public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrati
         });
     }
 
-    private static long latestKeyRevision(MetaStorageManager 
metaStorageManager, ByteArray key) {
-        CompletableFuture<Entry> latestEntryFuture = 
metaStorageManager.get(key);
-        assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
-
-        return latestEntryFuture.join().revision();
-    }
-
-    private static Set<Long> collectRevisionsLocally(MetaStorageManager 
metaStorageManager, ByteArray key) {
-        var res = new HashSet<Long>();
-
-        for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
-            try {
-                Entry entry = metaStorageManager.getLocally(key, i);
-
-                if (!entry.empty()) {
-                    res.add(entry.revision());
-                }
-            } catch (CompactedException ignore) {
-                // Do nothing.
-            }
-        }
-
-        return res;
-    }
-
     private void transferMetastorageLeadershipToAnotherNode() throws Exception 
{
         RaftGroupService raftGroupService = 
CLUSTER.leaderServiceFor(MetastorageGroupId.INSTANCE);
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
similarity index 73%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
index 8c72249c39..7084056c28 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.raft.ReadCommand;
 
-/** Get command for MetaStorageCommandListener that retrieves current 
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/**
+ * Get command for {@link MetaStorageListener} that retrieves the {@link 
RevisionsInfo current metastorage revisions} from the
+ * leader.
+ */
+@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISIONS)
+public interface GetCurrentRevisionsCommand extends ReadCommand {
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 86f87e7846..ebeff9d5dc 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -38,8 +38,8 @@ public interface MetastorageCommandsMessageGroup {
     /** Message type for {@link GetAllCommand}. */
     short GET_ALL = 30;
 
-    /** Message type for {@link GetCurrentRevisionCommand}. */
-    short GET_CURRENT_REVISION = 33;
+    /** Message type for {@link GetCurrentRevisionsCommand}. */
+    short GET_CURRENT_REVISIONS = 33;
 
     /** Message type for {@link GetChecksumCommand}. */
     short GET_CHECKSUM = 34;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
new file mode 100644
index 0000000000..c8f4214c9b
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command.response;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Information about metastorage revisions: a serializable pair of the metastorage revision and the metastorage compaction revision.
+ *
+ * <p>Can be converted to and from {@link Revisions} with {@link #toRevisions()} and {@link #of(Revisions)}.
+ */
+public class RevisionsInfo implements Serializable {
+    private static final long serialVersionUID = -1479528194130161192L;
+
+    /** Metastorage revision. */
+    private final long revision;
+
+    /** Metastorage compaction revision. */
+    private final long compactionRevision;
+
+    /**
+     * Constructor.
+     *
+     * @param revision Metastorage revision.
+     * @param compactionRevision Metastorage compaction revision.
+     */
+    public RevisionsInfo(long revision, long compactionRevision) {
+        this.revision = revision;
+        this.compactionRevision = compactionRevision;
+    }
+
+    /** Returns metastorage revision. */
+    public long revision() {
+        return revision;
+    }
+
+    /**
+     * Returns the metastorage compaction revision: key versions up to it (inclusive) will be deleted, and an attempt to read them
+     * will result in a {@link CompactedException}.
+     */
+    public long compactionRevision() {
+        return compactionRevision;
+    }
+
+    /** Converts to {@link Revisions}, carrying over both the revision and the compaction revision. */
+    public Revisions toRevisions() {
+        return new Revisions(revision, compactionRevision);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /**
+     * Converts to {@link RevisionsInfo}.
+     *
+     * @param currentRevisions Revisions to copy the revision and the compaction revision from.
+     * @return New instance holding the same pair of revisions.
+     */
+    public static RevisionsInfo of(Revisions currentRevisions) {
+        return new RevisionsInfo(currentRevisions.revision(), 
currentRevisions.compactionRevision());
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
index 1a247f9501..8731697a14 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -41,6 +42,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.command.CompactionCommand;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -65,6 +68,14 @@ import org.jetbrains.annotations.Nullable;
  *     <li>Metastorage leader locally gets notification of the completion of 
the local compaction for the new revision.</li>
  *     <li>Metastorage leader locally schedules a new start of compaction.</li>
  * </ol>
+ *
+ * <p>About recovery:</p>
+ * <ul>
+ *     <li>At the start of the component, we will start a local compaction for 
the compaction revision from
+ *     {@link MetaStorageManager#recoveryFinishedFuture}.</li>
+ *     <li>{@link CompactionCommand}s that were received before the {@link 
MetaStorageManager#deployWatches} will start a local compaction
+ *     in the {@link MetaStorageManager#deployWatches}.</li>
+ * </ul>
  */
 // TODO: IGNITE-23280 Turn on compaction
 public class MetaStorageCompactionTrigger implements IgniteComponent {
@@ -140,10 +151,12 @@ public class MetaStorageCompactionTrigger implements 
IgniteComponent {
             lock.lock();
 
             try {
-                started = true;
-
                 config.init();
 
+                startCompactionOnRecoveryInBackground();
+
+                started = true;
+
                 scheduleNextCompactionBusy();
 
                 return nullCompletedFuture();
@@ -319,4 +332,35 @@ public class MetaStorageCompactionTrigger implements 
IgniteComponent {
             lock.unlock();
         }
     }
+
+    /**
+     * Launches, on the compaction executor, a local storage compaction for the compaction revision taken from the metastorage recovery
+     * future.
+     *
+     * <p>The recovery future is expected to be already completed (checked with an {@code assert}). Nothing is launched if the
+     * recovered compaction revision is {@code -1}. The outcome is only logged: success at info level, failures at error level except
+     * for {@link NodeStoppingException}, which is not logged.
+     */
+    private void startCompactionOnRecoveryInBackground() {
+        CompletableFuture<Revisions> recoveryFuture = metaStorageManager.recoveryFinishedFuture();
+
+        assert recoveryFuture.isDone();
+
+        long recoveredCompactionRevision = recoveryFuture.join().compactionRevision();
+
+        if (recoveredCompactionRevision == -1) {
+            return;
+        }
+
+        Runnable compaction = () -> inBusyLockSafe(busyLock, () -> storage.compact(recoveredCompactionRevision));
+
+        runAsync(compaction, compactionExecutor).whenComplete((unused, throwable) -> {
+            if (throwable == null) {
+                LOG.info(
+                        "Metastorage compaction launched during node recovery has been successfully completed: "
+                                + "[compactionRevision={}]",
+                        recoveredCompactionRevision
+                );
+
+                return;
+            }
+
+            Throwable cause = unwrapCause(throwable);
+
+            if (cause instanceof NodeStoppingException) {
+                // Not an error: the compaction was interrupted because the node is stopping.
+                return;
+            }
+
+            LOG.error(
+                    "Unknown error during metastore compaction launched on node recovery: [compactionRevision={}]",
+                    cause,
+                    recoveredCompactionRevision
+            );
+        });
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 50880831a4..7fb0f640d6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -63,6 +62,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.command.CompactionCommand;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
@@ -139,10 +139,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
     /**
-     * Future which completes when MetaStorage manager finished local 
recovery. The value of the future is the revision which must be used
+     * Future which completes when MetaStorage manager finished local 
recovery. The value of the future is the revisions which must be used
      * for state recovery by other components.
      */
-    private final CompletableFuture<Long> recoveryFinishedFuture = new 
CompletableFuture<>();
+    private final CompletableFuture<Revisions> recoveryFinishedFuture = new 
CompletableFuture<>();
 
     /**
      * Future that gets completed after {@link #deployWatches} method has been 
called.
@@ -196,6 +196,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     private final MetastorageDivergencyValidator divergencyValidator = new 
MetastorageDivergencyValidator();
 
+    private final RecoveryRevisionsListenerImpl recoveryRevisionsListener;
+
     /**
      * The constructor.
      *
@@ -238,6 +240,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         this.readOperationFromLeaderForCompactionTracker = 
readOperationForCompactionTracker;
 
         learnerManager = new MetaStorageLearnerManager(busyLock, 
logicalTopologyService, metaStorageSvcFut);
+
+        recoveryRevisionsListener = new 
RecoveryRevisionsListenerImpl(busyLock, recoveryFinishedFuture, storage);
+        storage.setRecoveryRevisionsListener(recoveryRevisionsListener);
     }
 
     /**
@@ -311,75 +316,39 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         electionListeners.add(listener);
     }
 
-    private CompletableFuture<Long> recover(MetaStorageService service) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
-
-        try {
-            service.currentRevision().whenComplete((targetRevision, throwable) 
-> {
-                if (throwable != null) {
-                    recoveryFinishedFuture.completeExceptionally(throwable);
-
-                    return;
-                }
-
-                LOG.info("Performing MetaStorage recovery from revision {} to 
{}", storage.revision(), targetRevision);
-
-                assert targetRevision != null;
-
-                listenForRecovery(targetRevision);
-            }).whenComplete((res, ex) -> {
-                if (ex != null) {
-                    LOG.info("Recovery failed", ex);
-
-                    recoveryFinishedFuture.completeExceptionally(ex);
-                }
-            });
-
-            return recoveryFinishedFuture;
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    private void listenForRecovery(long targetRevision) {
-        LongConsumer listener = storageRevision -> {
-            if (!busyLock.enterBusy()) {
-                recoveryFinishedFuture.completeExceptionally(new 
NodeStoppingException());
-
-                return;
-            }
-
-            try {
-                if (storageRevision < targetRevision) {
-                    return;
-                }
-
-                storage.setRecoveryRevisionListener(null);
-
-                finishRecovery(targetRevision);
-            } finally {
-                busyLock.leaveBusy();
-            }
-        };
+    private CompletableFuture<?> recover(MetaStorageService service) {
+        return inBusyLockAsync(busyLock, () -> {
+            service.currentRevisions()
+                    .thenAccept(targetRevisions -> {
+                        assert targetRevisions != null;
 
-        storage.setRecoveryRevisionListener(listener);
+                        LOG.info("Performing MetaStorage recovery: [from={}, 
to={}]", storage.revisions(), targetRevisions);
 
-        // Storage might be already up-to-date, so check here manually after 
setting the listener.
-        listener.accept(storage.revision());
-    }
+                        
recoveryRevisionsListener.setTargetRevisions(targetRevisions.toRevisions());
+                    }).whenComplete((res, throwable) -> {
+                        if (throwable != null) {
+                            
recoveryFinishedFuture.completeExceptionally(throwable);
+                        }
+                    });
 
-    private void finishRecovery(long targetRevision) {
-        appliedRevision = targetRevision;
+            return recoveryFinishedFuture
+                    .thenAccept(revisions -> {
+                        long recoveryRevision = revisions.revision();
 
-        if (targetRevision > 0) {
-            
clusterTime.updateSafeTime(storage.timestampByRevision(targetRevision));
-        }
+                        appliedRevision = recoveryRevision;
 
-        if (recoveryFinishedFuture.complete(targetRevision)) {
-            LOG.info("Finished MetaStorage recovery");
-        }
+                        if (recoveryRevision > 0) {
+                            
clusterTime.updateSafeTime(storage.timestampByRevision(recoveryRevision));
+                        }
+                    })
+                    .whenComplete((revisions, throwable) -> {
+                        if (throwable != null) {
+                            LOG.info("Recovery failed", throwable);
+                        } else {
+                            LOG.info("Finished MetaStorage recovery");
+                        }
+                    });
+        });
     }
 
     private CompletableFuture<MetaStorageServiceImpl> 
reenterIfNeededAndInitializeMetaStorage(
@@ -694,6 +663,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         storage.start();
 
+        // Safe because we haven't started raft nodes yet and so no one has to 
update storage locally.
+        recoveryRevisionsListener.onUpdate(storage.revisions());
+
         cmgMgr.metaStorageInfo()
                 .thenCombine(cmgMgr.clusterState(), 
MetaStorageInfoAndClusterState::new)
                 .thenCompose(infoAndState -> {
@@ -811,8 +783,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
         try {
             return recoveryFinishedFuture
-                    .thenAccept(revision -> inBusyLock(busyLock, () -> {
-                        storage.startWatches(revision + 1, new 
WatchEventHandlingCallback() {
+                    .thenAccept(revisions -> inBusyLock(busyLock, () -> {
+                        storage.startWatches(revisions.revision() + 1, new 
WatchEventHandlingCallback() {
                             @Override
                             public void onSafeTimeAdvanced(HybridTimestamp 
newSafeTime) {
                                 
MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
@@ -1065,7 +1037,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     }
 
     @Override
-    public CompletableFuture<Long> recoveryFinishedFuture() {
+    public CompletableFuture<Revisions> recoveryFinishedFuture() {
         return recoveryFinishedFuture;
     }
 
@@ -1167,9 +1139,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         storage.unregisterRevisionUpdateListener(listener);
     }
 
-    /** Explicitly notifies revision update listeners. */
+    /** Explicitly notifies revisions update listeners. */
     public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
-        return 
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
+        return 
recoveryFinishedFuture.thenApply(Revisions::revision).thenCompose(storage::notifyRevisionUpdateListenerOnStart);
     }
 
     /**
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 577c3b6319..b36d754185 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -249,10 +250,8 @@ public interface MetaStorageService extends 
ManuallyCloseable {
      */
     Publisher<Entry> prefix(ByteArray prefix, long revUpperBound);
 
-    /**
-     * Returns a future which will hold current revision of the metastorage 
leader.
-     */
-    CompletableFuture<Long> currentRevision();
+    /** Returns a future which will hold {@link RevisionsInfo current 
revisions} of the metastorage leader. */
+    CompletableFuture<RevisionsInfo> currentRevisions();
 
     /**
      * Returns information about a revision checksum on the leader.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 5f867370ff..e7c5a5b7d6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -41,7 +41,7 @@ import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCac
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
-import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
@@ -51,6 +51,7 @@ import 
org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveCommand;
 import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
 import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -260,8 +261,8 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
     }
 
     @Override
-    public CompletableFuture<Long> currentRevision() {
-        GetCurrentRevisionCommand cmd = 
context.commandsFactory().getCurrentRevisionCommand().build();
+    public CompletableFuture<RevisionsInfo> currentRevisions() {
+        GetCurrentRevisionsCommand cmd = 
context.commandsFactory().getCurrentRevisionsCommand().build();
 
         return context.raftService().run(cmd);
     }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
new file mode 100644
index 0000000000..6498fd502b
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.RecoveryRevisionsListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Implementation of {@link RecoveryRevisionsListener} that completes the recovery future as soon as the current
+ * revisions reported through {@link #onUpdate} have reached the target revisions supplied through
+ * {@link #setTargetRevisions}.
+ *
+ * <p>Both values may arrive in any order; the completion check is re-evaluated after each of them is updated.</p>
+ */
+class RecoveryRevisionsListenerImpl implements RecoveryRevisionsListener {
+    /** Busy lock that is entered for the duration of every completion check. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Future that is completed with the current revisions when recovery finishes, or exceptionally on failure. */
+    private final CompletableFuture<Revisions> recoveryFinishFuture;
+
+    /** Storage from which the recovery listener is removed once recovery finishes. */
+    private final KeyValueStorage storage;
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Guarded by {@link #lock}. */
+    private Revisions targetRevisions;
+
+    /** Guarded by {@link #lock}. */
+    private Revisions currentRevisions;
+
+    RecoveryRevisionsListenerImpl(
+            IgniteSpinBusyLock busyLock,
+            CompletableFuture<Revisions> recoveryFinishFuture,
+            KeyValueStorage storage
+    ) {
+        this.busyLock = busyLock;
+        this.recoveryFinishFuture = recoveryFinishFuture;
+        this.storage = storage;
+    }
+
+    /** Remembers the latest current revisions and completes the recovery future if the targets have been reached. */
+    @Override
+    public void onUpdate(Revisions currentRevisions) {
+        lock.lock();
+
+        try {
+            this.currentRevisions = currentRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Sets the revisions that must be reached and completes the recovery future if they have already been reached. */
+    void setTargetRevisions(Revisions targetRevisions) {
+        lock.lock();
+
+        try {
+            this.targetRevisions = targetRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Completes {@link #recoveryFinishFuture} when both revisions are known and neither the current revision nor the
+     * current compaction revision is behind its target. Must be called under {@link #lock}.
+     */
+    private void completeRecoveryFinishFutureIfPossible() {
+        if (!busyLock.enterBusy()) {
+            recoveryFinishFuture.completeExceptionally(new NodeStoppingException());
+
+            // The busy lock was not entered: do not touch the storage and do not reach leaveBusy() in the finally block.
+            return;
+        }
+
+        try {
+            if (targetRevisions == null
+                    || currentRevisions == null
+                    || currentRevisions.revision() < targetRevisions.revision()
+                    || currentRevisions.compactionRevision() < targetRevisions.compactionRevision()) {
+                return;
+            }
+
+            // Recovery is over, the storage no longer has to notify us.
+            storage.setRecoveryRevisionsListener(null);
+
+            recoveryFinishFuture.complete(currentRevisions);
+        } catch (Throwable t) {
+            recoveryFinishFuture.completeExceptionally(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index f3fc6cc479..9a77c26a8c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CompactionRevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
@@ -61,11 +61,12 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     protected final WatchProcessor watchProcessor;
 
     /**
-     * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of revision update.
+     * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of current revisions update, {@code null} if recovery
+     * is complete.
      *
      * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
-    private @Nullable LongConsumer recoveryRevisionListener;
+    private @Nullable RecoveryRevisionsListener recoveryRevisionListener;
 
     /**
      * Revision. Will be incremented for each single-entry or multi-entry 
update operation.
@@ -88,6 +89,15 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
     protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
+    /**
+     * Events for notification of the {@link WatchProcessor} that were created 
before the {@link #startWatches start of watches}, after the
+     * start of watches there will be {@code null}. Events are sorted by 
{@link NotifyWatchProcessorEvent#timestamp} and are expected to
+     * have no duplicates.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    protected @Nullable TreeSet<NotifyWatchProcessorEvent> 
notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet<>();
+
     /**
      * Constructor.
      *
@@ -115,11 +125,20 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     protected abstract Value valueForOperation(byte[] key, long revision);
 
     /**
-     * Returns {@code true} if the storage is in the recovery state.
+     * Returns {@code true} if the metastorage is in the recovery state.
+     *
+     * <p>Method is expected to be invoked under {@link #rwLock}.</p>
+     */
+    private boolean isInRecoveryState() {
+        return recoveryRevisionListener != null;
+    }
+
+    /**
+     * Returns {@code true} if the watches have {@link #startWatches started}.
      *
      * <p>Method is expected to be invoked under {@link #rwLock}.</p>
      */
-    protected abstract boolean isInRecoveryState();
+    protected abstract boolean areWatchesStarted();
 
     @Override
     public Entry get(byte[] key) {
@@ -214,6 +233,8 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
             assertCompactionRevisionLessThanCurrent(revision, rev);
 
             compactionRevision = revision;
+
+            notifyRevisionsUpdate();
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -243,8 +264,12 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
             if (isInRecoveryState()) {
                 setCompactionRevision(compactionRevision);
-            } else {
+            } else if (areWatchesStarted()) {
                 watchProcessor.updateCompactionRevision(compactionRevision, 
context.timestamp);
+            } else {
+                var notifyWatchesEvent = new 
UpdateCompactionRevisionEvent(compactionRevision, context.timestamp);
+
+                
addToNotifyWatchProcessorEventsBeforeStartingWatches(notifyWatchesEvent);
             }
         } finally {
             rwLock.writeLock().unlock();
@@ -289,7 +314,7 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
+    public void setRecoveryRevisionsListener(@Nullable 
RecoveryRevisionsListener listener) {
         rwLock.writeLock().lock();
 
         try {
@@ -339,10 +364,10 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     }
 
     /** Notifies of revision update. Must be called under the {@link #rwLock}. 
*/
-    protected void notifyRevisionUpdate() {
+    protected void notifyRevisionsUpdate() {
         if (recoveryRevisionListener != null) {
             // Listener must be invoked only on recovery, after recovery 
listener must be null.
-            recoveryRevisionListener.accept(rev);
+            recoveryRevisionListener.onUpdate(createCurrentRevisions());
         }
     }
 
@@ -430,4 +455,50 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
         return res;
     }
+
+    // Records the applied index/term from the context and, only if the watches are already started, forwards the
+    // context timestamp to the watch processor as the new safe time. Runs entirely under the write lock.
+    @Override
+    public void advanceSafeTime(KeyValueUpdateContext context) {
+        rwLock.writeLock().lock();
+
+        try {
+            setIndexAndTerm(context.index, context.term);
+
+            // Before the watches are started the watch processor is not notified about safe time.
+            if (areWatchesStarted()) {
+                watchProcessor.advanceSafeTime(context.timestamp);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public Revisions revisions() {
+        var readLock = rwLock.readLock();
+
+        readLock.lock();
+
+        try {
+            // Both values are read under the read lock, so they form a consistent pair with respect to writers.
+            return createCurrentRevisions();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Creates a snapshot of the current revision and the current compaction revision.
+     *
+     * <p>Callers in this class invoke it while holding {@link #rwLock}.</p>
+     */
+    private Revisions createCurrentRevisions() {
+        return new Revisions(rev, compactionRevision);
+    }
+
+    /**
+     * Buffers an event for the {@link WatchProcessor} that was created before the watches were started.
+     *
+     * <p>Asserts that the watches are not started yet and that the event was not already present in the buffer.</p>
+     */
+    protected void 
addToNotifyWatchProcessorEventsBeforeStartingWatches(NotifyWatchProcessorEvent 
event) {
+        assert !areWatchesStarted();
+
+        // The add() call is kept outside of the assert so that it is executed even when assertions are disabled.
+        boolean added = 
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
+
+        assert added : event;
+    }
+
+    /**
+     * Delivers all buffered events to the {@link WatchProcessor} in the buffer's iteration order and then drops the
+     * buffer by setting it to {@code null}. Asserts that the watches are not started yet.
+     */
+    protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
+        assert !areWatchesStarted();
+
+        for (NotifyWatchProcessorEvent bufferedEvent : notifyWatchProcessorEventsBeforeStartingWatches) {
+            bufferedEvent.notify(watchProcessor);
+        }
+
+        notifyWatchProcessorEventsBeforeStartingWatches = null;
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3e74e4e7d9..ec580af62d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -21,13 +21,13 @@ import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.LongConsumer;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.CompactionRevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -50,11 +50,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      */
     void start();
 
-    /**
-     * Returns storage revision.
-     *
-     * @return Storage revision.
-     */
+    /** Returns storage revision, {@code 0} if there have been no storage 
update operations yet. */
     long revision();
 
     /**
@@ -477,12 +473,10 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
     long revisionByTimestamp(HybridTimestamp timestamp);
 
     /**
-     * Sets the revision listener. This is needed only for the recovery, after 
that listener must be set to {@code null}.
-     * {@code null} means that we no longer must be notified of revision 
updates for recovery, because recovery is finished.
-     *
-     * @param listener Revision listener.
+     * Sets the revisions listener. This is needed only for the recovery, 
after that listener must be set to {@code null}.
+     * {@code null} means that we no longer must be notified of revisions 
updates for recovery, because recovery is finished.
      */
-    void setRecoveryRevisionListener(@Nullable LongConsumer listener);
+    void setRecoveryRevisionsListener(@Nullable RecoveryRevisionsListener 
listener);
 
     /** Registers a Meta Storage revision update listener. */
     void registerRevisionUpdateListener(RevisionUpdateListener listener);
@@ -548,13 +542,15 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
      * Updates the metastorage compaction revision.
      *
      * <p>Algorithm:</p>
-     * <ul>
+     * <ol>
      *     <li>Invokes {@link #saveCompactionRevision}.</li>
-     *     <li>If the storage is in a recovery state ({@link #startWatches all 
registered watches not started}), then
+     *     <li>If the metastorage is in a recovery state (listener set via 
{@link #setRecoveryRevisionsListener}), then
      *     {@link #setCompactionRevision} is invoked and the current method is 
completed.</li>
+     *     <li>If the watches have <b>not</b> {@link #startWatches started}, 
then it will postpone the execution of step 4 until the
+     *     watches are started, and the current method is completed.</li>
      *     <li>Otherwise, a new task (A) is added to the WatchEvent queue and 
the current method is completed.</li>
      *     <li>Task (A) invokes {@link #setCompactionRevision} and invokes 
{@link CompactionRevisionUpdateListener#onUpdate}.</li>
-     * </ul>
+     * </ol>
      *
      * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
      *
@@ -585,4 +581,12 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      * Clears the content of the storage. Should only be called when no one 
else uses this storage.
      */
     void clear();
+
+    /**
+     * Returns current metastorage revisions.
+     *
+     * @see #revision()
+     * @see #getCompactionRevision()
+     */
+    Revisions revisions();
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
similarity index 59%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
index 8c72249c39..2373cc3284 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
@@ -15,12 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.server;
 
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
-/** Get command for MetaStorageCommandListener that retrieves current 
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/** {@link WatchProcessor} notification events. */
+public interface NotifyWatchProcessorEvent extends 
Comparable<NotifyWatchProcessorEvent> {
+    /** Event timestamp. */
+    HybridTimestamp timestamp();
+
+    /** Notifies the {@link WatchProcessor}. */
+    void notify(WatchProcessor watchProcessor);
+
+    /**
+     * Orders events by {@link #timestamp} only.
+     *
+     * <p>Two events with equal timestamps compare as {@code 0}, so a sorted collection that relies on this ordering
+     * treats them as duplicates regardless of their type or payload.</p>
+     */
+    @Override
+    default int compareTo(NotifyWatchProcessorEvent o) {
+        return timestamp().compareTo(o.timestamp());
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
similarity index 56%
rename from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
rename to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
index 8c72249c39..55d1519cb3 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.server;
 
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.metastorage.Revisions;
 
-/** Get command for MetaStorageCommandListener that retrieves current 
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/**
+ * Listener that is notified about updates of the {@link Revisions current metastorage revisions}, needed only for
+ * metastorage recovery.
+ */
+@FunctionalInterface
+public interface RecoveryRevisionsListener {
+    /**
+     * Invoked when one of the {@link Revisions current metastorage revisions} is updated.
+     *
+     * <p>Until the method completes its execution, no update or compaction of metastorage will occur, so the method
+     * should complete its execution as soon as possible.</p>
+     *
+     * @param currentRevisions Current metastorage revisions at the moment of the notification.
+     */
+    void onUpdate(Revisions currentRevisions);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
new file mode 100644
index 0000000000..6d7f61e302
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Event that notifies the {@link WatchProcessor} about an update of the metastorage compaction revision. */
+public class UpdateCompactionRevisionEvent implements NotifyWatchProcessorEvent {
+    /** Compaction revision that is passed to the watch processor. */
+    private final long compactionRevision;
+
+    /** Timestamp that is passed to the watch processor together with the compaction revision. */
+    @IgniteToStringInclude
+    private final HybridTimestamp timestamp;
+
+    /**
+     * Constructor.
+     *
+     * @param revision New compaction revision.
+     * @param timestamp Timestamp of the update.
+     */
+    public UpdateCompactionRevisionEvent(long revision, HybridTimestamp timestamp) {
+        this.compactionRevision = revision;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public HybridTimestamp timestamp() {
+        return this.timestamp;
+    }
+
+    @Override
+    public void notify(WatchProcessor watchProcessor) {
+        watchProcessor.updateCompactionRevision(this.compactionRevision, this.timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
new file mode 100644
index 0000000000..0c2944e3f3
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.List;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Event that notifies the {@link WatchProcessor} about updated metastorage {@link Entry entries}. */
+public class UpdateEntriesEvent implements NotifyWatchProcessorEvent {
+    /** Entries that are passed to the watch processor; the list is stored as given, without copying. */
+    @IgniteToStringInclude
+    private final List<Entry> updatedEntries;
+
+    /** Timestamp that is passed to the watch processor together with the entries. */
+    @IgniteToStringInclude
+    private final HybridTimestamp timestamp;
+
+    /**
+     * Constructor.
+     *
+     * @param updatedEntries Updated entries.
+     * @param timestamp Timestamp of the update.
+     */
+    public UpdateEntriesEvent(List<Entry> updatedEntries, HybridTimestamp timestamp) {
+        this.timestamp = timestamp;
+        this.updatedEntries = updatedEntries;
+    }
+
+    @Override
+    public HybridTimestamp timestamp() {
+        return this.timestamp;
+    }
+
+    @Override
+    public void notify(WatchProcessor watchProcessor) {
+        watchProcessor.notifyWatches(this.updatedEntries, this.timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 67a88affc9..d6a30e2dd8 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -63,6 +63,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -87,8 +89,10 @@ import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
 import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
+import org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
 import org.apache.ignite.internal.metastorage.server.Value;
 import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -228,15 +232,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      */
     private final AtomicReference<RecoveryStatus> recoveryStatus = new 
AtomicReference<>(RecoveryStatus.INITIAL);
 
-    /**
-     * Buffer used to cache new events while an event replay is in progress. 
After replay finishes, the cache gets drained and is never
-     * used again.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     */
-    @Nullable
-    private List<UpdatedEntries> eventCache;
-
     /**
      * Current list of updated entries.
      *
@@ -491,7 +486,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 compactionRevision = bytesToLong(compactionRevisionBytes);
             }
 
-            notifyRevisionUpdate();
+            notifyRevisionsUpdate();
         } catch (MetaStorageException e) {
             throw e;
         } catch (Exception e) {
@@ -652,7 +647,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         queueWatchEvent();
 
-        notifyRevisionUpdate();
+        notifyRevisionsUpdate();
     }
 
     private boolean validateNoChecksumConflict(long newRev, long newChecksum) 
throws RocksDBException {
@@ -926,7 +921,19 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         }
 
         if (currentRevision != 0) {
-            replayUpdates(startRevision, currentRevision);
+            Set<NotifyWatchProcessorEvent> fromStorage = 
collectNotifyWatchProcessorEventsFromStorage(startRevision, currentRevision);
+
+            rwLock.writeLock().lock();
+
+            try {
+                
notifyWatchProcessorEventsBeforeStartingWatches.addAll(fromStorage);
+
+                drainNotifyWatchProcessorEventsBeforeStartingWatches();
+
+                recoveryStatus.set(RecoveryStatus.DONE);
+            } finally {
+                rwLock.writeLock().unlock();
+            }
         }
     }
 
@@ -1090,17 +1097,14 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 updatedEntries.clear();
 
                 break;
-
             case IN_PROGRESS:
-                // Buffer the event while event replay is still in progress.
-                if (eventCache == null) {
-                    eventCache = new ArrayList<>();
-                }
+                UpdatedEntries copy = updatedEntries.transfer();
 
-                eventCache.add(updatedEntries.transfer());
+                var event = new UpdateEntriesEvent(copy.updatedEntries, 
copy.ts);
 
-                break;
+                addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
 
+                break;
             default:
                 notifyWatches();
 
@@ -1115,19 +1119,18 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
     }
 
-    private void replayUpdates(long lowerRevision, long upperRevision) {
+    private Set<NotifyWatchProcessorEvent> 
collectNotifyWatchProcessorEventsFromStorage(long lowerRevision, long 
upperRevision) {
         long minWatchRevision = Math.max(lowerRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
-        if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
-            // No events to replay, we can start processing more recent events 
from the event queue.
-            finishReplay();
-
-            return;
+        if (minWatchRevision > upperRevision) {
+            return Set.of();
         }
 
         var updatedEntries = new ArrayList<Entry>();
         HybridTimestamp ts = null;
 
+        var events = new TreeSet<NotifyWatchProcessorEvent>();
+
         try (
                 var upperBound = new Slice(longToBytes(upperRevision + 1));
                 var options = new 
ReadOptions().setIterateUpperBound(upperBound);
@@ -1149,7 +1152,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
                         assert ts != null : revision;
 
-                        watchProcessor.notifyWatches(updatedEntriesCopy, ts);
+                        var event = new UpdateEntriesEvent(updatedEntriesCopy, 
ts);
+
+                        boolean added = events.add(event);
+
+                        assert added : event;
 
                         updatedEntries.clear();
 
@@ -1173,15 +1180,19 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 throw new MetaStorageException(OP_EXECUTION_ERR, e);
             }
 
-            // Notify about the events left after finishing the loop above.
+            // Add the event left over after finishing the loop above.
             if (!updatedEntries.isEmpty()) {
                 assert ts != null;
 
-                watchProcessor.notifyWatches(updatedEntries, ts);
+                var event = new UpdateEntriesEvent(updatedEntries, ts);
+
+                boolean added = events.add(event);
+
+                assert added : event;
             }
         }
 
-        finishReplay();
+        return events;
     }
 
     @Override
@@ -1229,28 +1240,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         }
     }
 
-    private void finishReplay() {
-        // Take the lock to drain the event cache and prevent new events from 
being cached. Since event notification is asynchronous,
-        // this lock shouldn't be held for long.
-        rwLock.writeLock().lock();
-
-        try {
-            if (eventCache != null) {
-                eventCache.forEach(entries -> {
-                    assert entries.ts != null;
-
-                    watchProcessor.notifyWatches(entries.updatedEntries, 
entries.ts);
-                });
-
-                eventCache = null;
-            }
-
-            recoveryStatus.set(RecoveryStatus.DONE);
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
     @TestOnly
     public Path getDbPath() {
         return dbPath;
@@ -1296,21 +1285,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         }
     }
 
-    @Override
-    public void advanceSafeTime(KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
-        try {
-            setIndexAndTerm(context.index, context.term);
-
-            if (!isInRecoveryState()) {
-                watchProcessor.advanceSafeTime(context.timestamp);
-            }
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
     @Override
     protected void saveCompactionRevision(long revision, KeyValueUpdateContext 
context, boolean advanceSafeTime) {
         try (WriteBatch batch = new WriteBatch()) {
@@ -1320,7 +1294,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             db.write(defaultWriteOptions, batch);
 
-            if (advanceSafeTime && !isInRecoveryState()) {
+            if (advanceSafeTime && areWatchesStarted()) {
                 watchProcessor.advanceSafeTime(context.timestamp);
             }
         } catch (Throwable t) {
@@ -1512,8 +1486,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    protected boolean isInRecoveryState() {
-        return recoveryStatus.get() != RecoveryStatus.DONE;
+    protected boolean areWatchesStarted() {
+        return recoveryStatus.get() == RecoveryStatus.DONE;
     }
 
     private @Nullable Value getValueForOperationNullable(byte[] key, long 
revision) {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index f299d8ac93..6d4c82a788 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -29,15 +29,17 @@ import java.util.List;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
-import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
 import org.apache.ignite.internal.metastorage.command.GetPrefixCommand;
 import org.apache.ignite.internal.metastorage.command.GetRangeCommand;
 import org.apache.ignite.internal.metastorage.command.PaginationCommand;
 import org.apache.ignite.internal.metastorage.command.response.BatchResponse;
 import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import org.apache.ignite.internal.metastorage.server.ChecksumAndRevisions;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
@@ -148,10 +150,10 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
                     byte[] keyTo = storage.nextKey(prefix);
 
                     clo.result(handlePaginationCommand(keyFrom, keyTo, 
prefixCmd));
-                } else if (command instanceof GetCurrentRevisionCommand) {
-                    long revision = storage.revision();
+                } else if (command instanceof GetCurrentRevisionsCommand) {
+                    Revisions currentRevisions = storage.revisions();
 
-                    clo.result(revision);
+                    clo.result(RevisionsInfo.of(currentRevisions));
                 } else if (command instanceof GetChecksumCommand) {
                     ChecksumAndRevisions checksumInfo = 
storage.checksumAndRevisions(((GetChecksumCommand) command).revision());
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index 7ef5a979a4..f766997ae4 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -37,7 +37,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -82,7 +83,8 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
         when(clusterService.nodeName()).thenReturn(mcNodeName);
         when(raftManager.startRaftGroupNodeAndWaitNodeReady(any(), any(), 
any(), any(), any(), any(), any()))
                 .thenReturn(raftGroupService);
-        
when(raftGroupService.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation
 -> completedFuture(0L));
+        when(raftGroupService.run(any(GetCurrentRevisionsCommand.class)))
+                .thenAnswer(invocation -> completedFuture(new RevisionsInfo(0, 
-1)));
 
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index 77ad87ff8f..1ec9a9f37c 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -41,7 +41,8 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.ComponentContext;
-import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -103,13 +104,13 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
         );
     }
 
-    private RaftManager raftManager(long remoteRevision) throws Exception {
+    private static RaftManager raftManager(long remoteRevision) throws 
Exception {
         RaftManager raft = mock(RaftManager.class);
 
         RaftGroupService service = mock(TopologyAwareRaftGroupService.class);
 
-        when(service.run(any(GetCurrentRevisionCommand.class)))
-                .thenAnswer(invocation -> completedFuture(remoteRevision));
+        when(service.run(any(GetCurrentRevisionsCommand.class)))
+                .thenAnswer(invocation -> completedFuture(new 
RevisionsInfo(remoteRevision, -1)));
 
         when(raft.startRaftGroupNodeAndWaitNodeReady(any(), any(), any(), 
any(), any(), any(), any()))
                 .thenAnswer(invocation -> service);
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 5c47a00d69..d88479b249 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -1021,10 +1022,30 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
 
     /** Tests {@link KeyValueStorage#updateCompactionRevision} case from 
method description when storage is in recovery state. */
     @Test
-    void testUpdateCompactionRevisionWithoutStartWatches() {
+    void testUpdateCompactionRevisionAndStorageInRecoveryState() {
+        storage.setRecoveryRevisionsListener(currentRevisions -> {});
         storage.updateCompactionRevision(1, kvContext(clock.now()));
         assertEquals(1, storage.getCompactionRevision());
 
+        storage.setRecoveryRevisionsListener(null);
+        storage.updateCompactionRevision(2, kvContext(clock.now()));
+        assertEquals(1, storage.getCompactionRevision());
+
+        assertThat(
+                allOf(
+                        updateCompactionRevisionInWatchEvenQueue.waitFor(1L),
+                        updateCompactionRevisionInWatchEvenQueue.waitFor(2L)
+                ),
+                willTimeoutFast()
+        );
+    }
+
+    /** Tests {@link KeyValueStorage#updateCompactionRevision} when watches 
have not been started: the compaction revision must stay unchanged. */
+    @Test
+    void testUpdateCompactionRevisionWithoutStartWatches() {
+        storage.updateCompactionRevision(1, kvContext(clock.now()));
+        assertEquals(-1, storage.getCompactionRevision());
+
         assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L), 
willTimeoutFast());
     }
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index e7d525223b..78955c6d81 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -226,7 +226,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
 
         notifyWatches();
 
-        notifyRevisionUpdate();
+        notifyRevisionsUpdate();
     }
 
     @Override
@@ -462,23 +462,21 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         rwLock.readLock().lock();
 
         try {
-            areWatchesEnabled = true;
-
             watchProcessor.setWatchEventHandlingCallback(callback);
 
-            replayUpdates(startRevision);
+            fillNotifyWatchProcessorEventsFromStorage(startRevision);
+
+            drainNotifyWatchProcessorEventsBeforeStartingWatches();
+
+            areWatchesEnabled = true;
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    private void replayUpdates(long startRevision) {
+    private void fillNotifyWatchProcessorEventsFromStorage(long startRevision) 
{
         long minWatchRevision = Math.max(startRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
-        if (minWatchRevision <= 0) {
-            return;
-        }
-
         revsIdx.tailMap(minWatchRevision)
                 .forEach((revision, entries) -> {
                     entries.forEach((key, value) -> {
@@ -487,12 +485,29 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
                         updatedEntries.add(entry);
                     });
 
-                    notifyWatches();
+                    fillNotifyWatchProcessorEventsFromUpdatedEntries();
                 });
     }
 
+    private void fillNotifyWatchProcessorEventsFromUpdatedEntries() {
+        if (updatedEntries.isEmpty()) {
+            return;
+        }
+
+        long revision = updatedEntries.get(0).revision();
+
+        HybridTimestamp ts = revToTsMap.get(revision);
+        assert ts != null : revision;
+
+        var event = new UpdateEntriesEvent(List.copyOf(updatedEntries), ts);
+
+        addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
+
+        updatedEntries.clear();
+    }
+
     private void notifyWatches() {
-        if (isInRecoveryState() || updatedEntries.isEmpty()) {
+        if (!areWatchesStarted() || updatedEntries.isEmpty()) {
             updatedEntries.clear();
 
             return;
@@ -632,6 +647,8 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
             savedCompactionRevision = snapshot.savedCompactionRevision;
             term = snapshot.term;
             index = snapshot.index;
+
+            notifyRevisionsUpdate();
         } catch (Throwable t) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
         } finally {
@@ -745,28 +762,13 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         updateRevision(curRev, context);
     }
 
-    @Override
-    public void advanceSafeTime(KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
-        try {
-            setIndexAndTerm(context.index, context.term);
-
-            if (!isInRecoveryState()) {
-                watchProcessor.advanceSafeTime(context.timestamp);
-            }
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
     @Override
     public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context, boolean advanceSafeTime) {
         savedCompactionRevision = revision;
 
         setIndexAndTerm(context.index, context.term);
 
-        if (advanceSafeTime && !isInRecoveryState()) {
+        if (advanceSafeTime && areWatchesStarted()) {
             watchProcessor.advanceSafeTime(context.timestamp);
         }
     }
@@ -845,8 +847,8 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    protected boolean isInRecoveryState() {
-        return !areWatchesEnabled;
+    protected boolean areWatchesStarted() {
+        return areWatchesEnabled;
     }
 
     private @Nullable Value getValueNullable(byte[] key, long revision) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f359355777..2836a9a508 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -256,11 +257,11 @@ public class PartitionReplicaLifecycleManager  extends
             return nullCompletedFuture();
         }
 
-        CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
+        CompletableFuture<Revisions> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
 
         assert recoveryFinishFuture.isDone();
 
-        long recoveryRevision = recoveryFinishFuture.join();
+        long recoveryRevision = recoveryFinishFuture.join().revision();
 
         cleanUpResourcesForDroppedZonesOnRecovery();
 
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 670ef7292a..67adde0cca 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.raft.Loza;
@@ -76,7 +77,7 @@ public class ActiveActorTest extends 
AbstractTopologyAwareGroupServiceTest {
 
     @BeforeEach
     public void setUp() {
-        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new 
Revisions(0, -1)));
         when(msm.invoke(any(), any(Operation.class), 
any(Operation.class))).thenReturn(trueCompletedFuture());
         when(msm.getLocally(any(), anyLong())).then(invocation -> 
emptyMetastoreEntry());
         when(msm.getLocally(any(), any(), anyLong())).then(invocation -> 
Cursor.fromIterable(List.of()));
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 5f85110c7c..14a08087e3 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -87,9 +87,9 @@ public class AssignmentsTracker implements 
AssignmentsPlacementDriver {
     public void startTrack() {
         
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), 
assignmentsListener);
 
-        msManager.recoveryFinishedFuture().thenAccept(recoveryRevision -> {
+        msManager.recoveryFinishedFuture().thenAccept(recoveryRevisions -> {
             try (Cursor<Entry> cursor = 
msManager.getLocally(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
-                    
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX)), 
recoveryRevision);
+                    
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX)), 
recoveryRevisions.revision());
             ) {
                 for (Entry entry : cursor) {
                     if (entry.tombstone()) {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index ea3e7e4e88..058247af6e 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -266,11 +267,11 @@ public class PlacementDriverManager implements 
IgniteComponent {
     }
 
     private void recoverInternalComponentsBusy() {
-        CompletableFuture<Long> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
+        CompletableFuture<Revisions> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
 
         assert recoveryFinishedFuture.isDone();
 
-        long recoveryRevision = recoveryFinishedFuture.join();
+        long recoveryRevision = recoveryFinishedFuture.join().revision();
 
         leaseTracker.startTrack(recoveryRevision);
     }
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 9a7cd175c4..813e54a53a 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.OperationImpl;
@@ -127,7 +128,7 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
         lenient().when(leaseTracker.leasesCurrent()).thenReturn(leases);
         
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i -> 
Lease.emptyLease(i.getArgument(0)));
-        
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+        
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
 Revisions(1, -1)));
         when(metaStorageManager.getLocally(any(ByteArray.class), 
any(ByteArray.class), anyLong())).thenReturn(mcEntriesCursor);
         
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new 
LogicalTopologySnapshot(1, List.of(node))));
 
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 0563fbea01..5ca6e19d4b 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -183,11 +184,11 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
         assertThat(metastore.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-        CompletableFuture<Long> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
+        CompletableFuture<Revisions> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
 
         assertThat(recoveryFinishedFuture, willCompleteSuccessfully());
 
-        leasePlacementDriver.startTrack(recoveryFinishedFuture.join());
+        
leasePlacementDriver.startTrack(recoveryFinishedFuture.join().revision());
 
         assignmentsPlacementDriver.startTrack();
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 57b44c78e5..2da26a44a1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -1553,7 +1553,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         TableImpl table = 
unwrapTableImpl(restartedNode.tables().table(TABLE_NAME));
 
-        long recoveryRevision = 
restartedNode.metaStorageManager().recoveryFinishedFuture().join();
+        long recoveryRevision = 
restartedNode.metaStorageManager().recoveryFinishedFuture().join().revision();
 
         PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(nodes.stream().map(IgniteImpl::name)
                 .collect(toSet()), Set.of());
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a19fcca3e9..ad7c8a2d88 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1737,9 +1737,8 @@ public class IgniteImpl implements Ignite {
         CompletableFuture<Void> startupRevisionUpdate = metaStorageMgr.notifyRevisionUpdateListenerOnStart();
 
         return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate, startFuture)
-                .thenComposeAsync(t -> {
+                .thenComposeAsync(unused -> {
                     // Deploy all registered watches because all components are ready and have registered their listeners.
-                    // TODO: IGNITE-23292 Run local metastore compaction after start watches for the latest compacted revision
                     return metaStorageMgr.deployWatches();
                 }, startupExecutor);
     }
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index bdee06027f..d7b790d217 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -179,7 +179,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
     @Override
     public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
         CompletableFuture<Data> future = metaStorageMgr.recoveryFinishedFuture()
-                .thenApplyAsync(this::readDataOnRecovery0, threadPool);
+                .thenApplyAsync(revisions -> readDataOnRecovery0(revisions.revision()), threadPool);
 
         return registerFuture(future);
     }
@@ -329,7 +329,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
     @Override
     public CompletableFuture<Long> localRevision() {
         return metaStorageMgr.recoveryFinishedFuture()
-                .thenApply(rev -> metaStorageMgr.getLocally(MASTER_KEY, rev).revision());
+                .thenApply(revisions -> metaStorageMgr.getLocally(MASTER_KEY, revisions.revision()).revision());
     }
 
     private <T> CompletableFuture<T> registerFuture(CompletableFuture<T> future) {
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 2ae7bbf4eb..0050fe6604 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -288,11 +289,11 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest {
 
         assertThat("Partial node was not started", startFuture, willCompleteSuccessfully());
 
-        Long recoveryRevision = metaStorageMgr.recoveryFinishedFuture().getNow(null);
+        Revisions recoveryRevisions = metaStorageMgr.recoveryFinishedFuture().getNow(null);
 
-        assertNotNull(recoveryRevision);
+        assertNotNull(recoveryRevisions);
 
-        log.info("Completed recovery on partially started node, MetaStorage revision recovered to: " + recoveryRevision);
+        log.info("Completed recovery on partially started node, MetaStorage revision recovered to: " + recoveryRevisions);
 
         return new PartialNode(
                 name,
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index b3c040a0ab..683b6d2361 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.schema;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.sql.ColumnType;
@@ -271,7 +273,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
 
         when(catalogService.latestCatalogVersion()).thenReturn(2);
         when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
-        doReturn(CompletableFuture.completedFuture(CAUSALITY_TOKEN_2)).when(metaStorageManager).recoveryFinishedFuture();
+        doReturn(completedFuture(new Revisions(CAUSALITY_TOKEN_2, -1))).when(metaStorageManager).recoveryFinishedFuture();
 
         schemaManager = new SchemaManager(registry, catalogService);
         assertThat(schemaManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 51ae06fa02..73a2d5a79a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -129,6 +129,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -610,11 +611,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
             fullStateTransferIndexChooser.start();
 
-            CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture();
+            CompletableFuture<Revisions> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture();
 
             assert recoveryFinishFuture.isDone();
 
-            long recoveryRevision = recoveryFinishFuture.join();
+            long recoveryRevision = recoveryFinishFuture.join().revision();
 
             cleanUpResourcesForDroppedTablesOnRecoveryBusy();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index afaf721fef..c779af32af 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Nullable;
@@ -398,11 +399,11 @@ public class IndexMetaStorage implements IgniteComponent {
     }
 
     private Map<Integer, IndexMeta> readAllFromMetastoreOnRecovery() {
-        CompletableFuture<Long> recoveryFinishedFuture = metaStorageManager.recoveryFinishedFuture();
+        CompletableFuture<Revisions> recoveryFinishedFuture = metaStorageManager.recoveryFinishedFuture();
 
         assert recoveryFinishedFuture.isDone();
 
-        long recoveryRevision = recoveryFinishedFuture.join();
+        long recoveryRevision = recoveryFinishedFuture.join().revision();
 
         try (Cursor<Entry> cursor = metaStorageManager.prefixLocally(ByteArray.fromString(INDEX_META_VALUE_KEY_PREFIX), recoveryRevision)) {
             return cursor.stream()
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1e5bf4ee0d..7e2f408648 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -96,6 +96,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -680,7 +681,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
         when(txStateStorage.clear()).thenReturn(nullCompletedFuture());
 
-        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));
+        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new Revisions(2, -1)));
 
         // For some reason, "when(something).thenReturn" does not work on spies, but this notation works.
         createTableManager(tblManagerFut, (mvTableStorage) -> {
@@ -722,7 +723,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         when(msm.invoke(any(), anyList(), anyList())).thenReturn(trueCompletedFuture());
         when(msm.get(any())).thenReturn(nullCompletedFuture());
 
-        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));
+        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new Revisions(2, -1)));
 
         when(msm.prefixLocally(any(), anyLong())).thenReturn(CursorUtils.emptyCursor());
     }


Reply via email to