This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d4c9e8748 IGNITE-19778 Restore components state on metastorage 
recovery (#2266)
4d4c9e8748 is described below

commit 4d4c9e8748bd16e079757cc4f4beac6b8977e3c0
Author: Semyon Danilov <samvi...@yandex.ru>
AuthorDate: Wed Jul 12 12:03:53 2023 +0400

    IGNITE-19778 Restore components state on metastorage recovery (#2266)
---
 .../internal/catalog/storage/UpdateLogImpl.java    |   2 +-
 .../configuration/ConfigurationChanger.java        |   5 -
 .../storage/ConfigurationStorage.java              |  10 -
 .../storage/TestConfigurationStorage.java          |   5 -
 .../java/org/apache/ignite/lang/ByteArray.java     |   7 +
 .../distributionzones/DistributionZoneManager.java |   2 +-
 .../BaseDistributionZoneManagerTest.java           |   2 +-
 .../internal/metastorage/MetaStorageManager.java   |  23 +-
 .../impl/ItMetaStorageManagerImplTest.java         |  71 ------
 ...MetaStorageSafeTimePropagationAbstractTest.java |   2 +-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   6 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  40 ++--
 .../server/persistence/RocksDbKeyValueStorage.java |   6 +-
 .../server/BasicOperationsKeyValueStorageTest.java |   6 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |   6 +-
 .../ignite/internal/BaseIgniteRestartTest.java     |  66 +-----
 .../ItDistributedConfigurationPropertiesTest.java  |   4 +-
 .../ItDistributedConfigurationStorageTest.java     |   4 +-
 .../storage/ItRebalanceDistributedTest.java        |  68 +++---
 .../zones/ItDistributionZonesFilterTest.java       |   7 +
 ...niteDistributionZoneManagerNodeRestartTest.java |   5 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   9 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  37 +---
 .../storage/DistributedConfigurationStorage.java   | 134 +++++------
 .../storage/LocalConfigurationStorage.java         |   5 -
 .../storage/LocalFileConfigurationStorage.java     |   5 -
 .../recovery/ConfigurationCatchUpListener.java     | 117 ----------
 .../recovery/RecoveryCompletionFutureFactory.java  |  51 -----
 .../DistributedConfigurationCatchUpTest.java       | 244 ---------------------
 .../DistributedConfigurationStorageTest.java       |   8 +-
 .../internal/table/distributed/TableManager.java   |  12 +-
 31 files changed, 189 insertions(+), 780 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index f627c51118..1b7994fabc 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -157,7 +157,7 @@ public class UpdateLogImpl implements UpdateLog {
         // TODO: IGNITE-19790 Read range from metastore
         while (true) {
             ByteArray key = CatalogKey.update(ver++);
-            Entry entry = metastore.getLocally(key.bytes(), appliedRevision);
+            Entry entry = metastore.getLocally(key, appliedRevision);
 
             if (entry.empty() || entry.tombstone()) {
                 break;
diff --git 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 5a01225087..6b115f5995 100644
--- 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -644,11 +644,6 @@ public abstract class ConfigurationChanger implements 
DynamicConfigurationChange
                     rwLock.writeLock().unlock();
                 }
 
-                // Save revisions for recovery.
-                // We execute synchronously to avoid a race between 
notifications about updating the Meta Storage and updating the revision
-                // of the Meta Storage.
-                storage.writeConfigurationRevision(oldStorageRoots.version, 
newStorageRoots.version);
-
                 long notificationNumber = 
notificationListenerCnt.incrementAndGet();
 
                 CompletableFuture<Void> notificationFuture;
diff --git 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
index 7315dfd51b..d9663eb284 100644
--- 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
+++ 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
@@ -79,16 +79,6 @@ public interface ConfigurationStorage extends 
ManuallyCloseable {
      */
     CompletableFuture<Long> lastRevision();
 
-    /**
-     * Writes previous and current configuration's MetaStorage revision for 
recovery.
-     * We need previous and current for the fail-safety: in case if node fails 
before changing master key on configuration update,
-     * MetaStorage's applied revision will be lower than {@code 
currentRevision} and we will be using previous revision.
-     *
-     * @param prevRevision Previous revision.
-     * @param currentRevision Current revision.
-     */
-    void writeConfigurationRevision(long prevRevision, long currentRevision);
-
     /**
      * Closes the storage.
      */
diff --git 
a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
 
b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
index 94193714e4..4231915b31 100644
--- 
a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
+++ 
b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
@@ -170,11 +170,6 @@ public class TestConfigurationStorage implements 
ConfigurationStorage {
         return CompletableFuture.completedFuture(version);
     }
 
-    @Override
-    public void writeConfigurationRevision(long prevRevision, long 
currentRevision) {
-        // No-op.
-    }
-
     /**
      * Increase the current revision of the storage.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
index 76129b6d55..93b1e14a99 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
@@ -77,6 +77,13 @@ public final class ByteArray implements 
Comparable<ByteArray> {
         return arr;
     }
 
+    /**
+     * Returns the length of this byte array.
+     */
+    public int length() {
+        return arr.length;
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean equals(Object o) {
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 03ce9cbaf0..d61f044560 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -828,7 +828,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             ZoneState zoneState = new ZoneState(executor, 
topologyAugmentationMap);
 
-            VaultEntry dataNodes = 
vaultMgr.get(zoneDataNodesKey(zoneId)).join();
+            Entry dataNodes = 
metaStorageManager.getLocally(zoneDataNodesKey(zoneId), revision);
 
             if (dataNodes != null) {
                 String filter = zone.filter();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index a8df523fba..9c69f09930 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -93,7 +93,7 @@ public class BaseDistributionZoneManagerTest extends 
BaseIgniteAbstractTest {
 
         components.add(metaStorageManager);
 
-        ConfigurationStorage cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultMgr);
+        ConfigurationStorage cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager);
 
         generator = new ConfigurationTreeGenerator(
                 List.of(DistributionZonesConfiguration.KEY),
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 0461d73924..fb37f7f1ab 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import 
org.apache.ignite.internal.metastorage.exceptions.OperationTimeoutException;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
@@ -66,6 +67,9 @@ public interface MetaStorageManager extends IgniteComponent {
      * Returns all entries corresponding to the given key and bounded by given 
revisions.
      * All these entries are ordered by revisions and have the same key.
      * The lower bound and the upper bound are inclusive.
+     *
+     * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
+     * up to user to wait for the appropriate time to call this method.
      * TODO: IGNITE-19735 move this method to another interface for 
interaction with local KeyValueStorage.
      *
      * @param key The key.
@@ -80,11 +84,28 @@ public interface MetaStorageManager extends IgniteComponent 
{
      * Returns an entry by the given key and bounded by the given revision. 
The entry is obtained
      * from the local storage.
      *
+     * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
+     * up to user to wait for the appropriate time to call this method.
+     *
      * @param key The key.
      * @param revUpperBound The upper bound of revision.
      * @return Value corresponding to the given key.
      */
-    Entry getLocally(byte[] key, long revUpperBound);
+    Entry getLocally(ByteArray key, long revUpperBound);
+
+    /**
+     * Returns cursor by entries which correspond to the given keys range and 
bounded by revision number. The entries in the cursor
+     * are obtained from the local storage.
+     *
+     * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
+     * up to user to wait for the appropriate time to call this method.
+     *
+     * @param startKey Start key of range (inclusive).
+     * @param endKey Last key of range (exclusive).
+     * @param revUpperBound Upper bound of revision.
+     * @return Cursor by entries which correspond to the given keys range.
+     */
+    Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long 
revUpperBound);
 
     /**
      * Looks up a timestamp by a revision. This should only be invoked if it 
is guaranteed that the
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index ff2a75c687..d2dbc13721 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -31,8 +30,6 @@ import static 
org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -65,7 +62,6 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.lang.ByteArray;
@@ -194,73 +190,6 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
         assertThat(actualKeysFuture, will(contains(key1.bytes(), key2.bytes(), 
key3.bytes())));
     }
 
-    /**
-     * Tests that "watched" Meta Storage keys get persisted in the Vault.
-     */
-    @Test
-    void testWatchEventsPersistence() throws InterruptedException {
-        byte[] value = "value".getBytes(UTF_8);
-
-        var key1 = new ByteArray("foo");
-        var key2 = new ByteArray("bar");
-
-        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.notExists(new ByteArray("foo")),
-                List.of(
-                        Operations.put(key1, value),
-                        Operations.put(key2, value)
-                ),
-                List.of(Operations.noop())
-        );
-
-        assertThat(invokeFuture, willBe(true));
-
-        // No data should be persisted until any watches are registered.
-        assertThat(vaultManager.get(key1), willBe(nullValue()));
-        assertThat(vaultManager.get(key2), willBe(nullValue()));
-
-        metaStorageManager.registerExactWatch(key1, new NoOpListener());
-
-        invokeFuture = metaStorageManager.invoke(
-                Conditions.exists(new ByteArray("foo")),
-                List.of(
-                        Operations.put(key1, value),
-                        Operations.put(key2, value)
-                ),
-                List.of(Operations.noop())
-        );
-
-        assertThat(invokeFuture, willBe(true));
-
-        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
== 2, 10_000));
-
-        // Expect that only the watched key is persisted.
-        assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), 
willBe(value));
-        assertThat(vaultManager.get(key2), willBe(nullValue()));
-
-        metaStorageManager.registerExactWatch(key2, new NoOpListener());
-
-        assertThat(metaStorageManager.appliedRevision(), is(2L));
-
-        byte[] newValue = "newValue".getBytes(UTF_8);
-
-        invokeFuture = metaStorageManager.invoke(
-                Conditions.exists(new ByteArray("foo")),
-                List.of(
-                        Operations.put(key1, newValue),
-                        Operations.put(key2, newValue)
-                ),
-                List.of(Operations.noop())
-        );
-
-        assertThat(invokeFuture, willBe(true));
-
-        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
== 3, 10_000));
-
-        assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), 
willBe(newValue));
-        assertThat(vaultManager.get(key2).thenApply(VaultEntry::value), 
willBe(newValue));
-    }
-
     private static class NoOpListener implements WatchListener {
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 068b2b2721..47b8ad923a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -45,7 +45,7 @@ public abstract class 
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
 
     @BeforeEach
     public void startWatches() {
-        storage.startWatches(0, (e, t) -> {
+        storage.startWatches(1, (e, t) -> {
             time.updateSafeTime(t);
 
             return CompletableFuture.completedFuture(null);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index c0e2d93e9e..e919c6ccbd 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -218,6 +218,10 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
         String name = nodes.get(0).name();
 
         nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), 
"test");
+
+        for (Node node : nodes) {
+            assertThat(node.metaStorageManager.recoveryFinishedFuture(), 
willCompleteSuccessfully());
+        }
     }
 
     @Test
@@ -312,7 +316,7 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
     }
 
     /**
-     * Tests that metastorage missed metastorage events are replayed after 
deploying watches.
+     * Tests that missed metastorage events are replayed after deploying 
watches.
      */
     @Test
     void testReplayUpdates() throws Exception {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 5c9c72d8df..10ec576f1a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
@@ -206,7 +207,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
                 assert targetRevision != null;
 
-                listenForRecovery(recoveryFinishedFuture, targetRevision);
+                listenForRecovery(targetRevision);
             });
 
             return recoveryFinishedFuture;
@@ -215,10 +216,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         }
     }
 
-    private void listenForRecovery(CompletableFuture<Long> res, long 
targetRevision) {
+    private void listenForRecovery(long targetRevision) {
         storage.setRecoveryRevisionListener(storageRevision -> {
             if (!busyLock.enterBusy()) {
-                res.completeExceptionally(new NodeStoppingException());
+                recoveryFinishedFuture.completeExceptionally(new 
NodeStoppingException());
 
                 return;
             }
@@ -230,7 +231,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
                 storage.setRecoveryRevisionListener(null);
 
-                if (res.complete(targetRevision)) {
+                if (recoveryFinishedFuture.complete(targetRevision)) {
                     LOG.info("Finished MetaStorage recovery");
                 }
             } finally {
@@ -239,7 +240,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         });
 
         if (!busyLock.enterBusy()) {
-            res.completeExceptionally(new NodeStoppingException());
+            recoveryFinishedFuture.completeExceptionally(new 
NodeStoppingException());
 
             return;
         }
@@ -249,7 +250,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             if (storage.revision() >= targetRevision) {
                 storage.setRecoveryRevisionListener(null);
 
-                if (res.complete(targetRevision)) {
+                if (recoveryFinishedFuture.complete(targetRevision)) {
                     LOG.info("Finished MetaStorage recovery");
                 }
             }
@@ -502,18 +503,23 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     }
 
     @Override
-    public Entry getLocally(byte[] key, long revUpperBound) {
+    public Entry getLocally(ByteArray key, long revUpperBound) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
 
         try {
-            return storage.get(key, revUpperBound);
+            return storage.get(key.bytes(), revUpperBound);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
+    @Override
+    public Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long 
revUpperBound) {
+        return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
+    }
+
     @Override
     public HybridTimestamp timestampByRevision(long revision) {
         if (!busyLock.enterBusy()) {
@@ -824,19 +830,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         clusterTime.updateSafeTime(time);
 
         try {
-            CompletableFuture<Void> saveToVaultFuture;
-
-            if (watchEvent.entryEvents().isEmpty()) {
-                saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, 
longToBytes(watchEvent.revision()));
-            } else {
-                Map<ByteArray, byte[]> batch = 
IgniteUtils.newHashMap(watchEvent.entryEvents().size() + 1);
-
-                batch.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
-
-                watchEvent.entryEvents().forEach(e -> batch.put(new 
ByteArray(e.newEntry().key()), e.newEntry().value()));
-
-                saveToVaultFuture = vaultMgr.putAll(batch);
-            }
+            CompletableFuture<Void> saveToVaultFuture = 
vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
 
             return saveToVaultFuture.thenRun(() -> appliedRevision = 
watchEvent.revision());
         } finally {
@@ -906,7 +900,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     }
 
     /** Explicitly notifies revision update listeners. */
-    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long 
newRevision) {
-        return storage.notifyRevisionUpdateListenerOnStart(newRevision);
+    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
+        return 
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index ee7f3628f5..abb2d85848 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -992,6 +992,8 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
+        assert startRevision != 0 : "First meaningful revision is 1";
+
         long currentRevision;
 
         rwLock.readLock().lock();
@@ -1496,9 +1498,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     private void replayUpdates(long lowerRevision, long upperRevision) {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be 
Math.max, so we start from the revision that
-        // components restored their state to (lowerRevision).
-        long minWatchRevision = Math.min(lowerRevision, 
watchProcessor.minWatchRevision().orElse(-1));
+        long minWatchRevision = Math.max(lowerRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
         if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
             // No events to replay, we can start processing more recent events 
from the event queue.
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index d0640a4d9c..198ade0395 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1969,7 +1969,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(0, (event, ts) -> completedFuture(null));
+        storage.startWatches(1, (event, ts) -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2310,7 +2310,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
-        storage.startWatches(0, mockCallback);
+        storage.startWatches(1, mockCallback);
 
         putToMs(key, value);
 
@@ -2505,7 +2505,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(0, (event, ts) -> completedFuture(null));
+        storage.startWatches(1, (event, ts) -> completedFuture(null));
 
         return resultFuture;
     }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index dbc4ea68b1..6760682702 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -483,6 +483,8 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
+        assert startRevision != 0 : "First meaningful revision is 1";
+
         synchronized (mux) {
             areWatchesEnabled = true;
 
@@ -493,9 +495,7 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     private void replayUpdates(long startRevision) {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be 
Math.max, so we start from the revision that
-        // components restored their state to (lowerRevision).
-        long minWatchRevision = Math.min(startRevision, 
watchProcessor.minWatchRevision().orElse(-1));
+        long minWatchRevision = Math.max(startRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
         if (minWatchRevision <= 0) {
             return;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 54c6b5fe73..8604076f57 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal;
 
-import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -29,7 +29,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.configuration.ConfigurationModule;
@@ -41,20 +40,16 @@ import 
org.apache.ignite.internal.configuration.ConfigurationModules;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
 import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
-import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
 import org.intellij.lang.annotations.Language;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -239,34 +234,21 @@ public abstract class BaseIgniteRestartTest extends 
IgniteAbstractTest {
             ConfigurationTreeGenerator distributedConfigurationGenerator,
             ConfigurationRegistry clusterConfigRegistry
     ) {
-        AtomicLong lastRevision = new AtomicLong();
-
-        Consumer<Long> revisionCallback0 = rev -> {
-            if (revisionCallback != null) {
-                revisionCallback.accept(rev);
-            }
-
-            lastRevision.set(rev);
-        };
-
-        CompletableFuture<Void> configurationCatchUpFuture = 
RecoveryCompletionFutureFactory.create(
-                clusterCfgMgr,
-                fut -> new TestConfigurationCatchUpListener(cfgStorage, fut, 
revisionCallback0)
-        );
-
         CompletableFuture<?> startFuture = CompletableFuture.allOf(
                 
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
                 clusterConfigRegistry.notifyCurrentConfigurationListeners()
         ).thenCompose(unused ->
                 // Deploy all registered watches because all components are 
ready and have registered their listeners.
-                CompletableFuture.allOf(metaStorageMgr.deployWatches(), 
configurationCatchUpFuture)
+                metaStorageMgr.deployWatches()
         );
 
         assertThat("Partial node was not started", startFuture, 
willCompleteSuccessfully());
 
-        log.info("Completed recovery on partially started node, last revision 
applied: " + lastRevision.get()
-                + ", acceptableDifference: " + 
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
100)
-        );
+        Long recoveryRevision = 
metaStorageMgr.recoveryFinishedFuture().getNow(null);
+
+        assertNotNull(recoveryRevision);
+
+        log.info("Completed recovery on partially started node, MetaStorage 
revision recovered to: " + recoveryRevision);
 
         return new PartialNode(
                 components,
@@ -333,38 +315,4 @@ public abstract class BaseIgniteRestartTest extends 
IgniteAbstractTest {
             return logicalTopology;
         }
     }
-
-    /**
-     * Configuration catch-up listener for test.
-     */
-    public static class TestConfigurationCatchUpListener extends 
ConfigurationCatchUpListener {
-        /** Callback called on revision update. */
-        private final Consumer<Long> revisionCallback;
-
-        /**
-         * Constructor.
-         *
-         * @param cfgStorage Configuration storage.
-         * @param catchUpFuture Catch-up future.
-         */
-        TestConfigurationCatchUpListener(
-                ConfigurationStorage cfgStorage,
-                CompletableFuture<Void> catchUpFuture,
-                Consumer<Long> revisionCallback
-        ) {
-            super(cfgStorage, catchUpFuture, log);
-
-            this.revisionCallback = revisionCallback;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public CompletableFuture<?> onUpdate(long appliedRevision) {
-            if (revisionCallback != null) {
-                revisionCallback.accept(appliedRevision);
-            }
-
-            return super.onUpdate(appliedRevision);
-        }
-    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 9a98a90d9d..6d50b1e47f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -184,7 +184,7 @@ public class ItDistributedConfigurationPropertiesTest {
             deployWatchesFut = metaStorageManager.deployWatches();
 
             // create a custom storage implementation that is able to "lose" 
some storage updates
-            var distributedCfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager) {
+            var distributedCfgStorage = new 
DistributedConfigurationStorage(metaStorageManager) {
                 /** {@inheritDoc} */
                 @Override
                 public synchronized void 
registerConfigurationListener(ConfigurationStorageListener listener) {
@@ -228,7 +228,7 @@ public class ItDistributedConfigurationPropertiesTest {
             Stream.of(clusterService, raftManager, cmgManager, 
metaStorageManager)
                     .forEach(IgniteComponent::start);
 
-            distributedCfgManager.start();
+            CompletableFuture.runAsync(distributedCfgManager::start);
         }
 
         /**
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 0a88f8fd22..9c8bac497c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -157,7 +157,7 @@ public class ItDistributedConfigurationStorageTest {
 
             deployWatchesFut = metaStorageManager.deployWatches();
 
-            cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
+            cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager);
         }
 
         /**
@@ -232,8 +232,6 @@ public class ItDistributedConfigurationStorageTest {
 
             assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
 
-            node.cfgStorage.writeConfigurationRevision(0, 1);
-
             assertTrue(waitForCondition(
                     () -> node.metaStorageManager.appliedRevision() != 0,
                     3000
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 23fff6513d..77b022c0d4 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.configuration.storage;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZoneReplicas;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
@@ -56,6 +57,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -168,6 +170,7 @@ import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -178,6 +181,7 @@ import org.mockito.Mockito;
  */
 @ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-19506";)
 public class ItRebalanceDistributedTest {
     /** Ignite logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -483,6 +487,7 @@ public class ItRebalanceDistributedTest {
         checkInvokeDestroyedPartitionStorages(evictedNode, TABLE_1_NAME, 0);
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506";)
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
@@ -711,7 +716,7 @@ public class ItRebalanceDistributedTest {
                     metaStorageConfiguration
             );
 
-            cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
+            cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager);
 
             clusterCfgGenerator = new ConfigurationTreeGenerator(
                     List.of(
@@ -863,40 +868,51 @@ public class ItRebalanceDistributedTest {
          * Starts the created components.
          */
         void start() {
-            nodeComponents = List.of(
+            nodeComponents = new CopyOnWriteArrayList<>();
+
+            List<IgniteComponent> firstComponents = List.of(
                     vaultManager,
                     nodeCfgMgr,
                     clusterService,
-                    raftManager,
-                    cmgManager,
-                    metaStorageManager,
-                    clusterCfgMgr,
-                    clockWaiter,
-                    catalogManager,
-                    distributionZoneManager,
-                    replicaManager,
-                    txManager,
-                    baselineMgr,
-                    dataStorageMgr,
-                    schemaManager,
-                    tableManager
+                    raftManager
             );
 
-            nodeComponents.forEach(IgniteComponent::start);
+            firstComponents.forEach(IgniteComponent::start);
+
+            nodeComponents.addAll(firstComponents);
+
+            deployWatchesFut = CompletableFuture.supplyAsync(() -> {
+                List<IgniteComponent> secondComponents = List.of(
+                        cmgManager,
+                        metaStorageManager,
+                        clusterCfgMgr,
+                        clockWaiter,
+                        catalogManager,
+                        distributionZoneManager,
+                        replicaManager,
+                        txManager,
+                        baselineMgr,
+                        dataStorageMgr,
+                        schemaManager,
+                        tableManager
+                );
+
+                secondComponents.forEach(IgniteComponent::start);
+
+                nodeComponents.addAll(secondComponents);
 
-            assertThat(
-                    allOf(
+                var configurationNotificationFut = 
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
+                    return allOf(
                             
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
                             
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                            // Why "-1"? I don't know, it just works like that.
-                            ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart(
-                                    metaStorageManager.appliedRevision() - 1
-                            )
-                    ),
-                    willSucceedIn(1, TimeUnit.MINUTES)
-            );
+                            ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart()
+                    );
+                });
+
+                assertThat(configurationNotificationFut, willSucceedIn(1, 
TimeUnit.MINUTES));
 
-            deployWatchesFut = metaStorageManager.deployWatches();
+                return metaStorageManager.deployWatches();
+            }).thenCompose(identity());
         }
 
         /**
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
index 74198469b5..4f4743953c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.sql.Session;
 import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -95,6 +96,8 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
      *
      * @throws Exception If failed.
      */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks 
this.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506";)
     @Test
     void testFilteredDataNodesPropagatedToStable() throws Exception {
         String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
@@ -172,6 +175,8 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
      *
      * @throws Exception If failed.
      */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks 
this.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506";)
     @Test
     void testAlteringFiltersPropagatedDataNodesToStableImmediately() throws 
Exception {
         String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
@@ -240,6 +245,8 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
      *
      * @throws Exception If failed.
      */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks 
this.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506";)
     @Test
     void testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter() throws 
Exception {
         String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index d744178a3a..daff409fc2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -26,7 +26,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
-import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
@@ -80,7 +79,6 @@ import 
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterNode;
@@ -96,7 +94,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 /**
  * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
  */
-@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
 @ExtendWith(ConfigurationExtension.class)
 public class ItIgniteDistributionZoneManagerNodeRestartTest extends 
BaseIgniteRestartTest {
     private static final LogicalNode A = new LogicalNode(
@@ -181,7 +178,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
                 new TestRocksDbKeyValueStorage(name, 
workDir.resolve("metastorage"))
         ));
 
-        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
+        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
 
         ConfigurationTreeGenerator distributedConfigurationGenerator = new 
ConfigurationTreeGenerator(
                 modules.distributed().rootKeys(),
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 5d3923ef7d..df0527853e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.runner.app;
 
-import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -116,7 +115,6 @@ import 
org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -149,7 +147,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 /**
  * These tests check node restart scenarios.
  */
-@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
 @ExtendWith(ConfigurationExtension.class)
 @Timeout(120)
 public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
@@ -297,7 +294,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 metaStorageConfiguration
         );
 
-        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
+        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
 
         ConfigurationTreeGenerator distributedConfigurationGenerator = new 
ConfigurationTreeGenerator(
                 modules.distributed().rootKeys(),
@@ -779,7 +776,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         for (int i = 0; i < 10; i++) {
             ByteArray key = ByteArray.fromString("some-test-key-" + i);
 
-            byte[] value = restartedMs.getLocally(key.bytes(), 100).value();
+            byte[] value = restartedMs.getLocally(key, 100).value();
 
             assertEquals(1, value.length);
             assertEquals((byte) i, value[0]);
@@ -1020,7 +1017,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * The test for node restart when there is a gap between the node local 
configuration and distributed configuration.
      */
     @Test
-    @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
value = "0")
     public void testCfgGapWithoutData() throws InterruptedException {
         List<IgniteImpl> nodes = startNodes(3);
 
@@ -1052,7 +1048,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      */
     @Test
     @Disabled(value = "https://issues.apache.org/jira/browse/IGNITE-18919";)
-    @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
value = "0")
     public void testMetastorageStop() throws InterruptedException {
         int cfgGap = 4;
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5b2b53f0ba..dfd08d83ce 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -69,7 +69,6 @@ import 
org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
 import 
org.apache.ignite.internal.configuration.DistributedConfigurationUpdater;
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
 import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import 
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
 import org.apache.ignite.internal.configuration.presentation.HoconPresentation;
 import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
@@ -103,8 +102,6 @@ import org.apache.ignite.internal.raft.Loza;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
-import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
-import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.rest.RestComponent;
@@ -430,7 +427,7 @@ public class IgniteImpl implements Ignite {
                 topologyAwareRaftGroupServiceFactory
         );
 
-        this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vaultMgr);
+        this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
 
         clusterCfgMgr = new ConfigurationManager(
                 modules.distributed().rootKeys(),
@@ -982,44 +979,16 @@ public class IgniteImpl implements Ignite {
      * and deploying watches after that.
      */
     private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService 
startupExecutor) {
-        // Recovery future must be created before configuration listeners are 
triggered.
-        CompletableFuture<?> recoveryFuture = 
RecoveryCompletionFutureFactory.create(
-                clusterCfgMgr,
-                fut -> new ConfigurationCatchUpListener(cfgStorage, fut, LOG)
-        );
-
-        //TODO https://issues.apache.org/jira/browse/IGNITE-19778
-        // The order of these two lines matter, the first method relies on the 
second one not being called yet.
-        // After the fix, the order will most likely have to be reversed.
-        CompletableFuture<Void> startupRevisionUpdate = 
notifyRevisionUpdateListenerOnStart();
         CompletableFuture<Void> startupConfigurationUpdate = 
notifyConfigurationListeners();
+        CompletableFuture<Void> startupRevisionUpdate = 
metaStorageMgr.notifyRevisionUpdateListenerOnStart();
 
         return CompletableFuture.allOf(startupConfigurationUpdate, 
startupRevisionUpdate)
                 .thenComposeAsync(t -> {
                     // Deploy all registered watches because all components 
are ready and have registered their listeners.
-                    return metaStorageMgr.deployWatches().thenCompose(unused 
-> recoveryFuture);
+                    return metaStorageMgr.deployWatches();
                 }, startupExecutor);
     }
 
-    private CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-19778 Use 
meta-storages revision after recovery,
-        // it should match configuration revision.
-        // Temporary workaround.
-        // In order to avoid making a public getter for configuration 
revision, I read it from the startup notification.
-        // It should be removed once we start using up-to-date meta-storage 
revision for node startup.
-        var configurationRevisionFuture = new CompletableFuture<Void>();
-
-        ConfigurationStorageRevisionListener revisionListener = 
newStorageRevision ->
-                ((MetaStorageManagerImpl) 
metaStorageMgr).notifyRevisionUpdateListenerOnStart(newStorageRevision)
-                        .thenRun(() -> 
configurationRevisionFuture.complete(null));
-
-        clusterConfiguration().listenUpdateStorageRevision(revisionListener);
-
-        return configurationRevisionFuture.thenRun(() ->
-                
clusterConfiguration().stopListenUpdateStorageRevision(revisionListener)
-        );
-    }
-
     /**
      * Notify all listeners of current configurations.
      */
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index cbfe4bd99b..71e79ff9ab 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.configuration.storage;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 
 import java.io.Serializable;
 import java.util.Arrays;
@@ -41,19 +44,15 @@ import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.ConditionType;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Distributed configuration storage.
@@ -70,11 +69,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
      */
     private static final ByteArray MASTER_KEY = new 
ByteArray(DISTRIBUTED_PREFIX + "$master$key");
 
-    /**
-     * Vault's key for a value of previous and current configuration's 
MetaStorage revision.
-     */
-    private static final ByteArray CONFIGURATION_REVISIONS_KEY = new 
ByteArray("$revisions");
-
     /**
      * Prefix for all keys in the distributed storage. This key is expected to 
be the first key in lexicographical order of distributed
      * configuration keys.
@@ -91,9 +85,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
-    /** Vault manager. */
-    private final VaultManager vaultMgr;
-
     /** Configuration changes listener. */
     private volatile ConfigurationStorageListener lsnr;
 
@@ -105,10 +96,10 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
      * <p>Given that {@link #MASTER_KEY} is updated on every configuration 
change, one could assume that {@code changeId} matches the
      * revision of {@link #MASTER_KEY}.
      *
-     * <p>This is true for all cases except for node restart. Key-specific 
revision values are lost on local vault copy after restart, so
-     * stored {@link MetaStorageManager#appliedRevision} value is used 
instead. This fact has very important side effect: it's no longer
-     * possible to use {@link ConditionType#REV_EQUAL} on {@link #MASTER_KEY} 
in {@link DistributedConfigurationStorage#write(Map, long)}.
-     * {@link ConditionType#REV_LESS_OR_EQUAL} must be used instead.
+     * <p>This is true for all cases except for node restart. We use latest 
values after restart, so MetaStorage's local revision is used
+     * instead. This fact has very important side effect: it's no longer 
possible to use {@link ConditionType#REV_EQUAL} on
+     * {@link #MASTER_KEY} in {@link 
DistributedConfigurationStorage#write(Map, long)}. {@link 
ConditionType#REV_LESS_OR_EQUAL} must be
+     * used instead.
      *
      * @see #MASTER_KEY
      * @see #write(Map, long)
@@ -123,12 +114,9 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
      * Constructor.
      *
      * @param metaStorageMgr Meta storage manager.
-     * @param vaultMgr Vault manager.
      */
-    public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr, 
VaultManager vaultMgr) {
+    public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
-
-        this.vaultMgr = vaultMgr;
     }
 
     @Override
@@ -206,60 +194,46 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
 
     @Override
     public CompletableFuture<Data> readDataOnRecovery() throws 
StorageException {
-        CompletableFuture<Data> future = 
vaultMgr.get(CONFIGURATION_REVISIONS_KEY)
-                .thenApplyAsync(entry -> {
-                    long revision = 
resolveRevision(metaStorageMgr.appliedRevision(), entry);
-
-                    return readDataOnRecovery0(revision);
-                }, threadPool);
+        CompletableFuture<Data> future = 
metaStorageMgr.recoveryFinishedFuture()
+                .thenApplyAsync(this::readDataOnRecovery0, threadPool);
 
         return registerFuture(future);
     }
 
-    /**
-     * Resolves current configuration revision based on the saved in the Vault 
revision of the metastorage and also previous and current
-     * revisions of the configuration saved in the Vault.
-     *
-     * @param metaStorageRevision Meta Storage revision.
-     * @param revisionsEntry Configuration revisions entry.
-     * @return Configuration revision.
-     */
-    private static long resolveRevision(long metaStorageRevision, @Nullable 
VaultEntry revisionsEntry) {
-        if (revisionsEntry != null) {
-            byte[] value = revisionsEntry.value();
-            long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
-            long curMasterKeyRevision = ByteUtils.bytesToLong(value, 
Long.BYTES);
-
-            // If current master key revision is higher than applied revision, 
then node failed
-            // before applied revision changed, so we have to use previous 
master key revision
-            return curMasterKeyRevision <= metaStorageRevision ? 
curMasterKeyRevision : prevMasterKeyRevision;
-        } else {
-            // Configuration has not been updated yet, so it is safe to return 
0 as the revision for the master key.
-            return 0L;
-        }
-    }
-
     private Data readDataOnRecovery0(long cfgRevision) {
         var data = new HashMap<String, Serializable>();
 
-        try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
-            for (VaultEntry entry : entries) {
-                ByteArray key = entry.key();
+        byte[] masterKey = MASTER_KEY.bytes();
+        boolean sawMasterKey = false;
+
+        try (Cursor<Entry> cursor = 
metaStorageMgr.getLocally(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE, 
cfgRevision)) {
+            for (Entry entry : cursor) {
+                if (entry.tombstone()) {
+                    continue;
+                }
+
+                byte[] key = entry.key();
                 byte[] value = entry.value();
 
-                // vault iterator should not return nulls as values
+                // MetaStorage iterator should not return nulls as values.
                 assert value != null;
 
-                if (key.equals(MASTER_KEY)) {
+                if (!sawMasterKey && Arrays.equals(masterKey, key)) {
+                    sawMasterKey = true;
+
                     continue;
                 }
 
-                String dataKey = 
key.toString().substring(DISTRIBUTED_PREFIX.length());
+                int startIdx = DST_KEYS_START_RANGE.length();
+
+                int keyLengthWithoutPrefix = key.length - startIdx;
+
+                var dataKey = new String(key, startIdx, 
keyLengthWithoutPrefix, UTF_8);
 
                 data.put(dataKey, 
ConfigurationSerializationUtil.fromBytes(value));
             }
         } catch (Exception e) {
-            throw new StorageException("Exception when closing a Vault 
cursor", e);
+            throw new StorageException("Exception reading data on recovery", 
e);
         }
 
         assert data.isEmpty() || cfgRevision > 0;
@@ -296,9 +270,26 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
 
         operations.add(Operations.put(MASTER_KEY, 
ByteUtils.longToBytes(curChangeId)));
 
-        SimpleCondition condition = curChangeId == 0L
-                ? Conditions.notExists(MASTER_KEY)
-                : Conditions.revision(MASTER_KEY).eq(curChangeId);
+        // Condition for a valid MetaStorage data update. Several 
possibilities here:
+        //  - First update ever, MASTER_KEY property must be absent from 
MetaStorage.
+        //  - Current node has already performed some updates or received them 
from MetaStorage watch listener. In this
+        //    case "curChangeId" must match the MASTER_KEY revision exactly.
+        //  - Current node has been restarted and received updates from 
MetaStorage watch listeners after that. Same as
+        //    above, "curChangeId" must match the MASTER_KEY revision exactly.
+        //  - Current node has been restarted and have not received any 
updates from MetaStorage watch listeners yet.
+        //    In this case "curChangeId" matches MetaStorage's local revision, 
which may or may not match the MASTER_KEY revision. Two
+        //    options here:
+        //     - MASTER_KEY is missing in local MetaStorage copy. This means 
that current node have not performed nor
+        //       observed any configuration changes. Valid condition is 
"MASTER_KEY does not exist".
+        //     - MASTER_KEY is present in local MetaStorage copy. The 
MASTER_KEY revision is unknown but is less than or
+        //       equal to MetaStorage's local revision. Obviously, there have 
been no updates from the future yet. It's also guaranteed
+        //       that the next received configuration update will have the 
MASTER_KEY revision strictly greater than
+        //       current MetaStorage's local revision. This allows to conclude 
that "MASTER_KEY revision <= curChangeId" is a valid
+        //       condition for update.
+        // Joining all of the above, it's concluded that the following 
condition must be used:
+        Condition condition = curChangeId == 0L
+                ? notExists(MASTER_KEY)
+                : or(notExists(MASTER_KEY), 
revision(MASTER_KEY).le(curChangeId));
 
         return metaStorageMgr.invoke(condition, operations, 
Set.of(Operations.noop()));
     }
@@ -374,29 +365,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
         return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
     }
 
-    @Override
-    public void writeConfigurationRevision(long prevRevision, long 
currentRevision) {
-        byte[] value = new byte[Long.BYTES * 2];
-
-        ByteUtils.putLongToBytes(prevRevision, value, 0);
-        ByteUtils.putLongToBytes(currentRevision, value, Long.BYTES);
-
-        vaultMgr.put(CONFIGURATION_REVISIONS_KEY, value).join();
-    }
-
-    /**
-     * Method that returns all distributed configuration keys from the meta 
storage that were stored in the vault filtered out by the
-     * current applied revision as an upper bound. Applied revision is a 
revision of the last successful vault update.
-     *
-     * <p>This is possible to distinguish cfg keys from meta storage because 
we add a special prefix {@link
-     * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all 
configuration keys that we put to the meta storage.
-     *
-     * @return Iterator built upon all distributed configuration entries 
stored in vault.
-     */
-    private Cursor<VaultEntry> storedDistributedConfigKeys() {
-        return vaultMgr.range(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE);
-    }
-
     /**
      * Increments the last character of the given string.
      */
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index b832ee4927..c026a6aa04 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -223,11 +223,6 @@ public class LocalConfigurationStorage implements 
ConfigurationStorage {
                 .thenApply(entry -> entry == null ? 0 : (Long) 
fromBytes(entry.value()));
     }
 
-    @Override
-    public void writeConfigurationRevision(long prevRevision, long 
currentRevision) {
-        // No-op.
-    }
-
     /**
      * Increments the last character of the given string.
      */
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
index e3d75d477b..742745c4cb 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
@@ -219,11 +219,6 @@ public class LocalFileConfigurationStorage implements 
ConfigurationStorage {
         return CompletableFuture.completedFuture(lastRevision);
     }
 
-    @Override
-    public void writeConfigurationRevision(long prevRevision, long 
currentRevision) {
-        // No-op.
-    }
-
     @Override
     public void close() {
         futureTracker.cancelInFlightFutures();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
deleted file mode 100644
index ef5cc3c4f0..0000000000
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.recovery;
-
-import java.util.concurrent.CompletableFuture;
-import 
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
-
-/**
- * Configuration listener class that is intended to complete catch-up future 
during recovery when configuration
- * is up-to-date.
- */
-public class ConfigurationCatchUpListener implements 
ConfigurationStorageRevisionListener {
-    /** Configuration catch-up difference property name. */
-    public static final String CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY = 
"CONFIGURATION_CATCH_UP_DIFFERENCE";
-
-    /**
-     * Difference between the local node applied revision and distributed data 
storage revision on start.
-     * TODO: IGNITE-16488 Make this property adjustable and remove system 
property.
-     */
-    private final int configurationCatchUpDifference =
-            
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
100);
-
-    /** Revision to catch up. */
-    private volatile long targetRevision = -1;
-
-    /** Catch-up future. */
-    private final CompletableFuture<Void> catchUpFuture;
-
-    /** Configuration storage. */
-    private final ConfigurationStorage cfgStorage;
-
-    /** Mutex for updating target revision. */
-    private final Object targetRevisionUpdateMutex = new Object();
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /**
-     * Constructor.
-     *
-     * @param catchUpFuture Catch-up future.
-     */
-    public ConfigurationCatchUpListener(ConfigurationStorage cfgStorage, 
CompletableFuture<Void> catchUpFuture, IgniteLogger log) {
-        this.cfgStorage = cfgStorage;
-        this.catchUpFuture = catchUpFuture;
-        this.log = log;
-    }
-
-    /**
-     * Checks the node up to date by distributed configuration.
-     *
-     * @param targetRevision Configuration revision.
-     * @param appliedRevision Last applied node revision.
-     * @return True when the applied revision is great enough for node 
recovery to complete, false otherwise.
-     */
-    private boolean isConfigurationUpToDate(long targetRevision, long 
appliedRevision) {
-        return targetRevision - configurationCatchUpDifference <= 
appliedRevision;
-    }
-
-    /**
-     * Retrieve distribute configuration revision and check whether local 
revision is great enough to complete the recovery.
-     *
-     * @param appliedRevision Applied revision.
-     */
-    private CompletableFuture<?> checkRevisionUpToDate(long appliedRevision) {
-        return cfgStorage.lastRevision().thenAccept(rev -> {
-            synchronized (targetRevisionUpdateMutex) {
-                assert rev >= appliedRevision : IgniteStringFormatter.format(
-                    "Configuration revision must be greater than local node 
applied revision [msRev={}, appliedRev={}",
-                    rev, appliedRevision);
-
-                targetRevision = rev;
-
-                log.info("Checking revision on recovery [targetRevision={}, 
appliedRevision={}, acceptableDifference={}]",
-                        targetRevision, appliedRevision, 
configurationCatchUpDifference);
-
-                if (isConfigurationUpToDate(targetRevision, appliedRevision)) {
-                    catchUpFuture.complete(null);
-                }
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<?> onUpdate(long appliedRevision) {
-        long targetRev = targetRevision;
-
-        if (targetRev >= 0) {
-            if (isConfigurationUpToDate(targetRev, appliedRevision)) {
-                return checkRevisionUpToDate(appliedRevision);
-            }
-        } else {
-            return checkRevisionUpToDate(appliedRevision);
-        }
-
-        return CompletableFuture.completedFuture(null);
-    }
-}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
deleted file mode 100644
index 32cc5ac7f7..0000000000
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.recovery;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import org.apache.ignite.internal.configuration.ConfigurationManager;
-import 
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
-
-/**
- * Creates a future that completes when local recovery is finished.
- */
-public class RecoveryCompletionFutureFactory {
-    /**
-     * Create recovery completion future.
-     *
-     * @param clusterCfgMgr Cluster configuration manager.
-     * @param listenerProvider Provider of configuration listener.
-     * @return Recovery completion future.
-     */
-    public static CompletableFuture<Void> create(
-            ConfigurationManager clusterCfgMgr,
-            Function<CompletableFuture<Void>, 
ConfigurationStorageRevisionListener> listenerProvider
-    ) {
-        CompletableFuture<Void> configCatchUpFuture = new 
CompletableFuture<>();
-
-        ConfigurationStorageRevisionListener listener = 
listenerProvider.apply(configCatchUpFuture);
-
-        CompletableFuture<Void> recoveryCompletionFuture =
-                configCatchUpFuture.thenRun(() -> 
clusterCfgMgr.configurationRegistry().stopListenUpdateStorageRevision(listener));
-
-        
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(listener);
-
-        return recoveryCompletionFuture;
-    }
-}
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
deleted file mode 100644
index 4e47a634e6..0000000000
--- 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.configuration.storage;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.apache.ignite.configuration.RootKey;
-import org.apache.ignite.configuration.annotation.ConfigurationRoot;
-import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
-import org.apache.ignite.internal.configuration.TestConfigurationChanger;
-import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
-import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
-import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.EntryEvent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.WatchEvent;
-import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.apache.ignite.lang.ByteArray;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests for the {@link DistributedConfigurationStorage}.
- */
-public class DistributedConfigurationCatchUpTest {
-    private final VaultManager vaultManager = new VaultManager(new 
InMemoryVaultService());
-
-    /**
-     * Before each.
-     */
-    @BeforeEach
-    void start() {
-        vaultManager.start();
-    }
-
-    /**
-     * After each.
-     */
-    @AfterEach
-    void stop() throws Exception {
-        vaultManager.stop();
-    }
-
-    /**
-     * Dummy configuration.
-     */
-    @ConfigurationRoot(rootName = "someKey", type = DISTRIBUTED)
-    public static class DistributedTestConfigurationSchema {
-        @Value(hasDefault = true)
-        public final int foobar = 0;
-    }
-
-    /**
-     * Tests that distributed configuration storage correctly picks up latest 
configuration MetaStorage revision during recovery process.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testMetaStorageRevisionDifferentFromConfigurationOnRestart() 
throws Exception {
-        RootKey<DistributedTestConfiguration, DistributedTestView> rootKey = 
DistributedTestConfiguration.KEY;
-
-        ConfigurationTreeGenerator generator = new 
ConfigurationTreeGenerator(rootKey);
-
-        MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
-
-        DistributedConfigurationStorage storage = storage(wrapper);
-
-        try {
-            var validator = new ConfigurationValidatorImpl(generator, 
Set.of());
-            var changer = new TestConfigurationChanger(List.of(rootKey), 
storage, generator, validator);
-
-            try {
-                changer.start();
-
-                ConfigurationSource source = source(
-                        rootKey,
-                        (DistributedTestChange change) -> 
change.changeFoobar(1)
-                );
-
-                CompletableFuture<Void> change = changer.change(source);
-
-                assertThat(change, willCompleteSuccessfully());
-            } finally {
-                changer.stop();
-            }
-        } finally {
-            storage.close();
-        }
-
-        // Put a value to the configuration, so we start on non-empty vault.
-        vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 
3, 4}).get();
-
-        // This emulates a change in MetaStorage that is not related to the 
configuration.
-        when(wrapper.mock.appliedRevision()).thenReturn(2L);
-
-        storage = storage(wrapper);
-
-        try {
-
-            var configurationValidator = new 
ConfigurationValidatorImpl(generator, Set.of());
-            var changer = new TestConfigurationChanger(List.of(rootKey), 
storage, generator, configurationValidator);
-
-            try {
-                changer.start();
-
-                // Should return last configuration change, not last 
MetaStorage change.
-                assertThat(storage.lastRevision(), willBe(1L));
-            } finally {
-                changer.stop();
-            }
-        } finally {
-            storage.close();
-        }
-    }
-
-    private DistributedConfigurationStorage storage(MetaStorageMockWrapper 
wrapper) {
-        return new 
DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager);
-    }
-
-    /**
-     * This class stores data for {@link MetaStorageManager}'s mock.
-     */
-    private static class MetaStorageMockWrapper {
-        private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
-
-        /**
-         * This and previous field are copy-pasted intentionally, so in case 
if something changes, this test should fail and be reviewed and
-         * re-written.
-         */
-        private static final ByteArray MASTER_KEY = new 
ByteArray(DISTRIBUTED_PREFIX + "$master$key");
-
-        private static final ByteArray TEST_KEY = new 
ByteArray(DISTRIBUTED_PREFIX + "someKey.foobar");
-
-        /** MetaStorage mock. */
-        private final MetaStorageManager mock;
-
-        /** Captured MetaStorage listener. */
-        private WatchListener lsnr;
-
-        /** Current master key revision. */
-        private final AtomicLong masterKeyRevision = new AtomicLong();
-
-        private MetaStorageMockWrapper() {
-            mock = mock(MetaStorageManager.class);
-
-            setup();
-        }
-
-        private void setup() {
-            // Returns current master key revision.
-            when(mock.get(MASTER_KEY)).then(invocation -> {
-                return completedFuture(new EntryImpl(MASTER_KEY.bytes(), null, 
masterKeyRevision.get(), -1));
-            });
-
-            // On any invocation - trigger storage listener.
-            when(mock.invoke(any(), anyCollection(), any()))
-                    .then(invocation -> triggerStorageListener());
-
-            when(mock.invoke(any(), any(Operation.class), any()))
-                    .then(invocation -> triggerStorageListener());
-
-            // This captures the listener.
-            doAnswer(invocation -> {
-                lsnr = invocation.getArgument(1);
-
-                return null;
-            }).when(mock).registerPrefixWatch(any(), any());
-        }
-
-        /**
-         * Triggers MetaStorage listener incrementing master key revision.
-         */
-        private CompletableFuture<Boolean> triggerStorageListener() {
-            return CompletableFuture.supplyAsync(() -> {
-                long newRevision = masterKeyRevision.incrementAndGet();
-
-                lsnr.onUpdate(new WatchEvent(List.of(
-                        new EntryEvent(null, new EntryImpl(MASTER_KEY.bytes(), 
null, newRevision, -1)),
-                        // Add a mock entry to simulate a configuration update.
-                        new EntryEvent(null, new EntryImpl((DISTRIBUTED_PREFIX 
+ "foobar").getBytes(UTF_8), null, newRevision, -1))
-                ), newRevision, HybridTimestamp.MAX_VALUE));
-
-                return true;
-            });
-        }
-
-        private MetaStorageManager metaStorageManager() {
-            return mock;
-        }
-    }
-
-    private static <CHANGET> ConfigurationSource source(RootKey<?, ? super 
CHANGET> rootKey, Consumer<CHANGET> changer) {
-        return new ConfigurationSource() {
-            @Override
-            public void descend(ConstructableTreeNode node) {
-                ConfigurationSource changerSrc = new ConfigurationSource() {
-                    @Override
-                    public void descend(ConstructableTreeNode node) {
-                        changer.accept((CHANGET) node);
-                    }
-                };
-
-                node.construct(rootKey.key(), changerSrc, true);
-            }
-        };
-    }
-}
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index 17da06caf6..36d40052d8 100644
--- 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -34,8 +34,6 @@ import 
org.apache.ignite.internal.metastorage.server.ExistenceCondition;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.RevisionCondition;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.lang.ByteArray;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -44,8 +42,6 @@ import org.junit.jupiter.api.BeforeEach;
  * Tests for the {@link DistributedConfigurationStorage}.
  */
 public class DistributedConfigurationStorageTest extends 
ConfigurationStorageTest {
-    private final VaultManager vaultManager = new VaultManager(new 
InMemoryVaultService());
-
     private final KeyValueStorage metaStorage = new 
SimpleInMemoryKeyValueStorage("test");
 
     private final MetaStorageManager metaStorageManager = 
mockMetaStorageManager();
@@ -55,7 +51,6 @@ public class DistributedConfigurationStorageTest extends 
ConfigurationStorageTes
      */
     @BeforeEach
     void start() {
-        vaultManager.start();
         metaStorage.start();
         metaStorageManager.start();
     }
@@ -67,13 +62,12 @@ public class DistributedConfigurationStorageTest extends 
ConfigurationStorageTes
     void stop() throws Exception {
         metaStorageManager.stop();
         metaStorage.close();
-        vaultManager.stop();
     }
 
     /** {@inheritDoc} */
     @Override
     public ConfigurationStorage getStorage() {
-        return new DistributedConfigurationStorage(metaStorageManager, 
vaultManager);
+        return new DistributedConfigurationStorage(metaStorageManager);
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 23ba896aa2..b534b2e831 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -633,6 +633,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }
             });
 
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 
Probably should be reworked so that
+            // the future is returned along with createTableFut. Right now it 
will break some tests.
             writeTableAssignmentsToMetastore(tableId, assignments);
 
             return createTableFut;
@@ -641,7 +643,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         }
     }
 
-    private void writeTableAssignmentsToMetastore(int tableId, 
List<Set<Assignment>> assignments) {
+    private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(int 
tableId, List<Set<Assignment>> assignments) {
         assert !assignments.isEmpty();
 
         List<Operation> partitionAssignments = new 
ArrayList<>(assignments.size());
@@ -655,7 +657,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         Condition condition = Conditions.notExists(new 
ByteArray(partitionAssignments.get(0).key()));
 
-        metaStorageMgr
+        return metaStorageMgr
                 .invoke(condition, partitionAssignments, 
Collections.emptyList())
                 .exceptionally(e -> {
                     LOG.error("Couldn't write assignments to metastore", e);
@@ -1265,7 +1267,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                     });
         }));
 
-        createTablePartitionsLocally(causalityToken, assignments, 
zoneDescriptor.id(), table);
+        CompletableFuture<?> createPartsFut = 
createTablePartitionsLocally(causalityToken, assignments, zoneDescriptor.id(), 
table);
 
         pendingTables.put(tableId, table);
         startedTables.put(tableId, table);
@@ -1279,7 +1281,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         // TODO should be reworked in IGNITE-16763
         // We use the event notification future as the result so that 
dependent components can complete the schema updates.
-        return fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId));
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible 
performance degradation.
+        return allOf(createPartsFut, fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId)));
     }
 
     /**


Reply via email to