This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4363510950 IGNITE-21496 Make Vault methods synchronous (#3189)
4363510950 is described below

commit 4363510950a71d6641e31aa83de35fc40434b11e
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Fri Feb 9 11:54:43 2024 +0200

    IGNITE-21496 Make Vault methods synchronous (#3189)
---
 .../management/ClusterManagementGroupManager.java  |  24 ++--
 .../cluster/management/LocalStateStorage.java      |  23 ++--
 .../internal/cluster/management/MockNode.java      |   4 +-
 ...niteDistributionZoneManagerNodeRestartTest.java |   4 +-
 .../internal/network/recovery/VaultStaleIds.java   |   6 +-
 .../network/recovery/VaultStaleIdsTest.java        |  51 +++-----
 modules/platforms/cpp/ignite/common/error_codes.h  |   0
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   0
 .../ItDistributedConfigurationStorageTest.java     |   3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +-
 .../storage/LocalConfigurationStorage.java         |  27 +++--
 .../ignite/internal/BaseIgniteRestartTest.java     |   4 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   6 +-
 .../internal/table/distributed/LowWatermark.java   |  80 ++++++-------
 .../table/distributed/LowWatermarkTest.java        |  13 +-
 .../table/distributed/TableManagerTest.java        |   4 -
 .../apache/ignite/internal/vault/VaultManager.java |  34 +++---
 .../apache/ignite/internal/vault/VaultService.java |  14 +--
 .../vault/persistence/PersistentVaultService.java  | 133 +++++++--------------
 .../ignite/internal/vault/VaultManagerTest.java    |  12 +-
 .../ignite/internal/vault/VaultServiceTest.java    |  62 +++++-----
 .../ItPersistencePropertiesVaultServiceTest.java   |  16 +--
 .../persistence/ItPersistentVaultServiceTest.java  |   2 +-
 .../vault/inmemory/InMemoryVaultService.java       |  80 +++++--------
 25 files changed, 243 insertions(+), 371 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 372ce984fd..0ef44d4dfb 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -247,17 +247,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
      */
     @Nullable
     private CompletableFuture<CmgRaftService> recoverLocalState() {
-        LocalState localState;
-
-        try {
-            localState = localStateStorage.getLocalState().get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInternalException("Interrupted while retrieving local CMG state", e);
-        } catch (ExecutionException e) {
-            throw new IgniteInternalException("Error while retrieving local CMG state", e);
-        }
+        LocalState localState = localStateStorage.getLocalState();
 
         if (localState == null) {
             return null;
@@ -336,8 +326,9 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                 .thenCompose(state -> {
                     var localState = new LocalState(state.cmgNodes(), state.clusterTag());
 
-                    return localStateStorage.saveLocalState(localState)
-                            .thenCompose(v -> joinCluster(service, state.clusterTag()));
+                    localStateStorage.saveLocalState(localState);
+
+                    return joinCluster(service, state.clusterTag());
                 });
     }
 
@@ -440,7 +431,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
 
                 raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
 
-                localStateStorage.clear().get();
+                localStateStorage.clear();
             } catch (Exception e) {
                 throw new IgniteInternalException("Error when cleaning the CMG state", e);
             }
@@ -578,8 +569,9 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                 .thenCompose(service -> {
                     var localState = new LocalState(state.cmgNodes(), state.clusterTag());
 
-                    return localStateStorage.saveLocalState(localState)
-                            .thenCompose(v -> joinCluster(service, state.clusterTag()));
+                    localStateStorage.saveLocalState(localState);
+
+                    return joinCluster(service, state.clusterTag());
                 });
     }
 
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index 9a6bcd8438..d708e5e038 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -19,10 +19,11 @@ package org.apache.ignite.internal.cluster.management;
 
 import java.io.Serializable;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Class that represents a local CMG state (persisted in the Vault).
@@ -33,6 +34,8 @@ class LocalStateStorage {
     private static final ByteArray CMG_STATE_VAULT_KEY = ByteArray.fromString("cmg_state");
 
     static class LocalState implements Serializable {
+        private static final long serialVersionUID = -5069326157367860480L;
+
         private final Set<String> cmgNodeNames;
 
         private final ClusterTag clusterTag;
@@ -62,27 +65,25 @@ class LocalStateStorage {
      *
      * @return Local state.
      */
-    CompletableFuture<LocalState> getLocalState() {
-        return vault.get(CMG_STATE_VAULT_KEY)
-                .thenApply(entry -> entry == null ? null : ByteUtils.fromBytes(entry.value()));
+    @Nullable LocalState getLocalState() {
+        VaultEntry entry = vault.get(CMG_STATE_VAULT_KEY);
+
+        return entry == null ? null : ByteUtils.fromBytes(entry.value());
     }
 
     /**
      * Saves a given local state.
      *
      * @param state Local state to save.
-     * @return Future that represents the state of the operation.
      */
-    CompletableFuture<Void> saveLocalState(LocalState state) {
-        return vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
+    void saveLocalState(LocalState state) {
+        vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
     }
 
     /**
      * Removes all data from the local storage.
-     *
-     * @return Future that represents the state of the operation.
      */
-    CompletableFuture<Void> clear() {
-        return vault.remove(CMG_STATE_VAULT_KEY);
+    void clear() {
+        vault.remove(CMG_STATE_VAULT_KEY);
     }
 }
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index d4e3c87261..8db7c05a7f 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.cluster.management;
 
 
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -105,7 +103,7 @@ public class MockNode {
     private void init(int port) throws IOException {
         Path vaultDir = workDir.resolve("vault");
 
-        var vaultManager = new VaultManager(new PersistentVaultService(testNodeName(testInfo, port), Files.createDirectories(vaultDir)));
+        var vaultManager = new VaultManager(new PersistentVaultService(Files.createDirectories(vaultDir)));
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 8bc669ef5b..76516e2e88 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -161,7 +161,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         List<IgniteComponent> components = new ArrayList<>();
 
-        VaultManager vault = createVault(name, dir);
+        VaultManager vault = createVault(dir);
 
         ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());
 
@@ -261,7 +261,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         // Start.
 
         vault.start();
-        vault.putName(name).join();
+        vault.putName(name);
 
         nodeCfgMgr.start();
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
index 2ad20206e4..d02f94b5e7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 
 /**
- * {@link StaleIds} implementating using Vault as a persistent storage.
+ * {@link StaleIds} implementation using Vault as a persistent storage.
  */
 public class VaultStaleIds implements StaleIds {
     private static final ByteArray STALE_IDS_KEY = new ByteArray("network.staleIds");
@@ -64,7 +64,7 @@ public class VaultStaleIds implements StaleIds {
     }
 
     private Set<String> loadStaleIdsFromVault() {
-        VaultEntry entry = vaultManager.get(STALE_IDS_KEY).join();
+        VaultEntry entry = vaultManager.get(STALE_IDS_KEY);
 
         if (entry == null) {
             return new LinkedHashSet<>();
@@ -99,6 +99,6 @@ public class VaultStaleIds implements StaleIds {
     private void saveIdsToVault() {
         String joinedIds = String.join("\n", staleIds);
 
-        vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)).join();
+        vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8));
     }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
index c3c4b9b38c..7154edfeb3 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
@@ -18,19 +18,14 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.vault.VaultEntry;
@@ -38,6 +33,7 @@ import org.apache.ignite.internal.vault.VaultManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -57,8 +53,7 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest {
 
     @Test
     void consultsVaultWhenCheckingForStaleness() {
-        doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8))))
-                .when(vaultManager).get(staleIdsKey);
+        when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)));
 
         assertThat(staleIds.isIdStale("id1"), is(true));
         assertThat(staleIds.isIdStale("id2"), is(true));
@@ -68,56 +63,44 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest {
 
     @Test
     void cachesVaultStateInMemory() {
-        doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8))))
-                .when(vaultManager).get(staleIdsKey);
+        when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)));
 
         staleIds.isIdStale("id1");
         staleIds.isIdStale("id2");
         staleIds.isIdStale("id3");
 
-        verify(vaultManager, times(1)).get(any());
+        verify(vaultManager).get(any());
     }
 
     @Test
     void savesNewStaleIdsToVault() {
-        doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey);
-        doReturn(nullCompletedFuture())
-                .when(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
-        doReturn(nullCompletedFuture())
-                .when(vaultManager).put(staleIdsKey, "id2\nid1".getBytes(UTF_8));
-
         staleIds.markAsStale("id2");
+
+        verify(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
+
         staleIds.markAsStale("id1");
+
+        verify(vaultManager).put(staleIdsKey, "id2\nid1".getBytes(UTF_8));
     }
 
     @Test
     void respectsMaxIdsLimit() {
         staleIds = new VaultStaleIds(vaultManager, 2);
 
-        doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey);
-
-        AtomicReference<String> lastSavedIds = new AtomicReference<>();
-
-        doAnswer(invocation -> {
-            byte[] value = invocation.getArgument(1);
-
-            lastSavedIds.set(new String(value, UTF_8));
-
-            return nullCompletedFuture();
-        }).when(vaultManager).put(eq(staleIdsKey), any());
-
         staleIds.markAsStale("id3");
         staleIds.markAsStale("id2");
         staleIds.markAsStale("id1");
 
-        assertThat(lastSavedIds.get(), is("id2\nid1"));
+        ArgumentCaptor<byte[]> idsCaptor = ArgumentCaptor.forClass(byte[].class);
+
+        verify(vaultManager, times(3)).put(eq(staleIdsKey), idsCaptor.capture());
+
+        assertThat(idsCaptor.getValue(), is("id2\nid1".getBytes(UTF_8)));
     }
 
     @Test
     void loadsBeforeDoingFirstSave() {
-        lenient().doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1".getBytes(UTF_8))))
-                .when(vaultManager).get(staleIdsKey);
-        doReturn(nullCompletedFuture()).when(vaultManager).put(eq(staleIdsKey), any());
+        when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1".getBytes(UTF_8)));
 
         staleIds.markAsStale("id2");
 
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h
old mode 100755
new mode 100644
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
old mode 100755
new mode 100644
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 8e2258f211..d0de55fb56 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.configuration.storage;
 
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -114,7 +113,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes
         Node(TestInfo testInfo, Path workDir) {
             var addr = new NetworkAddress("localhost", 10000);
 
-            vaultManager = new VaultManager(new PersistentVaultService(testNodeName(testInfo, addr.port()), workDir.resolve("vault")));
+            vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
 
             clusterService = ClusterServiceTestUtils.clusterService(
                     testInfo,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 13a685d976..5dfd681e2b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -268,7 +268,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
 
         List<IgniteComponent> components = new ArrayList<>();
 
-        VaultManager vault = createVault(name, dir);
+        VaultManager vault = createVault(dir);
 
         ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());
 
@@ -561,7 +561,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
         // Start.
 
         vault.start();
-        vault.putName(name).join();
+        vault.putName(name);
 
         nodeCfgMgr.start();
 
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2f76950b2f..8aac39c636 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -367,7 +367,7 @@ public class IgniteImpl implements Ignite {
 
         threadPoolsManager = new ThreadPoolsManager(name);
 
-        vaultMgr = createVault(name, workDir);
+        vaultMgr = createVault(workDir);
 
         metricManager = new MetricManager();
 
@@ -879,7 +879,7 @@ public class IgniteImpl implements Ignite {
 
             lifecycleManager.startComponent(vaultMgr);
 
-            vaultMgr.putName(name).get();
+            vaultMgr.putName(name);
 
             // Node configuration manager startup.
             lifecycleManager.startComponent(nodeCfgMgr);
@@ -1215,7 +1215,7 @@ public class IgniteImpl implements Ignite {
     /**
      * Starts the Vault component.
      */
-    private static VaultManager createVault(String nodeName, Path workDir) {
+    private static VaultManager createVault(Path workDir) {
         Path vaultPath = workDir.resolve(VAULT_DB_PATH);
 
         try {
@@ -1224,7 +1224,7 @@ public class IgniteImpl implements Ignite {
             throw new IgniteInternalException(e);
         }
 
-        return new VaultManager(new PersistentVaultService(nodeName, vaultPath));
+        return new VaultManager(new PersistentVaultService(vaultPath));
     }
 
     /**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index 52c636cd9f..ee4c143dcc 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -113,11 +113,15 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Serializable> readLatest(String key) {
-        return vaultMgr.get(new ByteArray(LOC_PREFIX + key))
-                .thenApply(entry -> entry == null ? null : fromBytes(entry.value()))
-                .exceptionally(e -> {
-                    throw new StorageException("Exception while reading vault entry", e);
-                });
+        return registerFuture(supplyAsync(() -> {
+            try {
+                VaultEntry entry = vaultMgr.get(new ByteArray(LOC_PREFIX + key));
+
+                return entry == null ? null : fromBytes(entry.value());
+            } catch (Exception e) {
+                throw new StorageException("Exception while reading vault entry", e);
+            }
+        }, threadPool));
     }
 
     /** {@inheritDoc} */
@@ -189,9 +193,9 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
 
                         Data entries = new Data(newValues, version + 1);
 
-                        return vaultMgr.putAll(data)
-                                .thenCompose(v -> lsnr.onEntriesChanged(entries))
-                                .thenApply(v -> true);
+                        vaultMgr.putAll(data);
+
+                        return lsnr.onEntriesChanged(entries).thenApply(v -> true);
                     }, threadPool));
 
             // ignore any errors on the write future, because we are only interested in its completion
@@ -222,8 +226,11 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Long> lastRevision() {
-        return vaultMgr.get(VERSION_KEY)
-                .thenApply(entry -> entry == null ? 0 : (Long) fromBytes(entry.value()));
+        return registerFuture(supplyAsync(() -> {
+            VaultEntry entry = vaultMgr.get(VERSION_KEY);
+
+            return entry == null ? 0 : (Long) fromBytes(entry.value());
+        }, threadPool));
     }
 
     @Override
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 1afc9df950..a731be183d 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -180,7 +180,7 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest {
     /**
      * Starts the Vault component.
      */
-    public static VaultManager createVault(String nodeName, Path workDir) {
+    public static VaultManager createVault(Path workDir) {
         Path vaultPath = workDir.resolve(Paths.get("vault"));
 
         try {
@@ -189,7 +189,7 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest {
             throw new IgniteInternalException(e);
         }
 
-        return new VaultManager(new PersistentVaultService(nodeName, vaultPath));
+        return new VaultManager(new PersistentVaultService(vaultPath));
     }
 
     /**
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index dd61c4978f..c5671db508 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -931,7 +931,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
             Path dir = workDir.resolve(name);
 
-            vaultManager = createVault(name, dir);
+            vaultManager = createVault(dir);
 
             nodeCfgGenerator = new ConfigurationTreeGenerator(
                     List.of(
@@ -1287,8 +1287,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
     /**
      * Starts the Vault component.
      */
-    private static VaultManager createVault(String nodeName, Path workDir) {
-        return new VaultManager(new PersistentVaultService(nodeName, resolveDir(workDir, "vault")));
+    private static VaultManager createVault(Path workDir) {
+        return new VaultManager(new PersistentVaultService(resolveDir(workDir, "vault")));
     }
 
     private static Path resolveDir(Path workDir, String dirName) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index 4db46d68b0..c0cf426414 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed;
 
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.concurrent.Executors;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,7 +76,7 @@ public class LowWatermark implements ManuallyCloseable {
 
     private final AtomicBoolean closeGuard = new AtomicBoolean();
 
-    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+    private volatile HybridTimestamp lowWatermark;
 
     private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
 
@@ -119,50 +119,45 @@ public class LowWatermark implements ManuallyCloseable {
      */
     public void start() {
         inBusyLock(busyLock, () -> {
-            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
-                    .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
-                        if (vaultEntry == null) {
-                            scheduleUpdateLowWatermarkBusy();
+            HybridTimestamp lowWatermark = readLowWatermarkFromVault();
 
-                            return nullCompletedFuture();
-                        }
+            if (lowWatermark == null) {
+                LOG.info("Previous value of the low watermark was not found, will schedule to update it");
+
+                scheduleUpdateLowWatermarkBusy();
 
-                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+                return;
+            }
 
-                        return txManager.updateLowWatermark(lowWatermark)
-                                .thenApply(unused -> {
-                                    this.lowWatermark.set(lowWatermark);
+            LOG.info(
+                    "Low watermark has been successfully retrieved from the vault and is scheduled to be updated: {}",
+                    lowWatermark
+            );
 
-                                    runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+            txManager.updateLowWatermark(lowWatermark)
+                    .thenRun(() -> inBusyLock(busyLock, () -> {
+                        this.lowWatermark = lowWatermark;
 
-                                    return lowWatermark;
-                                });
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
                     }))
-                    .whenComplete((lowWatermark, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) {
-                                LOG.error("Error getting low watermark", throwable);
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null && !(throwable instanceof NodeStoppingException)) {
+                            LOG.error("Error during the Watermark manager start", throwable);
 
-                                failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable));
+                            failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable));
 
-                                inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
-                            }
-                        } else {
-                            if (lowWatermark == null) {
-                                LOG.info(
-                                        "Previous value of the low watermark was not found, will schedule to update it"
-                                );
-                            } else {
-                                LOG.info(
-                                        "Low watermark has been successfully got from the vault and is scheduled to be updated: {}",
-                                        lowWatermark
-                                );
-                            }
+                            inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
                         }
                     });
         });
     }
 
+    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+        return vaultEntry == null ? null : ByteUtils.fromBytes(vaultEntry.value());
+    }
+
     @Override
     public void close() {
         if (!closeGuard.compareAndSet(false, true)) {
@@ -184,7 +179,7 @@ public class LowWatermark implements ManuallyCloseable {
      * Returns the current low watermark, {@code null} means no low watermark has been assigned yet.
      */
     public @Nullable HybridTimestamp getLowWatermark() {
-        return lowWatermark.get();
+        return lowWatermark;
     }
 
     void updateLowWatermark() {
@@ -195,18 +190,13 @@ public class LowWatermark implements ManuallyCloseable {
             // created, then we can safely promote the candidate as a new low watermark, store it in vault, and we can safely start cleaning
             // up the stale/junk data in the tables.
             txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(
-                            unused -> inBusyLock(
-                                    busyLock,
-                                    () -> vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate))
-                            ),
-                            scheduledThreadPool
-                    )
-                    .thenRun(() -> inBusyLock(busyLock, () -> {
-                        lowWatermark.set(lowWatermarkCandidate);
+                    .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate));
+
+                        lowWatermark = lowWatermarkCandidate;
 
                         runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
-                    }))
+                    }), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
                             if (!(throwable instanceof NodeStoppingException)) {
@@ -250,7 +240,7 @@ public class LowWatermark implements ManuallyCloseable {
                 -lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew()
         );
 
-        HybridTimestamp lowWatermark = this.lowWatermark.get();
+        HybridTimestamp lowWatermark = this.lowWatermark;
 
         assert lowWatermark == null || lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
                 "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + lowWatermarkCandidate;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index 7de224a2c4..42997ac27c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -46,7 +45,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -84,15 +82,13 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest {
     }
 
     @AfterEach
-    void tearDown() throws Exception {
+    void tearDown() {
         lowWatermark.close();
     }
 
     @Test
     void testStartWithEmptyVault() {
         // Let's check the start with no low watermark in vault.
-        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture());
-
         lowWatermark.start();
 
         verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
@@ -104,7 +100,7 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest {
         HybridTimestamp lowWatermark = new HybridTimestamp(10, 10);
 
         when(vaultManager.get(LOW_WATERMARK_VAULT_KEY))
-                .thenReturn(completedFuture(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark))));
+                .thenReturn(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark)));
 
         when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
 
@@ -134,8 +130,6 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest {
 
         when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
 
-        when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture());
-
         // Make a predictable candidate to make it easier to test.
         HybridTimestamp newLowWatermarkCandidate = lowWatermark.createNewLowWatermarkCandidate();
 
@@ -157,11 +151,8 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest {
      */
     @Test
     void testUpdateWatermarkSequentially() throws Exception {
-        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture());
         assertThat(lowWatermarkConfig.updateFrequency().update(10L), willSucceedFast());
 
-        when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture());
-
         CountDownLatch startGetAllReadOnlyTransactions = new CountDownLatch(3);
 
         CompletableFuture<Void> finishGetAllReadOnlyTransactions = new CompletableFuture<>();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7161778da0..8e2052afcb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -704,9 +703,6 @@ public class TableManagerTest extends IgniteAbstractTest {
             Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
         VaultManager vaultManager = mock(VaultManager.class);
 
-        when(vaultManager.get(any(ByteArray.class))).thenReturn(nullCompletedFuture());
-        when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture());
-
         TableManager tableManager = new TableManager(
                 NODE_NAME,
                 revisionUpdater,
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index 02e910dc54..d94f6435d8 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -46,7 +46,6 @@ public class VaultManager implements IgniteComponent {
         this.vaultSvc = vaultSvc;
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> start() {
         vaultSvc.start();
@@ -54,7 +53,6 @@ public class VaultManager implements IgniteComponent {
         return nullCompletedFuture();
     }
 
-    /** {@inheritDoc} */
     @Override
     public void stop() {
         // TODO: IGNITE-15161 Implement component's stop.
@@ -65,9 +63,9 @@ public class VaultManager implements IgniteComponent {
      * See {@link VaultService#get}.
      *
      * @param key Key. Cannot be {@code null}.
-     * @return Future that resolves into an entry for the given key, or {@code null} if no such mapping exists.
+     * @return Entry for the given key, or {@code null} if no such mapping exists.
      */
-    public CompletableFuture<VaultEntry> get(ByteArray key) {
+    public @Nullable VaultEntry get(ByteArray key) {
         return vaultSvc.get(key);
     }
 
@@ -76,20 +74,18 @@ public class VaultManager implements IgniteComponent {
      *
      * @param key Vault key. Cannot be {@code null}.
      * @param val Value. If value is equal to {@code null}, then previous value with key will be deleted if there was any mapping.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
-        return vaultSvc.put(key, val);
+    public void put(ByteArray key, byte @Nullable [] val) {
+        vaultSvc.put(key, val);
     }
 
     /**
      * See {@link VaultService#remove}.
      *
      * @param key Vault key. Cannot be {@code null}.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    public CompletableFuture<Void> remove(ByteArray key) {
-        return vaultSvc.remove(key);
+    public void remove(ByteArray key) {
+        vaultSvc.remove(key);
     }
 
     /**
@@ -108,30 +104,30 @@ public class VaultManager implements IgniteComponent {
      * value with key will be deleted if there was any mapping.
      *
      * @param vals The map of keys and corresponding values. Cannot be {@code null} or empty.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
-        return vaultSvc.putAll(vals);
+    public void putAll(Map<ByteArray, byte[]> vals) {
+        vaultSvc.putAll(vals);
     }
 
     /**
      * Persist node name to the vault.
      *
      * @param name node name to persist. Cannot be null.
-     * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<Void> putName(String name) {
+    public void putName(String name) {
         if (name.isBlank()) {
             throw new IllegalArgumentException("Name must not be empty");
         }
 
-        return put(NODE_NAME, name.getBytes(UTF_8));
+        put(NODE_NAME, name.getBytes(UTF_8));
     }
 
     /**
-     * Returns {@code CompletableFuture} which, when complete, returns the node name, if was stored earlier, or {@code null} otherwise.
+     * Returns the node name, if was stored earlier, or {@code null} otherwise.
      */
-    public CompletableFuture<String> name() {
-        return vaultSvc.get(NODE_NAME).thenApply(name -> name == null ? null : new String(name.value(), UTF_8));
+    public @Nullable String name() {
+        VaultEntry nameEntry = vaultSvc.get(NODE_NAME);
+
+        return nameEntry == null ? null : new String(nameEntry.value(), UTF_8);
     }
 }
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
index f1035d672f..277bcb3dc4 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.vault;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.util.Cursor;
@@ -37,9 +36,9 @@ public interface VaultService extends ManuallyCloseable {
      * Retrieves an entry for the given key.
      *
      * @param key Key. Cannot be {@code null}.
-     * @return Future that resolves into an entry for the given key, or {@code null} no such mapping exists.
+     * @return Entry for the given key, or {@code null} no such mapping exists.
      */
-    CompletableFuture<VaultEntry> get(ByteArray key);
+    @Nullable VaultEntry get(ByteArray key);
 
     /**
      * Writes a given value to the Vault. If the value is {@code null}, then the previous value under the same key (if any) will
@@ -48,17 +47,15 @@ public interface VaultService extends ManuallyCloseable {
      * @param key Vault key. Cannot be {@code null}.
      * @param val Value. If value is equal to {@code null}, then the previous value under the same key (if any) will
      *      be deleted.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val);
+    void put(ByteArray key, byte @Nullable [] val);
 
     /**
      * Removes a value from the vault.
      *
      * @param key Vault key. Cannot be {@code null}.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    CompletableFuture<Void> remove(ByteArray key);
+    void remove(ByteArray key);
 
     /**
      * Returns a view of the portion of the vault whose keys range from {@code fromKey}, inclusive, to {@code toKey}, exclusive.
@@ -74,9 +71,8 @@ public interface VaultService extends ManuallyCloseable {
      * then the corresponding key will be deleted.
      *
      * @param vals The map of keys and corresponding values. Cannot be {@code null}.
-     * @return Future representing pending completion of the operation. Cannot be {@code null}.
      */
-    CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals);
+    void putAll(Map<ByteArray, byte[]> vals);
 
     /**
      * Closes the service.
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
index 9ac8cea2d5..047cb8d7b0 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
@@ -19,21 +19,11 @@ package org.apache.ignite.internal.vault.persistence;
 
 import java.nio.file.Path;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultService;
 import org.jetbrains.annotations.Nullable;
@@ -58,12 +48,6 @@ public class PersistentVaultService implements VaultService {
         RocksDB.loadLibrary();
     }
 
-    private static final IgniteLogger LOG = Loggers.forClass(PersistentVaultService.class);
-
-    private final ExecutorService threadPool;
-
-    private final InFlightFutures futureTracker = new InFlightFutures();
-
     private final Options options = options();
 
     private volatile RocksDB db;
@@ -74,13 +58,10 @@ public class PersistentVaultService implements VaultService {
     /**
      * Creates persistent vault service.
      *
-     * @param nodeName Node name.
      * @param path base path for RocksDB
      */
-    public PersistentVaultService(String nodeName, Path path) {
+    public PersistentVaultService(Path path) {
         this.path = path;
-
-        threadPool = Executors.newFixedThreadPool(4, NamedThreadFactory.create(nodeName, "vault", LOG));
     }
 
     private static Options options() {
@@ -112,59 +93,44 @@ public class PersistentVaultService implements VaultService {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
-
-        futureTracker.cancelInFlightFutures();
-
-        RocksUtils.closeAll(options, db);
+        RocksUtils.closeAll(db, options);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<VaultEntry> get(ByteArray key) {
-        return supplyAsync(() -> {
-            try {
-                byte[] value = db.get(key.bytes());
-
-                return value == null ? null : new VaultEntry(key, value);
-            } catch (RocksDBException e) {
-                throw new IgniteInternalException("Unable to read data from RocksDB", e);
-            }
-        });
+    public @Nullable VaultEntry get(ByteArray key) {
+        try {
+            byte[] value = db.get(key.bytes());
+
+            return value == null ? null : new VaultEntry(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to read data from RocksDB", e);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
-        return runAsync(() -> {
-            try {
-                if (val == null) {
-                    db.delete(key.bytes());
-                } else {
-                    db.put(key.bytes(), val);
-                }
-            } catch (RocksDBException e) {
-                throw new IgniteInternalException("Unable to write data to RocksDB", e);
+    public void put(ByteArray key, byte @Nullable [] val) {
+        try {
+            if (val == null) {
+                db.delete(key.bytes());
+            } else {
+                db.put(key.bytes(), val);
             }
-        });
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to write data to RocksDB", e);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> remove(ByteArray key) {
-        return runAsync(() -> {
-            try {
-                db.delete(key.bytes());
-            } catch (RocksDBException e) {
-                throw new IgniteInternalException("Unable to remove data to RocksDB", e);
-            }
-        });
+    public void remove(ByteArray key) {
+        try {
+            db.delete(key.bytes());
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data to RocksDB", e);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
     public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
         var readOpts = new ReadOptions();
@@ -187,47 +153,28 @@ public class PersistentVaultService implements VaultService {
             public void close() {
                 super.close();
 
-                RocksUtils.closeAll(upperBound, readOpts);
+                RocksUtils.closeAll(readOpts, upperBound);
             }
         };
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
-        return runAsync(() -> {
-            try (
-                    var writeBatch = new WriteBatch();
-                    var writeOpts = new WriteOptions()
-            ) {
-                for (var entry : vals.entrySet()) {
-                    if (entry.getValue() == null) {
-                        writeBatch.delete(entry.getKey().bytes());
-                    } else {
-                        writeBatch.put(entry.getKey().bytes(), entry.getValue());
-                    }
+    public void putAll(Map<ByteArray, byte[]> vals) {
+        try (
+                var writeBatch = new WriteBatch();
+                var writeOpts = new WriteOptions()
+        ) {
+            for (var entry : vals.entrySet()) {
+                if (entry.getValue() == null) {
+                    writeBatch.delete(entry.getKey().bytes());
+                } else {
+                    writeBatch.put(entry.getKey().bytes(), entry.getValue());
                 }
-
-                db.write(writeOpts, writeBatch);
-            } catch (RocksDBException e) {
-                throw new IgniteInternalException("Unable to write data to RocksDB", e);
             }
-        });
-    }
 
-    private <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
-        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, threadPool);
-
-        futureTracker.registerFuture(future);
-
-        return future;
-    }
-
-    private CompletableFuture<Void> runAsync(Runnable runnable) {
-        CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, threadPool);
-
-        futureTracker.registerFuture(future);
-
-        return future;
+            db.write(writeOpts, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to write data to RocksDB", e);
+        }
     }
 }
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
index b84af179fb..320d599a54 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.vault;
 
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -54,15 +54,15 @@ public class VaultManagerTest {
      */
     @Test
     void testName() {
-        assertThat(vaultManager.name(), willBe(nullValue(String.class)));
+        assertThat(vaultManager.name(), is(nullValue(String.class)));
 
-        assertThat(vaultManager.putName("foobar"), willBe(nullValue(Void.class)));
+        vaultManager.putName("foobar");
 
-        assertThat(vaultManager.name(), willBe(equalTo("foobar")));
+        assertThat(vaultManager.name(), is(equalTo("foobar")));
 
-        assertThat(vaultManager.putName("foobarbaz"), willBe(nullValue(Void.class)));
+        vaultManager.putName("foobarbaz");
 
-        assertThat(vaultManager.name(), willBe(equalTo("foobarbaz")));
+        assertThat(vaultManager.name(), is(equalTo("foobarbaz")));
     }
 
     /**
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
index c863eb928f..3a60f8b53c 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.vault;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -76,18 +76,18 @@ public abstract class VaultServiceTest {
     public void testPut() {
         ByteArray key = getKey(1);
 
-        assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
 
         byte[] val = getValue(1);
 
-        assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+        vaultService.put(key, val);
 
-        assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val))));
+        assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val))));
 
         // test idempotency
-        assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+        vaultService.put(key, val);
 
-        assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val))));
+        assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val))));
     }
 
     /**
@@ -99,13 +99,13 @@ public abstract class VaultServiceTest {
 
         byte[] val = getValue(1);
 
-        assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+        vaultService.put(key, val);
 
-        assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val))));
+        assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val))));
 
-        assertThat(vaultService.put(key, null), willBe(nullValue(Void.class)));
+        vaultService.put(key, null);
 
-        assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
     }
 
     /**
@@ -116,20 +116,20 @@ public abstract class VaultServiceTest {
         ByteArray key = getKey(1);
 
         // Remove non-existent value.
-        assertThat(vaultService.remove(key), willBe(nullValue(Void.class)));
+        assertDoesNotThrow(() -> vaultService.remove(key));
 
-        assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
 
         byte[] val = getValue(1);
 
-        assertThat(vaultService.put(key, val), willBe(nullValue(Void.class)));
+        vaultService.put(key, val);
 
-        assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val))));
+        assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val))));
 
         // Remove existing value.
-        assertThat(vaultService.remove(key), willBe(nullValue(Void.class)));
+        vaultService.remove(key);
 
-        assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(key), is(nullValue(VaultEntry.class)));
     }
 
     /**
@@ -141,13 +141,13 @@ public abstract class VaultServiceTest {
                 .boxed()
                 .collect(toMap(VaultServiceTest::getKey, VaultServiceTest::getValue));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
-        batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v)))));
+        batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v)))));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
-        batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v)))));
+        batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v)))));
     }
 
     /**
@@ -159,9 +159,9 @@ public abstract class VaultServiceTest {
                 .boxed()
                 .collect(toMap(VaultServiceTest::getKey, VaultServiceTest::getValue));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
-        batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v)))));
+        batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v)))));
 
         Map<ByteArray, byte[]> secondBatch = new HashMap<>();
 
@@ -170,12 +170,12 @@ public abstract class VaultServiceTest {
         secondBatch.put(getKey(1), null);
         secondBatch.put(getKey(3), null);
 
-        assertThat(vaultService.putAll(secondBatch), willBe(nullValue(Void.class)));
+        vaultService.putAll(secondBatch);
 
-        assertThat(vaultService.get(getKey(4)), willBe(equalTo(new VaultEntry(getKey(4), getValue(3)))));
-        assertThat(vaultService.get(getKey(8)), willBe(equalTo(new VaultEntry(getKey(8), getValue(3)))));
-        assertThat(vaultService.get(getKey(1)), willBe(nullValue(VaultEntry.class)));
-        assertThat(vaultService.get(getKey(3)), willBe(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(getKey(4)), is(equalTo(new VaultEntry(getKey(4), getValue(3)))));
+        assertThat(vaultService.get(getKey(8)), is(equalTo(new VaultEntry(getKey(8), getValue(3)))));
+        assertThat(vaultService.get(getKey(1)), is(nullValue(VaultEntry.class)));
+        assertThat(vaultService.get(getKey(3)), is(nullValue(VaultEntry.class)));
     }
 
     /**
@@ -187,7 +187,7 @@ public abstract class VaultServiceTest {
 
         Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
         List<VaultEntry> range = range(getKey(3), getKey(7));
 
@@ -203,7 +203,7 @@ public abstract class VaultServiceTest {
 
         Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
         List<VaultEntry> range = range(getKey(0), getKey(9));
 
@@ -219,7 +219,7 @@ public abstract class VaultServiceTest {
 
         Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
         List<VaultEntry> range = range(getKey(3), getKey(4));
 
@@ -233,7 +233,7 @@ public abstract class VaultServiceTest {
     public void testRangeInvalidBoundaries() throws Exception {
         Map<ByteArray, byte[]> batch = getRange(3, 5).stream().collect(toMap(VaultEntry::key, VaultEntry::value));
 
-        assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class)));
+        vaultService.putAll(batch);
 
         List<VaultEntry> range = range(getKey(4), getKey(1));
 
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
index 134514361e..291a56bf1a 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java
@@ -18,11 +18,9 @@
 package org.apache.ignite.internal.vault.persistence;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -48,39 +46,37 @@ class ItPersistencePropertiesVaultServiceTest {
      * Tests that the Vault Service correctly persists data after multiple service restarts.
      */
     @Test
-    void testPersistentRestart() throws Exception {
+    void testPersistentRestart() {
         var data = Map.of(
                 new ByteArray("key" + 1), fromString("value" + 1),
                 new ByteArray("key" + 2), fromString("value" + 2),
                 new ByteArray("key" + 3), fromString("value" + 3)
         );
 
-        String nodeName = "test";
-
-        var service = new PersistentVaultService(nodeName, vaultDir);
+        var service = new PersistentVaultService(vaultDir);
 
         try {
             service.start();
 
-            assertThat(service.putAll(data), willBe(nullValue(Void.class)));
+            service.putAll(data);
         } finally {
             service.close();
         }
 
-        service = new PersistentVaultService(nodeName, vaultDir);
+        service = new PersistentVaultService(vaultDir);
 
         try {
             service.start();
 
             assertThat(
                     service.get(new ByteArray("key" + 1)),
-                    willBe(equalTo(new VaultEntry(new ByteArray("key" + 1), fromString("value" + 1))))
+                    is(equalTo(new VaultEntry(new ByteArray("key" + 1), fromString("value" + 1))))
             );
         } finally {
             service.close();
         }
 
-        service = new PersistentVaultService(nodeName, vaultDir);
+        service = new PersistentVaultService(vaultDir);
 
         try {
             service.start();
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
index 521cd3ddc4..1a8a077e91 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java
@@ -35,6 +35,6 @@ class ItPersistentVaultServiceTest extends VaultServiceTest {
     /** {@inheritDoc} */
     @Override
     protected VaultService getVaultService() {
-        return new PersistentVaultService("test", vaultDir);
+        return new PersistentVaultService(vaultDir);
     }
 }
diff --git a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
index 6dc5916ff5..87188e0afd 100644
--- a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
+++ b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
@@ -17,17 +17,15 @@
 
 package org.apache.ignite.internal.vault.inmemory;
 
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
 
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultService;
 import org.jetbrains.annotations.Nullable;
@@ -47,75 +45,57 @@ public class InMemoryVaultService implements VaultService {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<VaultEntry> get(ByteArray key) {
-        return supplyAsync(() -> {
-            synchronized (mux) {
-                byte[] value = storage.get(key);
+    public @Nullable VaultEntry get(ByteArray key) {
+        synchronized (mux) {
+            byte[] value = storage.get(key);
 
-                return value == null ? null : new VaultEntry(key, storage.get(key));
-            }
-        });
+            return value == null ? null : new VaultEntry(key, value);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
-        return runAsync(() -> {
-            synchronized (mux) {
-                storage.put(key, val);
-            }
-        });
+    public void put(ByteArray key, byte @Nullable [] val) {
+        synchronized (mux) {
+            storage.put(key, val);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> remove(ByteArray key) {
-        return runAsync(() -> {
-            synchronized (mux) {
-                storage.remove(key);
-            }
-        });
+    public void remove(ByteArray key) {
+        synchronized (mux) {
+            storage.remove(key);
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
     public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
-        Iterator<VaultEntry> it;
-
         if (fromKey.compareTo(toKey) >= 0) {
-            it = Collections.emptyIterator();
-        } else {
-            synchronized (mux) {
-                it = storage.subMap(fromKey, toKey).entrySet().stream()
-                        .map(e -> new VaultEntry(new ByteArray(e.getKey()), e.getValue()))
-                        .iterator();
-            }
+            return CursorUtils.emptyCursor();
         }
 
-        return Cursor.fromBareIterator(it);
+        synchronized (mux) {
+            return storage.subMap(fromKey, toKey).entrySet().stream()
                     .map(e -> new VaultEntry(new ByteArray(e.getKey()), e.getValue()))
                     .collect(collectingAndThen(toList(), Cursor::fromIterable));
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
-        return runAsync(() -> {
-            synchronized (mux) {
-                for (var entry : vals.entrySet()) {
-                    if (entry.getValue() == null) {
-                        storage.remove(entry.getKey());
-                    } else {
-                        storage.put(entry.getKey(), entry.getValue());
-                    }
+    public void putAll(Map<ByteArray, byte[]> vals) {
+        synchronized (mux) {
+            for (Map.Entry<ByteArray, byte[]> entry : vals.entrySet()) {
+                if (entry.getValue() == null) {
+                    storage.remove(entry.getKey());
+                } else {
+                    storage.put(entry.getKey(), entry.getValue());
                 }
             }
-        });
+        }
     }
 }

Reply via email to