This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f5f2c65827 IGNITE-19606 Linearize metaStorageManager.deployWatches and metaStorageManager.start() (#2183) f5f2c65827 is described below commit f5f2c658273efaf98b5df156690f3b1afe10876d Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Thu Jun 15 18:01:15 2023 +0400 IGNITE-19606 Linearize metaStorageManager.deployWatches and metaStorageManager.start() (#2183) --- .../internal/catalog/CatalogServiceSelfTest.java | 7 +- .../catalog/storage/UpdateLogImplTest.java | 5 +- .../metastore/DeploymentUnitStoreImplTest.java | 6 +- .../DistributionZonesTestUtil.java | 11 +- .../internal/metastorage/MetaStorageManager.java | 4 +- .../impl/ItMetaStorageManagerImplTest.java | 6 +- .../ItMetaStorageMultipleNodesAbstractTest.java | 28 ++++- .../metastorage/impl/ItMetaStorageWatchTest.java | 11 +- .../metastorage/impl/MetaStorageManagerImpl.java | 11 +- .../MetaStorageDeployWatchesCorrectnessTest.java | 122 +++++++++++++++++++++ .../MultiActorPlacementDriverTest.java | 7 +- .../PlacementDriverManagerTest.java | 6 +- .../placementdriver/PlacementDriverTest.java | 5 +- .../ItDistributedConfigurationPropertiesTest.java | 23 ++-- .../ItDistributedConfigurationStorageTest.java | 17 ++- .../storage/ItRebalanceDistributedTest.java | 19 +++- .../runner/app/ItIgniteNodeRestartTest.java | 21 +--- .../org/apache/ignite/internal/app/IgniteImpl.java | 8 +- 18 files changed, 241 insertions(+), 76 deletions(-) diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java index 3c6affd572..d4f21d0222 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java @@ -100,7 +100,6 @@ import org.apache.ignite.lang.DistributionZoneNotFoundException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IndexAlreadyExistsException; import org.apache.ignite.lang.IndexNotFoundException; -import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.sql.ColumnType; @@ -138,7 +137,7 @@ public class CatalogServiceSelfTest { private HybridClock clock; @BeforeEach - void setUp() throws NodeStoppingException { + void setUp() { vault = new VaultManager(new InMemoryVaultService()); metastore = StandaloneMetaStorageManager.create( @@ -152,7 +151,7 @@ public class CatalogServiceSelfTest { metastore.start(); service.start(); - metastore.deployWatches(); + assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully()); } @AfterEach @@ -1128,7 +1127,7 @@ public class CatalogServiceSelfTest { metaStorageManager.start(); service.start(); - metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); try { CreateTableParams params = CreateTableParams.builder() diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java index e41ba4eb53..dd9bf36ee5 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.catalog.storage; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -79,7 +80,7 @@ class UpdateLogImplTest { updateLog.registerUpdateHandler(update -> {/* no-op */}); updateLog.start(); - metastore.deployWatches(); + assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully()); List<VersionedUpdate> expectedLog = List.of( new VersionedUpdate(1, 1L, List.of(new TestUpdateEntry("foo"))), @@ -138,7 +139,7 @@ class UpdateLogImplTest { long revisionBefore = metastore.appliedRevision(); - metastore.deployWatches(); + assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully()); // first update should always be successful assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion)))); diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java index 5474c10613..2cc191f3d9 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.OB import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.REMOVING; import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -46,7 +47,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; -import org.apache.ignite.lang.NodeStoppingException; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,7 +76,7 @@ public class DeploymentUnitStoreImplTest { private Path workDir; @BeforeEach - public void setup() throws NodeStoppingException { + public void setup() { history.clear(); KeyValueStorage storage = new RocksDbKeyValueStorage("test", workDir); @@ -87,7 +87,7 @@ public class DeploymentUnitStoreImplTest { vaultManager.start(); metaStorageManager.start(); - metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); } @Test diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java index 132795d09f..16997064c5 100644 --- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java +++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.schema.configuration.storage.DataStorageChange import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.lang.NodeStoppingException; import org.jetbrains.annotations.Nullable; @@ -306,24 +305,22 @@ public class DistributionZonesTestUtil { * TODO: IGNITE-19403 Watch listeners must be deployed after the zone manager starts. * * @param metaStorageManager Meta storage manager. - * @throws NodeStoppingException If node is stopping. * @throws InterruptedException If thread was interrupted. */ - public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager) - throws NodeStoppingException, InterruptedException { + public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager) throws InterruptedException { // Watches are deployed before distributionZoneManager start in order to update Meta Storage revision before // distributionZoneManager's recovery. - metaStorageManager.deployWatches(); + CompletableFuture<Void> deployWatchesFut = metaStorageManager.deployWatches(); // Bump Meta Storage applied revision by modifying a fake key. DistributionZoneManager breaks on start if Vault is not empty, but // Meta Storage revision is equal to 0. var fakeKey = new ByteArray("foobar"); - CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke( + CompletableFuture<Boolean> invokeFuture = deployWatchesFut.thenCompose(unused -> metaStorageManager.invoke( Conditions.notExists(fakeKey), Operations.put(fakeKey, fakeKey.bytes()), Operations.noop() - ); + )); assertThat(invokeFuture, willBe(true)); diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index efca5855ed..5d1d411f70 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -170,8 +170,10 @@ public interface MetaStorageManager extends IgniteComponent { * Starts all registered watches. * * <p>Should be called after all Ignite components have registered required watches and they are ready to process Meta Storage events. + * + * @return Future which completes when Meta storage manager is started and deploying watches is finished. */ - void deployWatches() throws NodeStoppingException; + CompletableFuture<Void> deployWatches(); /** * Returns cluster time with a hybrid clock instance and access to safe time. diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index 7b1ef8573d..04dec2a164 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -24,6 +24,7 @@ import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscr import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -61,7 +62,6 @@ import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; @@ -87,7 +87,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { private MetaStorageManagerImpl metaStorageManager; @BeforeEach - void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) throws NodeStoppingException { + void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) { var addr = new NetworkAddress("localhost", 10_000); clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr))); @@ -118,7 +118,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { raftManager.start(); metaStorageManager.start(); - metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); } @AfterEach diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 53ef8f6d2a..9103686e64 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -115,6 +115,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr private final MetaStorageManagerImpl metaStorageManager; + /** The future have to be complete after the node start and all Meta storage watches are deployd. */ + private final CompletableFuture<Void> deployWatchesFut; + Node(ClusterService clusterService, Path dataPath) { this.clusterService = clusterService; @@ -151,6 +154,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr createStorage(name(), basePath), clock ); + + deployWatchesFut = metaStorageManager.deployWatches(); } void start() throws NodeStoppingException { @@ -164,8 +169,13 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr ); components.forEach(IgniteComponent::start); + } - metaStorageManager.deployWatches(); + /** + * Waits for watches deployed. + */ + void waitWatches() { + assertThat("Watches were not deployed", deployWatchesFut, willCompleteSuccessfully()); } String name() { @@ -228,6 +238,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + firstNode.waitWatches(); + var key = new ByteArray("foo"); byte[] value = "bar".getBytes(StandardCharsets.UTF_8); @@ -237,6 +249,8 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr Node secondNode = startNode(testInfo); + secondNode.waitWatches(); + // Check that reading remote data works correctly. assertThat(secondNode.metaStorageManager.get(key).thenApply(Entry::value), willBe(value)); @@ -288,6 +302,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + firstNode.waitWatches(); + secondNode.waitWatches(); + // Try reading some data to make sure that Raft has been configured correctly. assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue())); @@ -314,6 +331,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + firstNode.waitWatches(); + secondNode.waitWatches(); + CompletableFuture<Set<String>> logicalTopologyNodes = firstNode.cmgManager .logicalTopology() .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet())); @@ -357,6 +377,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + firstNode.waitWatches(); + secondNode.waitWatches(); + CompletableFuture<Void> watchCompletedFuture = new CompletableFuture<>(); CountDownLatch watchCalledLatch = new CountDownLatch(1); @@ -438,6 +461,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + firstNode.waitWatches(); + secondNode.waitWatches(); + assertThat( firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new byte[]{0, 1, 2, 3}), willCompleteSuccessfully() diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index 1ea7959294..eb7701110f 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -273,7 +274,7 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { for (Node node : nodes) { registerWatchAction.accept(node, latch); - node.metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", node.metaStorageManager.deployWatches(), willCompleteSuccessfully()); } var key = new ByteArray("foo"); @@ -364,13 +365,7 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { assertThat(invokeFuture, willBe(true)); - nodes.forEach(node -> { - try { - node.metaStorageManager.deployWatches(); - } catch (NodeStoppingException e) { - throw new RuntimeException(e); - } - }); + nodes.forEach(node -> assertThat("Watches were not deployed", node.metaStorageManager.deployWatches(), willCompleteSuccessfully())); assertTrue(exactLatch.await(10, TimeUnit.SECONDS)); assertTrue(prefixLatch.await(10, TimeUnit.SECONDS)); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 143b30acab..082249a8fa 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; import static org.apache.ignite.internal.util.ByteUtils.longToBytes; import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR; import java.util.Collection; @@ -277,14 +278,16 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } @Override - public void deployWatches() throws NodeStoppingException { + public CompletableFuture<Void> deployWatches() { if (!busyLock.enterBusy()) { - throw new NodeStoppingException(); + return CompletableFuture.failedFuture(new NodeStoppingException()); } try { - // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. - storage.startWatches(this::onRevisionApplied); + return metaStorageSvcFut.thenRun(() -> inBusyLock(busyLock, () -> { + // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. + storage.startWatches(this::onRevisionApplied); + })); } finally { busyLock.leaveBusy(); } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java new file mode 100644 index 0000000000..9f68f87de2 --- /dev/null +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.stream.Stream; +import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; +import org.apache.ignite.network.ClusterService; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests that check correctness of an invocation {@link MetaStorageManager#deployWatches()}. + */ +public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest { + /** Vault manager. */ + private static VaultManager vaultManager; + + @BeforeAll + public static void init() { + vaultManager = new VaultManager(new InMemoryVaultService()); + + vaultManager.start(); + } + + @AfterAll + public static void deInit() { + vaultManager.beforeNodeStop(); + + vaultManager.stop(); + } + + /** + * Returns a stream with test arguments. + * + * @return Stream of different types of Meta storages to to check. + * @throws Exception If failed. + */ + private static Stream<MetaStorageManager> metaStorageProvider() throws Exception { + HybridClock clock = new HybridClockImpl(); + String mcNodeName = "mc-node-1"; + + ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); + ClusterService clusterService = mock(ClusterService.class); + LogicalTopologyService logicalTopologyService = mock(LogicalTopologyService.class); + RaftManager raftManager = mock(RaftManager.class); + + when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(mcNodeName))); + when(clusterService.nodeName()).thenReturn(mcNodeName); + when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any())).thenReturn(completedFuture(mock( + RaftGroupService.class))); + + return Stream.of( + new MetaStorageManagerImpl( + vaultManager, + clusterService, + cmgManager, + logicalTopologyService, + raftManager, + new SimpleInMemoryKeyValueStorage(mcNodeName), + clock + ), + StandaloneMetaStorageManager.create(vaultManager) + ); + } + + /** + * Invokes {@link MetaStorageManager#deployWatches()} and checks result. + * + * @param metastore Meta storage. + */ + @ParameterizedTest + @MethodSource("metaStorageProvider") + public void testCheckCorrectness(MetaStorageManager metastore) throws Exception { + var deployWatchesFut = metastore.deployWatches(); + + assertFalse(deployWatchesFut.isDone()); + + metastore.start(); + + assertThat(deployWatchesFut, willCompleteSuccessfully()); + + metastore.beforeNodeStop(); + + metastore.stop(); + } +} diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index 006cfcd436..6e6f7145aa 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -23,8 +23,10 @@ import static org.apache.ignite.internal.placementdriver.PlacementDriverManager. import static org.apache.ignite.internal.placementdriver.leases.Lease.fromBytes; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey; import static org.apache.ignite.lang.ByteArray.fromString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -230,12 +232,11 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { * @param services Cluster services. * @param logicalTopManagers The list to update in the method. The list might be used for driving of the logical topology. * @return List of closures to stop the services. - * @throws Exception If something goes wrong. */ public List<Closeable> startPlacementDriver( Map<String, ClusterService> services, List<LogicalTopologyServiceTestImpl> logicalTopManagers - ) throws Exception { + ) { var res = new ArrayList<Closeable>(placementDriverNodeNames.size()); for (String nodeName : placementDriverNodeNames) { @@ -305,7 +306,7 @@ public class MultiActorPlacementDriverTest extends IgniteAbstractTest { metaStorageManager.start(); placementDriverManager.start(); - metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); res.add(() -> { try { diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index 6cceabe662..592a2e67b9 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -25,9 +25,11 @@ import static org.apache.ignite.internal.placementdriver.PlacementDriverManager. import static org.apache.ignite.internal.placementdriver.leases.Lease.fromBytes; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX; import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey; import static org.apache.ignite.lang.ByteArray.fromString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -149,7 +151,7 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest { startPlacementDriverManager(); } - private void startPlacementDriverManager() throws NodeStoppingException { + private void startPlacementDriverManager() { vaultManager = new VaultManager(new PersistentVaultService(testNodeName(testInfo, PORT), workDir.resolve("vault"))); var nodeFinder = new StaticNodeFinder(Collections.singletonList(new NetworkAddress("localhost", PORT))); @@ -221,7 +223,7 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest { metaStorageManager.start(); placementDriverManager.start(); - metaStorageManager.deployWatches(); + assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); } /** diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java index 50497b06d2..a6453191fd 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -84,7 +85,7 @@ public class PlacementDriverTest { private LeaseTracker placementDriver; @BeforeEach - void setUp() throws Exception { + void setUp() { vault = new VaultManager(new InMemoryVaultService()); metastore = StandaloneMetaStorageManager.create(vault); @@ -98,7 +99,7 @@ public class PlacementDriverTest { metastore.start(); placementDriver.startTrack(); - metastore.deployWatches(); + assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully()); } @AfterEach diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index d9d10ced6f..54881775cb 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.configuration; import static java.util.stream.Collectors.toUnmodifiableList; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -60,7 +61,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; -import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; @@ -114,6 +114,9 @@ public class ItDistributedConfigurationPropertiesTest { private final ConfigurationManager distributedCfgManager; + /** The future have to be complete after the node start and all Meta storage watches are deployd. */ + private final CompletableFuture<Void> deployWatchesFut; + /** Flag that disables storage updates. */ private volatile boolean receivesUpdates = true; @@ -161,6 +164,8 @@ public class ItDistributedConfigurationPropertiesTest { clock ); + deployWatchesFut = metaStorageManager.deployWatches(); + // create a custom storage implementation that is able to "lose" some storage updates var distributedCfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager) { /** {@inheritDoc} */ @@ -206,16 +211,16 @@ public class ItDistributedConfigurationPropertiesTest { Stream.of(clusterService, raftManager, cmgManager, metaStorageManager) .forEach(IgniteComponent::start); - // deploy watches to propagate data from the metastore into the vault - try { - metaStorageManager.deployWatches(); - } catch (NodeStoppingException e) { - throw new RuntimeException(e); - } - distributedCfgManager.start(); } + /** + * Waits for watches deployed. + */ + void waitWatches() { + assertThat("Watches were not deployed", deployWatchesFut, willCompleteSuccessfully()); + } + /** * Stops the created components. */ @@ -286,6 +291,8 @@ public class ItDistributedConfigurationPropertiesTest { Stream.of(firstNode, secondNode).parallel().forEach(Node::start); firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(), "cluster"); + + Stream.of(firstNode, secondNode).parallel().forEach(Node::waitWatches); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index ffa58c6682..89dfa81caa 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -92,6 +92,9 @@ public class ItDistributedConfigurationStorageTest { private final DistributedConfigurationStorage cfgStorage; + /** The future have to be complete after the node start and all Meta storage watches are deployd. */ + private final CompletableFuture<Void> deployWatchesFut; + /** * Constructor that simply creates a subset of components of this node. */ @@ -133,6 +136,8 @@ public class ItDistributedConfigurationStorageTest { clock ); + deployWatchesFut = metaStorageManager.deployWatches(); + cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); } @@ -157,9 +162,13 @@ public class ItDistributedConfigurationStorageTest { return completedFuture(null); } }); + } - // deploy watches to propagate data from the metastore into the vault - metaStorageManager.deployWatches(); + /** + * Waits for watches deployed. + */ + void waitWatches() { + assertThat("Watches were not deployed", deployWatchesFut, willCompleteSuccessfully()); } /** @@ -200,6 +209,8 @@ public class ItDistributedConfigurationStorageTest { node.cmgManager.initCluster(List.of(node.name()), List.of(), "cluster"); + node.waitWatches(); + assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true))); assertThat(node.cfgStorage.writeConfigurationRevision(0, 1), willCompleteSuccessfully()); @@ -216,6 +227,8 @@ public class ItDistributedConfigurationStorageTest { try { node2.start(); + node2.waitWatches(); + CompletableFuture<Data> storageData = node2.cfgStorage.readDataOnRecovery(); assertThat(storageData.thenApply(Data::values), willBe(equalTo(data))); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index a7d41e2b00..1594f8f319 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -237,6 +237,8 @@ public class ItRebalanceDistributedTest { allOf(nodes.get(0).cmgManager.onJoinReady(), nodes.get(1).cmgManager.onJoinReady(), nodes.get(2).cmgManager.onJoinReady()), willCompleteSuccessfully() ); + + nodes.stream().forEach(Node::waitWatches); } @AfterEach @@ -514,6 +516,8 @@ public class ItRebalanceDistributedTest { newNode.start(); + newNode.waitWatches(); + nodes.set(evictedNodeIndex, newNode); // Let's make sure that we will destroy the partition again. @@ -600,6 +604,9 @@ public class ItRebalanceDistributedTest { private final NetworkAddress networkAddress; + /** The future have to be complete after the node start and all Meta storage watches are deployd. */ + private CompletableFuture<Void> deployWatchesFut; + /** * Constructor that simply creates a subset of components of this node. */ @@ -828,7 +835,7 @@ public class ItRebalanceDistributedTest { /** * Starts the created components. */ - void start() throws Exception { + void start() { nodeComponents = List.of( vaultManager, nodeCfgMgr, @@ -857,8 +864,14 @@ public class ItRebalanceDistributedTest { willSucceedIn(1, TimeUnit.MINUTES) ); - // deploy watches to propagate data from the metastore into the vault - metaStorageManager.deployWatches(); + deployWatchesFut = metaStorageManager.deployWatches(); + } + + /** + * Waits for watches deployed. + */ + void waitWatches() { + assertThat("Watches were not deployed", deployWatchesFut, willCompleteSuccessfully()); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 20261f8cee..76d9c698dc 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -45,7 +45,6 @@ import java.util.Objects; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.IntFunction; @@ -124,7 +123,6 @@ import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteStringFormatter; import org.apache.ignite.lang.IgniteSystemProperties; -import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.NettyBootstrapFactory; import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; @@ -494,24 +492,15 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { fut -> new TestConfigurationCatchUpListener(cfgStorage, fut, revisionCallback0) ); - CompletableFuture<?> notificationFuture = CompletableFuture.allOf( + CompletableFuture<?> startFuture = CompletableFuture.allOf( nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners() + ).thenCompose(unused -> + // Deploy all registered watches because all components are ready and have registered their listeners. + CompletableFuture.allOf(metaStorageMgr.deployWatches(), configurationCatchUpFuture) ); - CompletableFuture<?> startFuture = notificationFuture - .thenCompose(v -> { - // Deploy all registered watches because all components are ready and have registered their listeners. - try { - metaStorageMgr.deployWatches(); - } catch (NodeStoppingException e) { - throw new CompletionException(e); - } - - return configurationCatchUpFuture; - }); - - assertThat(startFuture, willCompleteSuccessfully()); + assertThat("Partial node was not started", startFuture, willCompleteSuccessfully()); log.info("Completed recovery on partially started node, last revision applied: " + lastRevision.get() + ", acceptableDifference: " + IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 100) diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index dfa0fd35a1..91c329dcac 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -729,13 +729,7 @@ public class IgniteImpl implements Ignite { return notifyConfigurationListeners() .thenComposeAsync(t -> { // Deploy all registered watches because all components are ready and have registered their listeners. - try { - metaStorageMgr.deployWatches(); - } catch (NodeStoppingException e) { - throw new CompletionException(e); - } - - return recoveryFuture; + return metaStorageMgr.deployWatches().thenCompose(unused -> recoveryFuture); }, startupExecutor); }, startupExecutor) .thenRunAsync(() -> {