This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4363510950 IGNITE-21496 Make Vault methods synchronous (#3189) 4363510950 is described below commit 4363510950a71d6641e31aa83de35fc40434b11e Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Fri Feb 9 11:54:43 2024 +0200 IGNITE-21496 Make Vault methods synchronous (#3189) --- .../management/ClusterManagementGroupManager.java | 24 ++-- .../cluster/management/LocalStateStorage.java | 23 ++-- .../internal/cluster/management/MockNode.java | 4 +- ...niteDistributionZoneManagerNodeRestartTest.java | 4 +- .../internal/network/recovery/VaultStaleIds.java | 6 +- .../network/recovery/VaultStaleIdsTest.java | 51 +++----- modules/platforms/cpp/ignite/common/error_codes.h | 0 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 0 .../ItDistributedConfigurationStorageTest.java | 3 +- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 8 +- .../storage/LocalConfigurationStorage.java | 27 +++-- .../ignite/internal/BaseIgniteRestartTest.java | 4 +- .../rebalance/ItRebalanceDistributedTest.java | 6 +- .../internal/table/distributed/LowWatermark.java | 80 ++++++------- .../table/distributed/LowWatermarkTest.java | 13 +- .../table/distributed/TableManagerTest.java | 4 - .../apache/ignite/internal/vault/VaultManager.java | 34 +++--- .../apache/ignite/internal/vault/VaultService.java | 14 +-- .../vault/persistence/PersistentVaultService.java | 133 +++++++-------------- .../ignite/internal/vault/VaultManagerTest.java | 12 +- .../ignite/internal/vault/VaultServiceTest.java | 62 +++++----- .../ItPersistencePropertiesVaultServiceTest.java | 16 +-- .../persistence/ItPersistentVaultServiceTest.java | 2 +- .../vault/inmemory/InMemoryVaultService.java | 80 +++++-------- 25 files changed, 243 insertions(+), 371 deletions(-) diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index 372ce984fd..0ef44d4dfb 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -247,17 +247,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { */ @Nullable private CompletableFuture<CmgRaftService> recoverLocalState() { - LocalState localState; - - try { - localState = localStateStorage.getLocalState().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInternalException("Interrupted while retrieving local CMG state", e); - } catch (ExecutionException e) { - throw new IgniteInternalException("Error while retrieving local CMG state", e); - } + LocalState localState = localStateStorage.getLocalState(); if (localState == null) { return null; @@ -336,8 +326,9 @@ public class ClusterManagementGroupManager implements IgniteComponent { .thenCompose(state -> { var localState = new LocalState(state.cmgNodes(), state.clusterTag()); - return localStateStorage.saveLocalState(localState) - .thenCompose(v -> joinCluster(service, state.clusterTag())); + localStateStorage.saveLocalState(localState); + + return joinCluster(service, state.clusterTag()); }); } @@ -440,7 +431,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { raftManager.stopRaftNodes(CmgGroupId.INSTANCE); - localStateStorage.clear().get(); + localStateStorage.clear(); } catch (Exception e) { throw new IgniteInternalException("Error when cleaning the CMG state", e); } @@ -578,8 +569,9 @@ public class ClusterManagementGroupManager implements IgniteComponent { .thenCompose(service -> { var localState = new LocalState(state.cmgNodes(), state.clusterTag()); - return localStateStorage.saveLocalState(localState) - .thenCompose(v -> joinCluster(service, state.clusterTag())); + localStateStorage.saveLocalState(localState); + + return joinCluster(service, state.clusterTag()); }); } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java index 9a6bcd8438..d708e5e038 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java @@ -19,10 +19,11 @@ package org.apache.ignite.internal.cluster.management; import java.io.Serializable; import java.util.Set; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; +import org.jetbrains.annotations.Nullable; /** * Class that represents a local CMG state (persisted in the Vault). @@ -33,6 +34,8 @@ class LocalStateStorage { private static final ByteArray CMG_STATE_VAULT_KEY = ByteArray.fromString("cmg_state"); static class LocalState implements Serializable { + private static final long serialVersionUID = -5069326157367860480L; + private final Set<String> cmgNodeNames; private final ClusterTag clusterTag; @@ -62,27 +65,25 @@ class LocalStateStorage { * * @return Local state. */ - CompletableFuture<LocalState> getLocalState() { - return vault.get(CMG_STATE_VAULT_KEY) - .thenApply(entry -> entry == null ? null : ByteUtils.fromBytes(entry.value())); + @Nullable LocalState getLocalState() { + VaultEntry entry = vault.get(CMG_STATE_VAULT_KEY); + + return entry == null ? null : ByteUtils.fromBytes(entry.value()); } /** * Saves a given local state. * * @param state Local state to save. - * @return Future that represents the state of the operation. */ - CompletableFuture<Void> saveLocalState(LocalState state) { - return vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state)); + void saveLocalState(LocalState state) { + vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state)); } /** * Removes all data from the local storage. - * - * @return Future that represents the state of the operation. */ - CompletableFuture<Void> clear() { - return vault.remove(CMG_STATE_VAULT_KEY); + void clear() { + vault.remove(CMG_STATE_VAULT_KEY); } } diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java index d4e3c87261..8db7c05a7f 100644 --- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java +++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.cluster.management; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; - import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -105,7 +103,7 @@ public class MockNode { private void init(int port) throws IOException { Path vaultDir = workDir.resolve("vault"); - var vaultManager = new VaultManager(new PersistentVaultService(testNodeName(testInfo, port), Files.createDirectories(vaultDir))); + var vaultManager = new VaultManager(new PersistentVaultService(Files.createDirectories(vaultDir))); this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder); diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java index 8bc669ef5b..76516e2e88 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -161,7 +161,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe List<IgniteComponent> components = new ArrayList<>(); - VaultManager vault = createVault(name, dir); + VaultManager vault = createVault(dir); ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader()); @@ -261,7 +261,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe // Start. vault.start(); - vault.putName(name).join(); + vault.putName(name); nodeCfgMgr.start(); diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java index 2ad20206e4..d02f94b5e7 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; /** - * {@link StaleIds} implementating using Vault as a persistent storage. + * {@link StaleIds} implementation using Vault as a persistent storage. */ public class VaultStaleIds implements StaleIds { private static final ByteArray STALE_IDS_KEY = new ByteArray("network.staleIds"); @@ -64,7 +64,7 @@ public class VaultStaleIds implements StaleIds { } private Set<String> loadStaleIdsFromVault() { - VaultEntry entry = vaultManager.get(STALE_IDS_KEY).join(); + VaultEntry entry = vaultManager.get(STALE_IDS_KEY); if (entry == null) { return new LinkedHashSet<>(); @@ -99,6 +99,6 @@ public class VaultStaleIds implements StaleIds { private void saveIdsToVault() { String joinedIds = String.join("\n", staleIds); - vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)).join(); + vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)); } } diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java index c3c4b9b38c..7154edfeb3 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java @@ -18,19 +18,14 @@ package org.apache.ignite.internal.network.recovery; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.vault.VaultEntry; @@ -38,6 +33,7 @@ import org.apache.ignite.internal.vault.VaultManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -57,8 +53,7 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest { @Test void consultsVaultWhenCheckingForStaleness() { - doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)))) - .when(vaultManager).get(staleIdsKey); + when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8))); assertThat(staleIds.isIdStale("id1"), is(true)); assertThat(staleIds.isIdStale("id2"), is(true)); @@ -68,56 +63,44 @@ class VaultStaleIdsTest extends BaseIgniteAbstractTest { @Test void cachesVaultStateInMemory() { - doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8)))) - .when(vaultManager).get(staleIdsKey); + when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1\nid2\nid3".getBytes(UTF_8))); staleIds.isIdStale("id1"); staleIds.isIdStale("id2"); staleIds.isIdStale("id3"); - verify(vaultManager, times(1)).get(any()); + verify(vaultManager).get(any()); } @Test void savesNewStaleIdsToVault() { - doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey); - doReturn(nullCompletedFuture()) - .when(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8)); - doReturn(nullCompletedFuture()) - .when(vaultManager).put(staleIdsKey, "id2\nid1".getBytes(UTF_8)); - staleIds.markAsStale("id2"); + + verify(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8)); + staleIds.markAsStale("id1"); + + verify(vaultManager).put(staleIdsKey, "id2\nid1".getBytes(UTF_8)); } @Test void respectsMaxIdsLimit() { staleIds = new VaultStaleIds(vaultManager, 2); - doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey); - - AtomicReference<String> lastSavedIds = new AtomicReference<>(); - - doAnswer(invocation -> { - byte[] value = invocation.getArgument(1); - - lastSavedIds.set(new String(value, UTF_8)); - - return nullCompletedFuture(); - }).when(vaultManager).put(eq(staleIdsKey), any()); - staleIds.markAsStale("id3"); staleIds.markAsStale("id2"); staleIds.markAsStale("id1"); - assertThat(lastSavedIds.get(), is("id2\nid1")); + ArgumentCaptor<byte[]> idsCaptor = ArgumentCaptor.forClass(byte[].class); + + verify(vaultManager, times(3)).put(eq(staleIdsKey), idsCaptor.capture()); + + assertThat(idsCaptor.getValue(), is("id2\nid1".getBytes(UTF_8))); } @Test void loadsBeforeDoingFirstSave() { - lenient().doReturn(completedFuture(new VaultEntry(staleIdsKey, "id1".getBytes(UTF_8)))) - .when(vaultManager).get(staleIdsKey); - doReturn(nullCompletedFuture()).when(vaultManager).put(eq(staleIdsKey), any()); + when(vaultManager.get(staleIdsKey)).thenReturn(new VaultEntry(staleIdsKey, "id1".getBytes(UTF_8))); staleIds.markAsStale("id2"); diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h old mode 100755 new mode 100644 diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs old mode 100755 new mode 100644 diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 8e2258f211..d0de55fb56 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.configuration.storage; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -114,7 +113,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes Node(TestInfo testInfo, Path workDir) { var addr = new NetworkAddress("localhost", 10000); - vaultManager = new VaultManager(new PersistentVaultService(testNodeName(testInfo, addr.port()), workDir.resolve("vault"))); + vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault"))); clusterService = ClusterServiceTestUtils.clusterService( testInfo, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 13a685d976..5dfd681e2b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -268,7 +268,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { List<IgniteComponent> components = new ArrayList<>(); - VaultManager vault = createVault(name, dir); + VaultManager vault = createVault(dir); ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader()); @@ -561,7 +561,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { // Start. vault.start(); - vault.putName(name).join(); + vault.putName(name); nodeCfgMgr.start(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 2f76950b2f..8aac39c636 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -367,7 +367,7 @@ public class IgniteImpl implements Ignite { threadPoolsManager = new ThreadPoolsManager(name); - vaultMgr = createVault(name, workDir); + vaultMgr = createVault(workDir); metricManager = new MetricManager(); @@ -879,7 +879,7 @@ public class IgniteImpl implements Ignite { lifecycleManager.startComponent(vaultMgr); - vaultMgr.putName(name).get(); + vaultMgr.putName(name); // Node configuration manager startup. lifecycleManager.startComponent(nodeCfgMgr); @@ -1215,7 +1215,7 @@ public class IgniteImpl implements Ignite { /** * Starts the Vault component. */ - private static VaultManager createVault(String nodeName, Path workDir) { + private static VaultManager createVault(Path workDir) { Path vaultPath = workDir.resolve(VAULT_DB_PATH); try { @@ -1224,7 +1224,7 @@ public class IgniteImpl implements Ignite { throw new IgniteInternalException(e); } - return new VaultManager(new PersistentVaultService(nodeName, vaultPath)); + return new VaultManager(new PersistentVaultService(vaultPath)); } /** diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java index 52c636cd9f..ee4c143dcc 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java @@ -113,11 +113,15 @@ public class LocalConfigurationStorage implements ConfigurationStorage { /** {@inheritDoc} */ @Override public CompletableFuture<Serializable> readLatest(String key) { - return vaultMgr.get(new ByteArray(LOC_PREFIX + key)) - .thenApply(entry -> entry == null ? null : fromBytes(entry.value())) - .exceptionally(e -> { - throw new StorageException("Exception while reading vault entry", e); - }); + return registerFuture(supplyAsync(() -> { + try { + VaultEntry entry = vaultMgr.get(new ByteArray(LOC_PREFIX + key)); + + return entry == null ? null : fromBytes(entry.value()); + } catch (Exception e) { + throw new StorageException("Exception while reading vault entry", e); + } + }, threadPool)); } /** {@inheritDoc} */ @@ -189,9 +193,9 @@ public class LocalConfigurationStorage implements ConfigurationStorage { Data entries = new Data(newValues, version + 1); - return vaultMgr.putAll(data) - .thenCompose(v -> lsnr.onEntriesChanged(entries)) - .thenApply(v -> true); + vaultMgr.putAll(data); + + return lsnr.onEntriesChanged(entries).thenApply(v -> true); }, threadPool)); // ignore any errors on the write future, because we are only interested in its completion @@ -222,8 +226,11 @@ public class LocalConfigurationStorage implements ConfigurationStorage { /** {@inheritDoc} */ @Override public CompletableFuture<Long> lastRevision() { - return vaultMgr.get(VERSION_KEY) - .thenApply(entry -> entry == null ? 0 : (Long) fromBytes(entry.value())); + return registerFuture(supplyAsync(() -> { + VaultEntry entry = vaultMgr.get(VERSION_KEY); + + return entry == null ? 0 : (Long) fromBytes(entry.value()); + }, threadPool)); } @Override diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java index 1afc9df950..a731be183d 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java @@ -180,7 +180,7 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest { /** * Starts the Vault component. */ - public static VaultManager createVault(String nodeName, Path workDir) { + public static VaultManager createVault(Path workDir) { Path vaultPath = workDir.resolve(Paths.get("vault")); try { @@ -189,7 +189,7 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest { throw new IgniteInternalException(e); } - return new VaultManager(new PersistentVaultService(nodeName, vaultPath)); + return new VaultManager(new PersistentVaultService(vaultPath)); } /** diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index dd61c4978f..c5671db508 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -931,7 +931,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { Path dir = workDir.resolve(name); - vaultManager = createVault(name, dir); + vaultManager = createVault(dir); nodeCfgGenerator = new ConfigurationTreeGenerator( List.of( @@ -1287,8 +1287,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { /** * Starts the Vault component. */ - private static VaultManager createVault(String nodeName, Path workDir) { - return new VaultManager(new PersistentVaultService(nodeName, resolveDir(workDir, "vault"))); + private static VaultManager createVault(Path workDir) { + return new VaultManager(new PersistentVaultService(resolveDir(workDir, "vault"))); } private static Path resolveDir(Path workDir, String dirName) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java index 4db46d68b0..c0cf426414 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.table.distributed; import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import java.util.concurrent.Executors; @@ -43,6 +42,7 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; import org.jetbrains.annotations.Nullable; @@ -76,7 +76,7 @@ public class LowWatermark implements ManuallyCloseable { private final AtomicBoolean closeGuard = new AtomicBoolean(); - private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>(); + private volatile HybridTimestamp lowWatermark; private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>(); @@ -119,50 +119,45 @@ public class LowWatermark implements ManuallyCloseable { */ public void start() { inBusyLock(busyLock, () -> { - vaultManager.get(LOW_WATERMARK_VAULT_KEY) - .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> { - if (vaultEntry == null) { - scheduleUpdateLowWatermarkBusy(); + HybridTimestamp lowWatermark = readLowWatermarkFromVault(); - return nullCompletedFuture(); - } + if (lowWatermark == null) { + LOG.info("Previous value of the low watermark was not found, will schedule to update it"); + + scheduleUpdateLowWatermarkBusy(); - HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value()); + return; + } - return txManager.updateLowWatermark(lowWatermark) - .thenApply(unused -> { - this.lowWatermark.set(lowWatermark); + LOG.info( + "Low watermark has been successfully retrieved from the vault and is scheduled to be updated: {}", + lowWatermark + ); - runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark); + txManager.updateLowWatermark(lowWatermark) + .thenRun(() -> inBusyLock(busyLock, () -> { + this.lowWatermark = lowWatermark; - return lowWatermark; - }); + runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark); })) - .whenComplete((lowWatermark, throwable) -> { - if (throwable != null) { - if (!(throwable instanceof NodeStoppingException)) { - LOG.error("Error getting low watermark", throwable); + .whenComplete((unused, throwable) -> { + if (throwable != null && !(throwable instanceof NodeStoppingException)) { + LOG.error("Error during the Watermark manager start", throwable); - failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable)); + failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable)); - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } - } else { - if (lowWatermark == null) { - LOG.info( - "Previous value of the low watermark was not found, will schedule to update it" - ); - } else { - LOG.info( - "Low watermark has been successfully got from the vault and is scheduled to be updated: {}", - lowWatermark - ); - } + inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); } }); }); } + private @Nullable HybridTimestamp readLowWatermarkFromVault() { + VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY); + + return vaultEntry == null ? null : ByteUtils.fromBytes(vaultEntry.value()); + } + @Override public void close() { if (!closeGuard.compareAndSet(false, true)) { @@ -184,7 +179,7 @@ public class LowWatermark implements ManuallyCloseable { * Returns the current low watermark, {@code null} means no low watermark has been assigned yet. */ public @Nullable HybridTimestamp getLowWatermark() { - return lowWatermark.get(); + return lowWatermark; } void updateLowWatermark() { @@ -195,18 +190,13 @@ public class LowWatermark implements ManuallyCloseable { // created, then we can safely promote the candidate as a new low watermark, store it in vault, and we can safely start cleaning // up the stale/junk data in the tables. txManager.updateLowWatermark(lowWatermarkCandidate) - .thenComposeAsync( - unused -> inBusyLock( - busyLock, - () -> vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)) - ), - scheduledThreadPool - ) - .thenRun(() -> inBusyLock(busyLock, () -> { - lowWatermark.set(lowWatermarkCandidate); + .thenRunAsync(() -> inBusyLock(busyLock, () -> { + vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)); + + lowWatermark = lowWatermarkCandidate; runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate); - })) + }), scheduledThreadPool) .whenComplete((unused, throwable) -> { if (throwable != null) { if (!(throwable instanceof NodeStoppingException)) { @@ -250,7 +240,7 @@ public class LowWatermark implements ManuallyCloseable { -lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew() ); - HybridTimestamp lowWatermark = this.lowWatermark.get(); + HybridTimestamp lowWatermark = this.lowWatermark; assert lowWatermark == null || lowWatermarkCandidate.compareTo(lowWatermark) > 0 : "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + lowWatermarkCandidate; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java index 7de224a2c4..42997ac27c 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed; -import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -46,7 +45,6 @@ import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.table.distributed.gc.MvGc; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -84,15 +82,13 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { } @AfterEach - void tearDown() throws Exception { + void tearDown() { lowWatermark.close(); } @Test void testStartWithEmptyVault() { // Let's check the start with no low watermark in vault. - when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture()); - lowWatermark.start(); verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class)); @@ -104,7 +100,7 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { HybridTimestamp lowWatermark = new HybridTimestamp(10, 10); when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)) - .thenReturn(completedFuture(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark)))); + .thenReturn(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark))); when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture()); @@ -134,8 +130,6 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { when(txManager.updateLowWatermark(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture()); - when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture()); - // Make a predictable candidate to make it easier to test. HybridTimestamp newLowWatermarkCandidate = lowWatermark.createNewLowWatermarkCandidate(); @@ -157,11 +151,8 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { */ @Test void testUpdateWatermarkSequentially() throws Exception { - when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(nullCompletedFuture()); assertThat(lowWatermarkConfig.updateFrequency().update(10L), willSucceedFast()); - when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture()); - CountDownLatch startGetAllReadOnlyTransactions = new CountDownLatch(3); CompletableFuture<Void> finishGetAllReadOnlyTransactions = new CompletableFuture<>(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 7161778da0..8e2052afcb 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -77,7 +77,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -704,9 +703,6 @@ public class TableManagerTest extends IgniteAbstractTest { Consumer<TxStateTableStorage> txStateTableStorageDecorator) { VaultManager vaultManager = mock(VaultManager.class); - when(vaultManager.get(any(ByteArray.class))).thenReturn(nullCompletedFuture()); - when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(nullCompletedFuture()); - TableManager tableManager = new TableManager( NODE_NAME, revisionUpdater, diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java index 02e910dc54..d94f6435d8 100644 --- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java +++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java @@ -46,7 +46,6 @@ public class VaultManager implements IgniteComponent { this.vaultSvc = vaultSvc; } - /** {@inheritDoc} */ @Override public CompletableFuture<Void> start() { vaultSvc.start(); @@ -54,7 +53,6 @@ public class VaultManager implements IgniteComponent { return nullCompletedFuture(); } - /** {@inheritDoc} */ @Override public void stop() { // TODO: IGNITE-15161 Implement component's stop. @@ -65,9 +63,9 @@ public class VaultManager implements IgniteComponent { * See {@link VaultService#get}. * * @param key Key. Cannot be {@code null}. - * @return Future that resolves into an entry for the given key, or {@code null} if no such mapping exists. + * @return Entry for the given key, or {@code null} if no such mapping exists. */ - public CompletableFuture<VaultEntry> get(ByteArray key) { + public @Nullable VaultEntry get(ByteArray key) { return vaultSvc.get(key); } @@ -76,20 +74,18 @@ public class VaultManager implements IgniteComponent { * * @param key Vault key. Cannot be {@code null}. * @param val Value. If value is equal to {@code null}, then previous value with key will be deleted if there was any mapping. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) { - return vaultSvc.put(key, val); + public void put(ByteArray key, byte @Nullable [] val) { + vaultSvc.put(key, val); } /** * See {@link VaultService#remove}. * * @param key Vault key. Cannot be {@code null}. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - public CompletableFuture<Void> remove(ByteArray key) { - return vaultSvc.remove(key); + public void remove(ByteArray key) { + vaultSvc.remove(key); } /** @@ -108,30 +104,30 @@ public class VaultManager implements IgniteComponent { * value with key will be deleted if there was any mapping. * * @param vals The map of keys and corresponding values. Cannot be {@code null} or empty. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) { - return vaultSvc.putAll(vals); + public void putAll(Map<ByteArray, byte[]> vals) { + vaultSvc.putAll(vals); } /** * Persist node name to the vault. * * @param name node name to persist. Cannot be null. - * @return Future representing pending completion of the operation. */ - public CompletableFuture<Void> putName(String name) { + public void putName(String name) { if (name.isBlank()) { throw new IllegalArgumentException("Name must not be empty"); } - return put(NODE_NAME, name.getBytes(UTF_8)); + put(NODE_NAME, name.getBytes(UTF_8)); } /** - * Returns {@code CompletableFuture} which, when complete, returns the node name, if was stored earlier, or {@code null} otherwise. + * Returns the node name, if was stored earlier, or {@code null} otherwise. */ - public CompletableFuture<String> name() { - return vaultSvc.get(NODE_NAME).thenApply(name -> name == null ? null : new String(name.value(), UTF_8)); + public @Nullable String name() { + VaultEntry nameEntry = vaultSvc.get(NODE_NAME); + + return nameEntry == null ? null : new String(nameEntry.value(), UTF_8); } } diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java index f1035d672f..277bcb3dc4 100644 --- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java +++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.vault; import java.util.Map; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.util.Cursor; @@ -37,9 +36,9 @@ public interface VaultService extends ManuallyCloseable { * Retrieves an entry for the given key. * * @param key Key. Cannot be {@code null}. - * @return Future that resolves into an entry for the given key, or {@code null} no such mapping exists. + * @return Entry for the given key, or {@code null} no such mapping exists. */ - CompletableFuture<VaultEntry> get(ByteArray key); + @Nullable VaultEntry get(ByteArray key); /** * Writes a given value to the Vault. If the value is {@code null}, then the previous value under the same key (if any) will @@ -48,17 +47,15 @@ public interface VaultService extends ManuallyCloseable { * @param key Vault key. Cannot be {@code null}. * @param val Value. If value is equal to {@code null}, then the previous value under the same key (if any) will * be deleted. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val); + void put(ByteArray key, byte @Nullable [] val); /** * Removes a value from the vault. * * @param key Vault key. Cannot be {@code null}. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - CompletableFuture<Void> remove(ByteArray key); + void remove(ByteArray key); /** * Returns a view of the portion of the vault whose keys range from {@code fromKey}, inclusive, to {@code toKey}, exclusive. @@ -74,9 +71,8 @@ public interface VaultService extends ManuallyCloseable { * then the corresponding key will be deleted. * * @param vals The map of keys and corresponding values. Cannot be {@code null}. - * @return Future representing pending completion of the operation. Cannot be {@code null}. */ - CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals); + void putAll(Map<ByteArray, byte[]> vals); /** * Closes the service. diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java index 9ac8cea2d5..047cb8d7b0 100644 --- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java +++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java @@ -19,21 +19,11 @@ package org.apache.ignite.internal.vault.persistence; import java.nio.file.Path; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import org.apache.ignite.internal.future.InFlightFutures; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter; import org.apache.ignite.internal.rocksdb.RocksUtils; -import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.Cursor; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultService; import org.jetbrains.annotations.Nullable; @@ -58,12 +48,6 @@ public class PersistentVaultService implements VaultService { RocksDB.loadLibrary(); } - private static final IgniteLogger LOG = Loggers.forClass(PersistentVaultService.class); - - private final ExecutorService threadPool; - - private final InFlightFutures futureTracker = new InFlightFutures(); - private final Options options = options(); private volatile RocksDB db; @@ -74,13 +58,10 @@ public class PersistentVaultService implements VaultService { /** * Creates persistent vault service. * - * @param nodeName Node name. * @param path base path for RocksDB */ - public PersistentVaultService(String nodeName, Path path) { + public PersistentVaultService(Path path) { this.path = path; - - threadPool = Executors.newFixedThreadPool(4, NamedThreadFactory.create(nodeName, "vault", LOG)); } private static Options options() { @@ -112,59 +93,44 @@ public class PersistentVaultService implements VaultService { } } - /** {@inheritDoc} */ @Override public void close() { - IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS); - - futureTracker.cancelInFlightFutures(); - - RocksUtils.closeAll(options, db); + RocksUtils.closeAll(db, options); } - /** {@inheritDoc} */ @Override - public CompletableFuture<VaultEntry> get(ByteArray key) { - return supplyAsync(() -> { - try { - byte[] value = db.get(key.bytes()); - - return value == null ? null : new VaultEntry(key, value); - } catch (RocksDBException e) { - throw new IgniteInternalException("Unable to read data from RocksDB", e); - } - }); + public @Nullable VaultEntry get(ByteArray key) { + try { + byte[] value = db.get(key.bytes()); + + return value == null ? null : new VaultEntry(key, value); + } catch (RocksDBException e) { + throw new IgniteInternalException("Unable to read data from RocksDB", e); + } } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) { - return runAsync(() -> { - try { - if (val == null) { - db.delete(key.bytes()); - } else { - db.put(key.bytes(), val); - } - } catch (RocksDBException e) { - throw new IgniteInternalException("Unable to write data to RocksDB", e); + public void put(ByteArray key, byte @Nullable [] val) { + try { + if (val == null) { + db.delete(key.bytes()); + } else { + db.put(key.bytes(), val); } - }); + } catch (RocksDBException e) { + throw new IgniteInternalException("Unable to write data to RocksDB", e); + } } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> remove(ByteArray key) { - return runAsync(() -> { - try { - db.delete(key.bytes()); - } catch (RocksDBException e) { - throw new IgniteInternalException("Unable to remove data to RocksDB", e); - } - }); + public void remove(ByteArray key) { + try { + db.delete(key.bytes()); + } catch (RocksDBException e) { + throw new IgniteInternalException("Unable to remove data to RocksDB", e); + } } - /** {@inheritDoc} */ @Override public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) { var readOpts = new ReadOptions(); @@ -187,47 +153,28 @@ public class PersistentVaultService implements VaultService { public void close() { super.close(); - RocksUtils.closeAll(upperBound, readOpts); + RocksUtils.closeAll(readOpts, upperBound); } }; } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) { - return runAsync(() -> { - try ( - var writeBatch = new WriteBatch(); - var writeOpts = new WriteOptions() - ) { - for (var entry : vals.entrySet()) { - if (entry.getValue() == null) { - writeBatch.delete(entry.getKey().bytes()); - } else { - writeBatch.put(entry.getKey().bytes(), entry.getValue()); - } + public void putAll(Map<ByteArray, byte[]> vals) { + try ( + var writeBatch = new WriteBatch(); + var writeOpts = new WriteOptions() + ) { + for (var entry : vals.entrySet()) { + if (entry.getValue() == null) { + writeBatch.delete(entry.getKey().bytes()); + } else { + writeBatch.put(entry.getKey().bytes(), entry.getValue()); } - - db.write(writeOpts, writeBatch); - } catch (RocksDBException e) { - throw new IgniteInternalException("Unable to write data to RocksDB", e); } - }); - } - private <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) { - CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, threadPool); - - futureTracker.registerFuture(future); - - return future; - } - - private CompletableFuture<Void> runAsync(Runnable runnable) { - CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, threadPool); - - futureTracker.registerFuture(future); - - return future; + db.write(writeOpts, writeBatch); + } catch (RocksDBException e) { + throw new IgniteInternalException("Unable to write data to RocksDB", e); + } } } diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java index b84af179fb..320d599a54 100644 --- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java +++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.vault; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -54,15 +54,15 @@ public class VaultManagerTest { */ @Test void testName() { - assertThat(vaultManager.name(), willBe(nullValue(String.class))); + assertThat(vaultManager.name(), is(nullValue(String.class))); - assertThat(vaultManager.putName("foobar"), willBe(nullValue(Void.class))); + vaultManager.putName("foobar"); - assertThat(vaultManager.name(), willBe(equalTo("foobar"))); + assertThat(vaultManager.name(), is(equalTo("foobar"))); - assertThat(vaultManager.putName("foobarbaz"), willBe(nullValue(Void.class))); + vaultManager.putName("foobarbaz"); - assertThat(vaultManager.name(), willBe(equalTo("foobarbaz"))); + assertThat(vaultManager.name(), is(equalTo("foobarbaz"))); } /** diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java index c863eb928f..3a60f8b53c 100644 --- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java +++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java @@ -19,13 +19,13 @@ package org.apache.ignite.internal.vault; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -76,18 +76,18 @@ public abstract class VaultServiceTest { public void testPut() { ByteArray key = getKey(1); - assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class))); + assertThat(vaultService.get(key), is(nullValue(VaultEntry.class))); byte[] val = getValue(1); - assertThat(vaultService.put(key, val), willBe(nullValue(Void.class))); + vaultService.put(key, val); - assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val)))); + assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val)))); // test idempotency - assertThat(vaultService.put(key, val), willBe(nullValue(Void.class))); + vaultService.put(key, val); - assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val)))); + assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val)))); } /** @@ -99,13 +99,13 @@ public abstract class VaultServiceTest { byte[] val = getValue(1); - assertThat(vaultService.put(key, val), willBe(nullValue(Void.class))); + vaultService.put(key, val); - assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val)))); + assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val)))); - assertThat(vaultService.put(key, null), willBe(nullValue(Void.class))); + vaultService.put(key, null); - assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class))); + assertThat(vaultService.get(key), is(nullValue(VaultEntry.class))); } /** @@ -116,20 +116,20 @@ public abstract class VaultServiceTest { ByteArray key = getKey(1); // Remove non-existent value. - assertThat(vaultService.remove(key), willBe(nullValue(Void.class))); + assertDoesNotThrow(() -> vaultService.remove(key)); - assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class))); + assertThat(vaultService.get(key), is(nullValue(VaultEntry.class))); byte[] val = getValue(1); - assertThat(vaultService.put(key, val), willBe(nullValue(Void.class))); + vaultService.put(key, val); - assertThat(vaultService.get(key), willBe(equalTo(new VaultEntry(key, val)))); + assertThat(vaultService.get(key), is(equalTo(new VaultEntry(key, val)))); // Remove existing value. - assertThat(vaultService.remove(key), willBe(nullValue(Void.class))); + vaultService.remove(key); - assertThat(vaultService.get(key), willBe(nullValue(VaultEntry.class))); + assertThat(vaultService.get(key), is(nullValue(VaultEntry.class))); } /** @@ -141,13 +141,13 @@ public abstract class VaultServiceTest { .boxed() .collect(toMap(VaultServiceTest::getKey, VaultServiceTest::getValue)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); - batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v))))); + batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v))))); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); - batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v))))); + batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v))))); } /** @@ -159,9 +159,9 @@ public abstract class VaultServiceTest { .boxed() .collect(toMap(VaultServiceTest::getKey, VaultServiceTest::getValue)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); - batch.forEach((k, v) -> assertThat(vaultService.get(k), willBe(equalTo(new VaultEntry(k, v))))); + batch.forEach((k, v) -> assertThat(vaultService.get(k), is(equalTo(new VaultEntry(k, v))))); Map<ByteArray, byte[]> secondBatch = new HashMap<>(); @@ -170,12 +170,12 @@ public abstract class VaultServiceTest { secondBatch.put(getKey(1), null); secondBatch.put(getKey(3), null); - assertThat(vaultService.putAll(secondBatch), willBe(nullValue(Void.class))); + vaultService.putAll(secondBatch); - assertThat(vaultService.get(getKey(4)), willBe(equalTo(new VaultEntry(getKey(4), getValue(3))))); - assertThat(vaultService.get(getKey(8)), willBe(equalTo(new VaultEntry(getKey(8), getValue(3))))); - assertThat(vaultService.get(getKey(1)), willBe(nullValue(VaultEntry.class))); - assertThat(vaultService.get(getKey(3)), willBe(nullValue(VaultEntry.class))); + assertThat(vaultService.get(getKey(4)), is(equalTo(new VaultEntry(getKey(4), getValue(3))))); + assertThat(vaultService.get(getKey(8)), is(equalTo(new VaultEntry(getKey(8), getValue(3))))); + assertThat(vaultService.get(getKey(1)), is(nullValue(VaultEntry.class))); + assertThat(vaultService.get(getKey(3)), is(nullValue(VaultEntry.class))); } /** @@ -187,7 +187,7 @@ public abstract class VaultServiceTest { Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); List<VaultEntry> range = range(getKey(3), getKey(7)); @@ -203,7 +203,7 @@ public abstract class VaultServiceTest { Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); List<VaultEntry> range = range(getKey(0), getKey(9)); @@ -219,7 +219,7 @@ public abstract class VaultServiceTest { Map<ByteArray, byte[]> batch = entries.stream().collect(toMap(VaultEntry::key, VaultEntry::value)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); List<VaultEntry> range = range(getKey(3), getKey(4)); @@ -233,7 +233,7 @@ public abstract class VaultServiceTest { public void testRangeInvalidBoundaries() throws Exception { Map<ByteArray, byte[]> batch = getRange(3, 5).stream().collect(toMap(VaultEntry::key, VaultEntry::value)); - assertThat(vaultService.putAll(batch), willBe(nullValue(Void.class))); + vaultService.putAll(batch); List<VaultEntry> range = range(getKey(4), getKey(1)); diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java index 134514361e..291a56bf1a 100644 --- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java +++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistencePropertiesVaultServiceTest.java @@ -18,11 +18,9 @@ package org.apache.ignite.internal.vault.persistence; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -48,39 +46,37 @@ class ItPersistencePropertiesVaultServiceTest { * Tests that the Vault Service correctly persists data after multiple service restarts. */ @Test - void testPersistentRestart() throws Exception { + void testPersistentRestart() { var data = Map.of( new ByteArray("key" + 1), fromString("value" + 1), new ByteArray("key" + 2), fromString("value" + 2), new ByteArray("key" + 3), fromString("value" + 3) ); - String nodeName = "test"; - - var service = new PersistentVaultService(nodeName, vaultDir); + var service = new PersistentVaultService(vaultDir); try { service.start(); - assertThat(service.putAll(data), willBe(nullValue(Void.class))); + service.putAll(data); } finally { service.close(); } - service = new PersistentVaultService(nodeName, vaultDir); + service = new PersistentVaultService(vaultDir); try { service.start(); assertThat( service.get(new ByteArray("key" + 1)), - willBe(equalTo(new VaultEntry(new ByteArray("key" + 1), fromString("value" + 1)))) + is(equalTo(new VaultEntry(new ByteArray("key" + 1), fromString("value" + 1)))) ); } finally { service.close(); } - service = new PersistentVaultService(nodeName, vaultDir); + service = new PersistentVaultService(vaultDir); try { service.start(); diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java index 521cd3ddc4..1a8a077e91 100644 --- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java +++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/persistence/ItPersistentVaultServiceTest.java @@ -35,6 +35,6 @@ class ItPersistentVaultServiceTest extends VaultServiceTest { /** {@inheritDoc} */ @Override protected VaultService getVaultService() { - return new PersistentVaultService("test", vaultDir); + return new PersistentVaultService(vaultDir); } } diff --git a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java index 6dc5916ff5..87188e0afd 100644 --- a/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java +++ b/modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java @@ -17,17 +17,15 @@ package org.apache.ignite.internal.vault.inmemory; -import static java.util.concurrent.CompletableFuture.runAsync; -import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; -import java.util.Collections; -import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.CursorUtils; import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultService; import org.jetbrains.annotations.Nullable; @@ -47,75 +45,57 @@ public class InMemoryVaultService implements VaultService { // No-op. } - /** {@inheritDoc} */ @Override public void close() { // No-op. } - /** {@inheritDoc} */ @Override - public CompletableFuture<VaultEntry> get(ByteArray key) { - return supplyAsync(() -> { - synchronized (mux) { - byte[] value = storage.get(key); + public @Nullable VaultEntry get(ByteArray key) { + synchronized (mux) { + byte[] value = storage.get(key); - return value == null ? null : new VaultEntry(key, storage.get(key)); - } - }); + return value == null ? null : new VaultEntry(key, value); + } } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) { - return runAsync(() -> { - synchronized (mux) { - storage.put(key, val); - } - }); + public void put(ByteArray key, byte @Nullable [] val) { + synchronized (mux) { + storage.put(key, val); + } } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> remove(ByteArray key) { - return runAsync(() -> { - synchronized (mux) { - storage.remove(key); - } - }); + public void remove(ByteArray key) { + synchronized (mux) { + storage.remove(key); + } } - /** {@inheritDoc} */ @Override public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) { - Iterator<VaultEntry> it; - if (fromKey.compareTo(toKey) >= 0) { - it = Collections.emptyIterator(); - } else { - synchronized (mux) { - it = storage.subMap(fromKey, toKey).entrySet().stream() - .map(e -> new VaultEntry(new ByteArray(e.getKey()), e.getValue())) - .iterator(); - } + return CursorUtils.emptyCursor(); } - return Cursor.fromBareIterator(it); + synchronized (mux) { + return storage.subMap(fromKey, toKey).entrySet().stream() + .map(e -> new VaultEntry(new ByteArray(e.getKey()), e.getValue())) + .collect(collectingAndThen(toList(), Cursor::fromIterable)); + } } - /** {@inheritDoc} */ @Override - public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) { - return runAsync(() -> { - synchronized (mux) { - for (var entry : vals.entrySet()) { - if (entry.getValue() == null) { - storage.remove(entry.getKey()); - } else { - storage.put(entry.getKey(), entry.getValue()); - } + public void putAll(Map<ByteArray, byte[]> vals) { + synchronized (mux) { + for (Map.Entry<ByteArray, byte[]> entry : vals.entrySet()) { + if (entry.getValue() == null) { + storage.remove(entry.getKey()); + } else { + storage.put(entry.getKey(), entry.getValue()); } } - }); + } } }