This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4d4c9e8748 IGNITE-19778 Restore components state on metastorage recovery (#2266) 4d4c9e8748 is described below commit 4d4c9e8748bd16e079757cc4f4beac6b8977e3c0 Author: Semyon Danilov <samvi...@yandex.ru> AuthorDate: Wed Jul 12 12:03:53 2023 +0400 IGNITE-19778 Restore components state on metastorage recovery (#2266) --- .../internal/catalog/storage/UpdateLogImpl.java | 2 +- .../configuration/ConfigurationChanger.java | 5 - .../storage/ConfigurationStorage.java | 10 - .../storage/TestConfigurationStorage.java | 5 - .../java/org/apache/ignite/lang/ByteArray.java | 7 + .../distributionzones/DistributionZoneManager.java | 2 +- .../BaseDistributionZoneManagerTest.java | 2 +- .../internal/metastorage/MetaStorageManager.java | 23 +- .../impl/ItMetaStorageManagerImplTest.java | 71 ------ ...MetaStorageSafeTimePropagationAbstractTest.java | 2 +- .../metastorage/impl/ItMetaStorageWatchTest.java | 6 +- .../metastorage/impl/MetaStorageManagerImpl.java | 40 ++-- .../server/persistence/RocksDbKeyValueStorage.java | 6 +- .../server/BasicOperationsKeyValueStorageTest.java | 6 +- .../server/SimpleInMemoryKeyValueStorage.java | 6 +- .../ignite/internal/BaseIgniteRestartTest.java | 66 +----- .../ItDistributedConfigurationPropertiesTest.java | 4 +- .../ItDistributedConfigurationStorageTest.java | 4 +- .../storage/ItRebalanceDistributedTest.java | 68 +++--- .../zones/ItDistributionZonesFilterTest.java | 7 + ...niteDistributionZoneManagerNodeRestartTest.java | 5 +- .../runner/app/ItIgniteNodeRestartTest.java | 9 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 37 +--- .../storage/DistributedConfigurationStorage.java | 134 +++++------ .../storage/LocalConfigurationStorage.java | 5 - .../storage/LocalFileConfigurationStorage.java | 5 - .../recovery/ConfigurationCatchUpListener.java | 117 ---------- .../recovery/RecoveryCompletionFutureFactory.java | 51 ----- .../DistributedConfigurationCatchUpTest.java | 244 --------------------- .../DistributedConfigurationStorageTest.java | 8 +- .../internal/table/distributed/TableManager.java | 12 +- 31 files changed, 189 insertions(+), 780 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java index f627c51118..1b7994fabc 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java @@ -157,7 +157,7 @@ public class UpdateLogImpl implements UpdateLog { // TODO: IGNITE-19790 Read range from metastore while (true) { ByteArray key = CatalogKey.update(ver++); - Entry entry = metastore.getLocally(key.bytes(), appliedRevision); + Entry entry = metastore.getLocally(key, appliedRevision); if (entry.empty() || entry.tombstone()) { break; diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java index 5a01225087..6b115f5995 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java @@ -644,11 +644,6 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange rwLock.writeLock().unlock(); } - // Save revisions for recovery. - // We execute synchronously to avoid a race between notifications about updating the Meta Storage and updating the revision - // of the Meta Storage. - storage.writeConfigurationRevision(oldStorageRoots.version, newStorageRoots.version); - long notificationNumber = notificationListenerCnt.incrementAndGet(); CompletableFuture<Void> notificationFuture; diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java index 7315dfd51b..d9663eb284 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java @@ -79,16 +79,6 @@ public interface ConfigurationStorage extends ManuallyCloseable { */ CompletableFuture<Long> lastRevision(); - /** - * Writes previous and current configuration's MetaStorage revision for recovery. - * We need previous and current for the fail-safety: in case if node fails before changing master key on configuration update, - * MetaStorage's applied revision will be lower than {@code currentRevision} and we will be using previous revision. - * - * @param prevRevision Previous revision. - * @param currentRevision Current revision. - */ - void writeConfigurationRevision(long prevRevision, long currentRevision); - /** * Closes the storage. */ diff --git a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java index 94193714e4..4231915b31 100644 --- a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java +++ b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java @@ -170,11 +170,6 @@ public class TestConfigurationStorage implements ConfigurationStorage { return CompletableFuture.completedFuture(version); } - @Override - public void writeConfigurationRevision(long prevRevision, long currentRevision) { - // No-op. - } - /** * Increase the current revision of the storage. * diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java index 76129b6d55..93b1e14a99 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java @@ -77,6 +77,13 @@ public final class ByteArray implements Comparable<ByteArray> { return arr; } + /** + * Returns the length of this byte array. + */ + public int length() { + return arr.length; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 03ce9cbaf0..d61f044560 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -828,7 +828,7 @@ public class DistributionZoneManager implements IgniteComponent { ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap); - VaultEntry dataNodes = vaultMgr.get(zoneDataNodesKey(zoneId)).join(); + Entry dataNodes = metaStorageManager.getLocally(zoneDataNodesKey(zoneId), revision); if (dataNodes != null) { String filter = zone.filter(); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java index a8df523fba..9c69f09930 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java @@ -93,7 +93,7 @@ public class BaseDistributionZoneManagerTest extends BaseIgniteAbstractTest { components.add(metaStorageManager); - ConfigurationStorage cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultMgr); + ConfigurationStorage cfgStorage = new DistributedConfigurationStorage(metaStorageManager); generator = new ConfigurationTreeGenerator( List.of(DistributionZonesConfiguration.KEY), diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 0461d73924..fb37f7f1ab 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.metastorage.dsl.StatementResult; import org.apache.ignite.internal.metastorage.exceptions.CompactedException; import org.apache.ignite.internal.metastorage.exceptions.OperationTimeoutException; import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.NodeStoppingException; import org.jetbrains.annotations.Nullable; @@ -66,6 +67,9 @@ public interface MetaStorageManager extends IgniteComponent { * Returns all entries corresponding to the given key and bounded by given revisions. * All these entries are ordered by revisions and have the same key. * The lower bound and the upper bound are inclusive. + * + * <p>This method doesn't wait for the storage's revision to become greater or equal to the revUpperBound parameter, so it is + * up to user to wait for the appropriate time to call this method. * TODO: IGNITE-19735 move this method to another interface for interaction with local KeyValueStorage. * * @param key The key. @@ -80,11 +84,28 @@ public interface MetaStorageManager extends IgniteComponent { * Returns an entry by the given key and bounded by the given revision. The entry is obtained * from the local storage. * + * <p>This method doesn't wait for the storage's revision to become greater or equal to the revUpperBound parameter, so it is + * up to user to wait for the appropriate time to call this method. + * * @param key The key. * @param revUpperBound The upper bound of revision. * @return Value corresponding to the given key. */ - Entry getLocally(byte[] key, long revUpperBound); + Entry getLocally(ByteArray key, long revUpperBound); + + /** + * Returns cursor by entries which correspond to the given keys range and bounded by revision number. The entries in the cursor + * are obtained from the local storage. + * + * <p>This method doesn't wait for the storage's revision to become greater or equal to the revUpperBound parameter, so it is + * up to user to wait for the appropriate time to call this method. + * + * @param startKey Start key of range (inclusive). + * @param endKey Last key of range (exclusive). + * @param revUpperBound Upper bound of revision. + * @return Cursor by entries which correspond to the given keys range. + */ + Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long revUpperBound); /** * Looks up a timestamp by a revision. This should only be invoked if it is guaranteed that the diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index ff2a75c687..d2dbc13721 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; @@ -31,8 +30,6 @@ import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -65,7 +62,6 @@ import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.lang.ByteArray; @@ -194,73 +190,6 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { assertThat(actualKeysFuture, will(contains(key1.bytes(), key2.bytes(), key3.bytes()))); } - /** - * Tests that "watched" Meta Storage keys get persisted in the Vault. - */ - @Test - void testWatchEventsPersistence() throws InterruptedException { - byte[] value = "value".getBytes(UTF_8); - - var key1 = new ByteArray("foo"); - var key2 = new ByteArray("bar"); - - CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke( - Conditions.notExists(new ByteArray("foo")), - List.of( - Operations.put(key1, value), - Operations.put(key2, value) - ), - List.of(Operations.noop()) - ); - - assertThat(invokeFuture, willBe(true)); - - // No data should be persisted until any watches are registered. - assertThat(vaultManager.get(key1), willBe(nullValue())); - assertThat(vaultManager.get(key2), willBe(nullValue())); - - metaStorageManager.registerExactWatch(key1, new NoOpListener()); - - invokeFuture = metaStorageManager.invoke( - Conditions.exists(new ByteArray("foo")), - List.of( - Operations.put(key1, value), - Operations.put(key2, value) - ), - List.of(Operations.noop()) - ); - - assertThat(invokeFuture, willBe(true)); - - assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() == 2, 10_000)); - - // Expect that only the watched key is persisted. - assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), willBe(value)); - assertThat(vaultManager.get(key2), willBe(nullValue())); - - metaStorageManager.registerExactWatch(key2, new NoOpListener()); - - assertThat(metaStorageManager.appliedRevision(), is(2L)); - - byte[] newValue = "newValue".getBytes(UTF_8); - - invokeFuture = metaStorageManager.invoke( - Conditions.exists(new ByteArray("foo")), - List.of( - Operations.put(key1, newValue), - Operations.put(key2, newValue) - ), - List.of(Operations.noop()) - ); - - assertThat(invokeFuture, willBe(true)); - - assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() == 3, 10_000)); - - assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), willBe(newValue)); - assertThat(vaultManager.get(key2).thenApply(VaultEntry::value), willBe(newValue)); - } - private static class NoOpListener implements WatchListener { @Override public CompletableFuture<Void> onUpdate(WatchEvent event) { diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java index 068b2b2721..47b8ad923a 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java @@ -45,7 +45,7 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr @BeforeEach public void startWatches() { - storage.startWatches(0, (e, t) -> { + storage.startWatches(1, (e, t) -> { time.updateSafeTime(t); return CompletableFuture.completedFuture(null); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index c0e2d93e9e..e919c6ccbd 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -218,6 +218,10 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { String name = nodes.get(0).name(); nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), "test"); + + for (Node node : nodes) { + assertThat(node.metaStorageManager.recoveryFinishedFuture(), willCompleteSuccessfully()); + } } @Test @@ -312,7 +316,7 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { } /** - * Tests that metastorage missed metastorage events are replayed after deploying watches. + * Tests that missed metastorage events are replayed after deploying watches. */ @Test void testReplayUpdates() throws Exception { diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 5c9c72d8df..10ec576f1a 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultEntry; @@ -206,7 +207,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { assert targetRevision != null; - listenForRecovery(recoveryFinishedFuture, targetRevision); + listenForRecovery(targetRevision); }); return recoveryFinishedFuture; @@ -215,10 +216,10 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } } - private void listenForRecovery(CompletableFuture<Long> res, long targetRevision) { + private void listenForRecovery(long targetRevision) { storage.setRecoveryRevisionListener(storageRevision -> { if (!busyLock.enterBusy()) { - res.completeExceptionally(new NodeStoppingException()); + recoveryFinishedFuture.completeExceptionally(new NodeStoppingException()); return; } @@ -230,7 +231,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { storage.setRecoveryRevisionListener(null); - if (res.complete(targetRevision)) { + if (recoveryFinishedFuture.complete(targetRevision)) { LOG.info("Finished MetaStorage recovery"); } } finally { @@ -239,7 +240,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { }); if (!busyLock.enterBusy()) { - res.completeExceptionally(new NodeStoppingException()); + recoveryFinishedFuture.completeExceptionally(new NodeStoppingException()); return; } @@ -249,7 +250,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { if (storage.revision() >= targetRevision) { storage.setRecoveryRevisionListener(null); - if (res.complete(targetRevision)) { + if (recoveryFinishedFuture.complete(targetRevision)) { LOG.info("Finished MetaStorage recovery"); } } @@ -502,18 +503,23 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } @Override - public Entry getLocally(byte[] key, long revUpperBound) { + public Entry getLocally(ByteArray key, long revUpperBound) { if (!busyLock.enterBusy()) { throw new IgniteException(new NodeStoppingException()); } try { - return storage.get(key, revUpperBound); + return storage.get(key.bytes(), revUpperBound); } finally { busyLock.leaveBusy(); } } + @Override + public Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long revUpperBound) { + return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound); + } + @Override public HybridTimestamp timestampByRevision(long revision) { if (!busyLock.enterBusy()) { @@ -824,19 +830,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { clusterTime.updateSafeTime(time); try { - CompletableFuture<Void> saveToVaultFuture; - - if (watchEvent.entryEvents().isEmpty()) { - saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision())); - } else { - Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(watchEvent.entryEvents().size() + 1); - - batch.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision())); - - watchEvent.entryEvents().forEach(e -> batch.put(new ByteArray(e.newEntry().key()), e.newEntry().value())); - - saveToVaultFuture = vaultMgr.putAll(batch); - } + CompletableFuture<Void> saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision())); return saveToVaultFuture.thenRun(() -> appliedRevision = watchEvent.revision()); } finally { @@ -906,7 +900,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } /** Explicitly notifies revision update listeners. */ - public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) { - return storage.notifyRevisionUpdateListenerOnStart(newRevision); + public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() { + return recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart); } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index ee7f3628f5..abb2d85848 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -992,6 +992,8 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { @Override public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) { + assert startRevision != 0 : "First meaningful revision is 1"; + long currentRevision; rwLock.readLock().lock(); @@ -1496,9 +1498,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } private void replayUpdates(long lowerRevision, long upperRevision) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be Math.max, so we start from the revision that - // components restored their state to (lowerRevision). - long minWatchRevision = Math.min(lowerRevision, watchProcessor.minWatchRevision().orElse(-1)); + long minWatchRevision = Math.max(lowerRevision, watchProcessor.minWatchRevision().orElse(-1)); if (minWatchRevision == -1 || minWatchRevision > upperRevision) { // No events to replay, we can start processing more recent events from the event queue. diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index d0640a4d9c..198ade0395 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -1969,7 +1969,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu long appliedRevision = storage.revision(); - storage.startWatches(0, (event, ts) -> completedFuture(null)); + storage.startWatches(1, (event, ts) -> completedFuture(null)); CompletableFuture<byte[]> fut = new CompletableFuture<>(); @@ -2310,7 +2310,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null)); - storage.startWatches(0, mockCallback); + storage.startWatches(1, mockCallback); putToMs(key, value); @@ -2505,7 +2505,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu } }); - storage.startWatches(0, (event, ts) -> completedFuture(null)); + storage.startWatches(1, (event, ts) -> completedFuture(null)); return resultFuture; } diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index dbc4ea68b1..6760682702 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -483,6 +483,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) { + assert startRevision != 0 : "First meaningful revision is 1"; + synchronized (mux) { areWatchesEnabled = true; @@ -493,9 +495,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { } private void replayUpdates(long startRevision) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be Math.max, so we start from the revision that - // components restored their state to (lowerRevision). - long minWatchRevision = Math.min(startRevision, watchProcessor.minWatchRevision().orElse(-1)); + long minWatchRevision = Math.max(startRevision, watchProcessor.minWatchRevision().orElse(-1)); if (minWatchRevision <= 0) { return; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java index 54c6b5fe73..8604076f57 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal; -import static org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.IOException; import java.nio.file.Files; @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.apache.ignite.IgnitionManager; import org.apache.ignite.configuration.ConfigurationModule; @@ -41,20 +40,16 @@ import org.apache.ignite.internal.configuration.ConfigurationModules; import org.apache.ignite.internal.configuration.ConfigurationRegistry; import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; -import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener; -import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteStringFormatter; -import org.apache.ignite.lang.IgniteSystemProperties; import org.intellij.lang.annotations.Language; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -239,34 +234,21 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest { ConfigurationTreeGenerator distributedConfigurationGenerator, ConfigurationRegistry clusterConfigRegistry ) { - AtomicLong lastRevision = new AtomicLong(); - - Consumer<Long> revisionCallback0 = rev -> { - if (revisionCallback != null) { - revisionCallback.accept(rev); - } - - lastRevision.set(rev); - }; - - CompletableFuture<Void> configurationCatchUpFuture = RecoveryCompletionFutureFactory.create( - clusterCfgMgr, - fut -> new TestConfigurationCatchUpListener(cfgStorage, fut, revisionCallback0) - ); - CompletableFuture<?> startFuture = CompletableFuture.allOf( nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), clusterConfigRegistry.notifyCurrentConfigurationListeners() ).thenCompose(unused -> // Deploy all registered watches because all components are ready and have registered their listeners. - CompletableFuture.allOf(metaStorageMgr.deployWatches(), configurationCatchUpFuture) + metaStorageMgr.deployWatches() ); assertThat("Partial node was not started", startFuture, willCompleteSuccessfully()); - log.info("Completed recovery on partially started node, last revision applied: " + lastRevision.get() - + ", acceptableDifference: " + IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 100) - ); + Long recoveryRevision = metaStorageMgr.recoveryFinishedFuture().getNow(null); + + assertNotNull(recoveryRevision); + + log.info("Completed recovery on partially started node, MetaStorage revision recovered to: " + recoveryRevision); return new PartialNode( components, @@ -333,38 +315,4 @@ public abstract class BaseIgniteRestartTest extends IgniteAbstractTest { return logicalTopology; } } - - /** - * Configuration catch-up listener for test. - */ - public static class TestConfigurationCatchUpListener extends ConfigurationCatchUpListener { - /** Callback called on revision update. */ - private final Consumer<Long> revisionCallback; - - /** - * Constructor. - * - * @param cfgStorage Configuration storage. - * @param catchUpFuture Catch-up future. - */ - TestConfigurationCatchUpListener( - ConfigurationStorage cfgStorage, - CompletableFuture<Void> catchUpFuture, - Consumer<Long> revisionCallback - ) { - super(cfgStorage, catchUpFuture, log); - - this.revisionCallback = revisionCallback; - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<?> onUpdate(long appliedRevision) { - if (revisionCallback != null) { - revisionCallback.accept(appliedRevision); - } - - return super.onUpdate(appliedRevision); - } - } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 9a98a90d9d..6d50b1e47f 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -184,7 +184,7 @@ public class ItDistributedConfigurationPropertiesTest { deployWatchesFut = metaStorageManager.deployWatches(); // create a custom storage implementation that is able to "lose" some storage updates - var distributedCfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager) { + var distributedCfgStorage = new DistributedConfigurationStorage(metaStorageManager) { /** {@inheritDoc} */ @Override public synchronized void registerConfigurationListener(ConfigurationStorageListener listener) { @@ -228,7 +228,7 @@ public class ItDistributedConfigurationPropertiesTest { Stream.of(clusterService, raftManager, cmgManager, metaStorageManager) .forEach(IgniteComponent::start); - distributedCfgManager.start(); + CompletableFuture.runAsync(distributedCfgManager::start); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 0a88f8fd22..9c8bac497c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -157,7 +157,7 @@ public class ItDistributedConfigurationStorageTest { deployWatchesFut = metaStorageManager.deployWatches(); - cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); + cfgStorage = new DistributedConfigurationStorage(metaStorageManager); } /** @@ -232,8 +232,6 @@ public class ItDistributedConfigurationStorageTest { assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true))); - node.cfgStorage.writeConfigurationRevision(0, 1); - assertTrue(waitForCondition( () -> node.metaStorageManager.appliedRevision() != 0, 3000 diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index 23fff6513d..77b022c0d4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.configuration.storage; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.function.Function.identity; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZoneReplicas; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments; @@ -56,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -168,6 +170,7 @@ import org.apache.ignite.utils.ClusterServiceTestUtils; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; @@ -178,6 +181,7 @@ import org.mockito.Mockito; */ @ExtendWith(WorkDirectoryExtension.class) @ExtendWith(ConfigurationExtension.class) +@Disabled("https://issues.apache.org/jira/browse/IGNITE-19506") public class ItRebalanceDistributedTest { /** Ignite logger. */ private static final IgniteLogger LOG = Loggers.forClass(ItRebalanceDistributedTest.class); @@ -483,6 +487,7 @@ public class ItRebalanceDistributedTest { checkInvokeDestroyedPartitionStorages(evictedNode, TABLE_1_NAME, 0); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506") @Test @UseTestTxStateStorage @UseRocksMetaStorage @@ -711,7 +716,7 @@ public class ItRebalanceDistributedTest { metaStorageConfiguration ); - cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); + cfgStorage = new DistributedConfigurationStorage(metaStorageManager); clusterCfgGenerator = new ConfigurationTreeGenerator( List.of( @@ -863,40 +868,51 @@ public class ItRebalanceDistributedTest { * Starts the created components. */ void start() { - nodeComponents = List.of( + nodeComponents = new CopyOnWriteArrayList<>(); + + List<IgniteComponent> firstComponents = List.of( vaultManager, nodeCfgMgr, clusterService, - raftManager, - cmgManager, - metaStorageManager, - clusterCfgMgr, - clockWaiter, - catalogManager, - distributionZoneManager, - replicaManager, - txManager, - baselineMgr, - dataStorageMgr, - schemaManager, - tableManager + raftManager ); - nodeComponents.forEach(IgniteComponent::start); + firstComponents.forEach(IgniteComponent::start); + + nodeComponents.addAll(firstComponents); + + deployWatchesFut = CompletableFuture.supplyAsync(() -> { + List<IgniteComponent> secondComponents = List.of( + cmgManager, + metaStorageManager, + clusterCfgMgr, + clockWaiter, + catalogManager, + distributionZoneManager, + replicaManager, + txManager, + baselineMgr, + dataStorageMgr, + schemaManager, + tableManager + ); + + secondComponents.forEach(IgniteComponent::start); + + nodeComponents.addAll(secondComponents); - assertThat( - allOf( + var configurationNotificationFut = metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> { + return allOf( nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), - // Why "-1"? I don't know, it just works like that. - ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart( - metaStorageManager.appliedRevision() - 1 - ) - ), - willSucceedIn(1, TimeUnit.MINUTES) - ); + ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart() + ); + }); + + assertThat(configurationNotificationFut, willSucceedIn(1, TimeUnit.MINUTES)); - deployWatchesFut = metaStorageManager.deployWatches(); + return metaStorageManager.deployWatches(); + }).thenCompose(identity()); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java index 74198469b5..4f4743953c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.sql.Session; import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -95,6 +96,8 @@ public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest * * @throws Exception If failed. */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks this. + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506") @Test void testFilteredDataNodesPropagatedToStable() throws Exception { String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'"; @@ -172,6 +175,8 @@ public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest * * @throws Exception If failed. */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks this. + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506") @Test void testAlteringFiltersPropagatedDataNodesToStableImmediately() throws Exception { String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'"; @@ -240,6 +245,8 @@ public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest * * @throws Exception If failed. */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks this. + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506") @Test void testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter() throws Exception { String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'"; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java index d744178a3a..daff409fc2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -26,7 +26,6 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesTest import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision; -import static org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; @@ -80,7 +79,6 @@ import org.apache.ignite.internal.network.configuration.NetworkConfiguration; import org.apache.ignite.internal.network.recovery.VaultStateIds; import org.apache.ignite.internal.schema.configuration.TablesConfiguration; import org.apache.ignite.internal.testframework.TestIgnitionManager; -import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.ClusterNode; @@ -96,7 +94,6 @@ import org.junit.jupiter.params.provider.MethodSource; /** * Tests for checking {@link DistributionZoneManager} behavior after node's restart. */ -@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0") @ExtendWith(ConfigurationExtension.class) public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRestartTest { private static final LogicalNode A = new LogicalNode( @@ -181,7 +178,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe new TestRocksDbKeyValueStorage(name, workDir.resolve("metastorage")) )); - var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vault); + var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( modules.distributed().rootKeys(), diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 5d3923ef7d..df0527853e 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.runner.app; -import static org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -116,7 +115,6 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.TestIgnitionManager; -import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; @@ -149,7 +147,6 @@ import org.junit.jupiter.params.provider.ValueSource; /** * These tests check node restart scenarios. */ -@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0") @ExtendWith(ConfigurationExtension.class) @Timeout(120) public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { @@ -297,7 +294,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { metaStorageConfiguration ); - var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vault); + var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( modules.distributed().rootKeys(), @@ -779,7 +776,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { for (int i = 0; i < 10; i++) { ByteArray key = ByteArray.fromString("some-test-key-" + i); - byte[] value = restartedMs.getLocally(key.bytes(), 100).value(); + byte[] value = restartedMs.getLocally(key, 100).value(); assertEquals(1, value.length); assertEquals((byte) i, value[0]); @@ -1020,7 +1017,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * The test for node restart when there is a gap between the node local configuration and distributed configuration. */ @Test - @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0") public void testCfgGapWithoutData() throws InterruptedException { List<IgniteImpl> nodes = startNodes(3); @@ -1052,7 +1048,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { */ @Test @Disabled(value = "https://issues.apache.org/jira/browse/IGNITE-18919") - @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0") public void testMetastorageStop() throws InterruptedException { int cfgGap = 4; diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 5b2b53f0ba..dfd08d83ce 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -69,7 +69,6 @@ import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; import org.apache.ignite.internal.configuration.DistributedConfigurationUpdater; import org.apache.ignite.internal.configuration.SecurityConfiguration; import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; -import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener; import org.apache.ignite.internal.configuration.presentation.HoconPresentation; import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; @@ -103,8 +102,6 @@ import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; -import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener; -import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.rest.RestComponent; @@ -430,7 +427,7 @@ public class IgniteImpl implements Ignite { topologyAwareRaftGroupServiceFactory ); - this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr); + this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); clusterCfgMgr = new ConfigurationManager( modules.distributed().rootKeys(), @@ -982,44 +979,16 @@ public class IgniteImpl implements Ignite { * and deploying watches after that. */ private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor) { - // Recovery future must be created before configuration listeners are triggered. - CompletableFuture<?> recoveryFuture = RecoveryCompletionFutureFactory.create( - clusterCfgMgr, - fut -> new ConfigurationCatchUpListener(cfgStorage, fut, LOG) - ); - - //TODO https://issues.apache.org/jira/browse/IGNITE-19778 - // The order of these two lines matter, the first method relies on the second one not being called yet. - // After the fix, the order will most likely have to be reversed. - CompletableFuture<Void> startupRevisionUpdate = notifyRevisionUpdateListenerOnStart(); CompletableFuture<Void> startupConfigurationUpdate = notifyConfigurationListeners(); + CompletableFuture<Void> startupRevisionUpdate = metaStorageMgr.notifyRevisionUpdateListenerOnStart(); return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate) .thenComposeAsync(t -> { // Deploy all registered watches because all components are ready and have registered their listeners. - return metaStorageMgr.deployWatches().thenCompose(unused -> recoveryFuture); + return metaStorageMgr.deployWatches(); }, startupExecutor); } - private CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() { - //TODO https://issues.apache.org/jira/browse/IGNITE-19778 Use meta-storages revision after recovery, - // it should match configuration revision. - // Temporary workaround. - // In order to avoid making a public getter for configuration revision, I read it from the startup notification. - // It should be removed once we start using up-to-date meta-storage revision for node startup. - var configurationRevisionFuture = new CompletableFuture<Void>(); - - ConfigurationStorageRevisionListener revisionListener = newStorageRevision -> - ((MetaStorageManagerImpl) metaStorageMgr).notifyRevisionUpdateListenerOnStart(newStorageRevision) - .thenRun(() -> configurationRevisionFuture.complete(null)); - - clusterConfiguration().listenUpdateStorageRevision(revisionListener); - - return configurationRevisionFuture.thenRun(() -> - clusterConfiguration().stopListenUpdateStorageRevision(revisionListener) - ); - } - /** * Notify all listeners of current configurations. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java index cbfe4bd99b..71e79ff9ab 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.configuration.storage; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; import java.io.Serializable; import java.util.Arrays; @@ -41,19 +44,15 @@ import org.apache.ignite.internal.metastorage.EntryEvent; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.ConditionType; -import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.metastorage.dsl.Operations; -import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultEntry; -import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.ByteArray; -import org.jetbrains.annotations.Nullable; /** * Distributed configuration storage. @@ -70,11 +69,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { */ private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key"); - /** - * Vault's key for a value of previous and current configuration's MetaStorage revision. - */ - private static final ByteArray CONFIGURATION_REVISIONS_KEY = new ByteArray("$revisions"); - /** * Prefix for all keys in the distributed storage. This key is expected to be the first key in lexicographical order of distributed * configuration keys. @@ -91,9 +85,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { /** Meta storage manager. */ private final MetaStorageManager metaStorageMgr; - /** Vault manager. */ - private final VaultManager vaultMgr; - /** Configuration changes listener. */ private volatile ConfigurationStorageListener lsnr; @@ -105,10 +96,10 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { * <p>Given that {@link #MASTER_KEY} is updated on every configuration change, one could assume that {@code changeId} matches the * revision of {@link #MASTER_KEY}. * - * <p>This is true for all cases except for node restart. Key-specific revision values are lost on local vault copy after restart, so - * stored {@link MetaStorageManager#appliedRevision} value is used instead. This fact has very important side effect: it's no longer - * possible to use {@link ConditionType#REV_EQUAL} on {@link #MASTER_KEY} in {@link DistributedConfigurationStorage#write(Map, long)}. - * {@link ConditionType#REV_LESS_OR_EQUAL} must be used instead. + * <p>This is true for all cases except for node restart. We use latest values after restart, so MetaStorage's local revision is used + * instead. This fact has very important side effect: it's no longer possible to use {@link ConditionType#REV_EQUAL} on + * {@link #MASTER_KEY} in {@link DistributedConfigurationStorage#write(Map, long)}. {@link ConditionType#REV_LESS_OR_EQUAL} must be + * used instead. * * @see #MASTER_KEY * @see #write(Map, long) @@ -123,12 +114,9 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { * Constructor. * * @param metaStorageMgr Meta storage manager. - * @param vaultMgr Vault manager. */ - public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr, VaultManager vaultMgr) { + public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) { this.metaStorageMgr = metaStorageMgr; - - this.vaultMgr = vaultMgr; } @Override @@ -206,60 +194,46 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { @Override public CompletableFuture<Data> readDataOnRecovery() throws StorageException { - CompletableFuture<Data> future = vaultMgr.get(CONFIGURATION_REVISIONS_KEY) - .thenApplyAsync(entry -> { - long revision = resolveRevision(metaStorageMgr.appliedRevision(), entry); - - return readDataOnRecovery0(revision); - }, threadPool); + CompletableFuture<Data> future = metaStorageMgr.recoveryFinishedFuture() + .thenApplyAsync(this::readDataOnRecovery0, threadPool); return registerFuture(future); } - /** - * Resolves current configuration revision based on the saved in the Vault revision of the metastorage and also previous and current - * revisions of the configuration saved in the Vault. - * - * @param metaStorageRevision Meta Storage revision. - * @param revisionsEntry Configuration revisions entry. - * @return Configuration revision. - */ - private static long resolveRevision(long metaStorageRevision, @Nullable VaultEntry revisionsEntry) { - if (revisionsEntry != null) { - byte[] value = revisionsEntry.value(); - long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0); - long curMasterKeyRevision = ByteUtils.bytesToLong(value, Long.BYTES); - - // If current master key revision is higher than applied revision, then node failed - // before applied revision changed, so we have to use previous master key revision - return curMasterKeyRevision <= metaStorageRevision ? curMasterKeyRevision : prevMasterKeyRevision; - } else { - // Configuration has not been updated yet, so it is safe to return 0 as the revision for the master key. - return 0L; - } - } - private Data readDataOnRecovery0(long cfgRevision) { var data = new HashMap<String, Serializable>(); - try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) { - for (VaultEntry entry : entries) { - ByteArray key = entry.key(); + byte[] masterKey = MASTER_KEY.bytes(); + boolean sawMasterKey = false; + + try (Cursor<Entry> cursor = metaStorageMgr.getLocally(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE, cfgRevision)) { + for (Entry entry : cursor) { + if (entry.tombstone()) { + continue; + } + + byte[] key = entry.key(); byte[] value = entry.value(); - // vault iterator should not return nulls as values + // MetaStorage iterator should not return nulls as values. assert value != null; - if (key.equals(MASTER_KEY)) { + if (!sawMasterKey && Arrays.equals(masterKey, key)) { + sawMasterKey = true; + continue; } - String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length()); + int startIdx = DST_KEYS_START_RANGE.length(); + + int keyLengthWithoutPrefix = key.length - startIdx; + + var dataKey = new String(key, startIdx, keyLengthWithoutPrefix, UTF_8); data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value)); } } catch (Exception e) { - throw new StorageException("Exception when closing a Vault cursor", e); + throw new StorageException("Exception reading data on recovery", e); } assert data.isEmpty() || cfgRevision > 0; @@ -296,9 +270,26 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(curChangeId))); - SimpleCondition condition = curChangeId == 0L - ? Conditions.notExists(MASTER_KEY) - : Conditions.revision(MASTER_KEY).eq(curChangeId); + // Condition for a valid MetaStorage data update. Several possibilities here: + // - First update ever, MASTER_KEY property must be absent from MetaStorage. + // - Current node has already performed some updates or received them from MetaStorage watch listener. In this + // case "curChangeId" must match the MASTER_KEY revision exactly. + // - Current node has been restarted and received updates from MetaStorage watch listeners after that. Same as + // above, "curChangeId" must match the MASTER_KEY revision exactly. + // - Current node has been restarted and have not received any updates from MetaStorage watch listeners yet. + // In this case "curChangeId" matches MetaStorage's local revision, which may or may not match the MASTER_KEY revision. Two + // options here: + // - MASTER_KEY is missing in local MetaStorage copy. This means that current node have not performed nor + // observed any configuration changes. Valid condition is "MASTER_KEY does not exist". + // - MASTER_KEY is present in local MetaStorage copy. The MASTER_KEY revision is unknown but is less than or + // equal to MetaStorage's local revision. Obviously, there have been no updates from the future yet. It's also guaranteed + // that the next received configuration update will have the MASTER_KEY revision strictly greater than + // current MetaStorage's local revision. This allows to conclude that "MASTER_KEY revision <= curChangeId" is a valid + // condition for update. + // Joining all of the above, it's concluded that the following condition must be used: + Condition condition = curChangeId == 0L + ? notExists(MASTER_KEY) + : or(notExists(MASTER_KEY), revision(MASTER_KEY).le(curChangeId)); return metaStorageMgr.invoke(condition, operations, Set.of(Operations.noop())); } @@ -374,29 +365,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision); } - @Override - public void writeConfigurationRevision(long prevRevision, long currentRevision) { - byte[] value = new byte[Long.BYTES * 2]; - - ByteUtils.putLongToBytes(prevRevision, value, 0); - ByteUtils.putLongToBytes(currentRevision, value, Long.BYTES); - - vaultMgr.put(CONFIGURATION_REVISIONS_KEY, value).join(); - } - - /** - * Method that returns all distributed configuration keys from the meta storage that were stored in the vault filtered out by the - * current applied revision as an upper bound. Applied revision is a revision of the last successful vault update. - * - * <p>This is possible to distinguish cfg keys from meta storage because we add a special prefix {@link - * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all configuration keys that we put to the meta storage. - * - * @return Iterator built upon all distributed configuration entries stored in vault. - */ - private Cursor<VaultEntry> storedDistributedConfigKeys() { - return vaultMgr.range(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE); - } - /** * Increments the last character of the given string. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java index b832ee4927..c026a6aa04 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java @@ -223,11 +223,6 @@ public class LocalConfigurationStorage implements ConfigurationStorage { .thenApply(entry -> entry == null ? 0 : (Long) fromBytes(entry.value())); } - @Override - public void writeConfigurationRevision(long prevRevision, long currentRevision) { - // No-op. - } - /** * Increments the last character of the given string. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java index e3d75d477b..742745c4cb 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java @@ -219,11 +219,6 @@ public class LocalFileConfigurationStorage implements ConfigurationStorage { return CompletableFuture.completedFuture(lastRevision); } - @Override - public void writeConfigurationRevision(long prevRevision, long currentRevision) { - // No-op. - } - @Override public void close() { futureTracker.cancelInFlightFutures(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java deleted file mode 100644 index ef5cc3c4f0..0000000000 --- a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.recovery; - -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener; -import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.lang.IgniteStringFormatter; -import org.apache.ignite.lang.IgniteSystemProperties; - -/** - * Configuration listener class that is intended to complete catch-up future during recovery when configuration - * is up-to-date. - */ -public class ConfigurationCatchUpListener implements ConfigurationStorageRevisionListener { - /** Configuration catch-up difference property name. */ - public static final String CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY = "CONFIGURATION_CATCH_UP_DIFFERENCE"; - - /** - * Difference between the local node applied revision and distributed data storage revision on start. - * TODO: IGNITE-16488 Make this property adjustable and remove system property. - */ - private final int configurationCatchUpDifference = - IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 100); - - /** Revision to catch up. */ - private volatile long targetRevision = -1; - - /** Catch-up future. */ - private final CompletableFuture<Void> catchUpFuture; - - /** Configuration storage. */ - private final ConfigurationStorage cfgStorage; - - /** Mutex for updating target revision. */ - private final Object targetRevisionUpdateMutex = new Object(); - - /** Logger. */ - private final IgniteLogger log; - - /** - * Constructor. - * - * @param catchUpFuture Catch-up future. - */ - public ConfigurationCatchUpListener(ConfigurationStorage cfgStorage, CompletableFuture<Void> catchUpFuture, IgniteLogger log) { - this.cfgStorage = cfgStorage; - this.catchUpFuture = catchUpFuture; - this.log = log; - } - - /** - * Checks the node up to date by distributed configuration. - * - * @param targetRevision Configuration revision. - * @param appliedRevision Last applied node revision. - * @return True when the applied revision is great enough for node recovery to complete, false otherwise. - */ - private boolean isConfigurationUpToDate(long targetRevision, long appliedRevision) { - return targetRevision - configurationCatchUpDifference <= appliedRevision; - } - - /** - * Retrieve distribute configuration revision and check whether local revision is great enough to complete the recovery. - * - * @param appliedRevision Applied revision. - */ - private CompletableFuture<?> checkRevisionUpToDate(long appliedRevision) { - return cfgStorage.lastRevision().thenAccept(rev -> { - synchronized (targetRevisionUpdateMutex) { - assert rev >= appliedRevision : IgniteStringFormatter.format( - "Configuration revision must be greater than local node applied revision [msRev={}, appliedRev={}", - rev, appliedRevision); - - targetRevision = rev; - - log.info("Checking revision on recovery [targetRevision={}, appliedRevision={}, acceptableDifference={}]", - targetRevision, appliedRevision, configurationCatchUpDifference); - - if (isConfigurationUpToDate(targetRevision, appliedRevision)) { - catchUpFuture.complete(null); - } - } - }); - } - - /** {@inheritDoc} */ - @Override public CompletableFuture<?> onUpdate(long appliedRevision) { - long targetRev = targetRevision; - - if (targetRev >= 0) { - if (isConfigurationUpToDate(targetRev, appliedRevision)) { - return checkRevisionUpToDate(appliedRevision); - } - } else { - return checkRevisionUpToDate(appliedRevision); - } - - return CompletableFuture.completedFuture(null); - } -} diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java deleted file mode 100644 index 32cc5ac7f7..0000000000 --- a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.recovery; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import org.apache.ignite.internal.configuration.ConfigurationManager; -import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener; - -/** - * Creates a future that completes when local recovery is finished. - */ -public class RecoveryCompletionFutureFactory { - /** - * Create recovery completion future. - * - * @param clusterCfgMgr Cluster configuration manager. - * @param listenerProvider Provider of configuration listener. - * @return Recovery completion future. - */ - public static CompletableFuture<Void> create( - ConfigurationManager clusterCfgMgr, - Function<CompletableFuture<Void>, ConfigurationStorageRevisionListener> listenerProvider - ) { - CompletableFuture<Void> configCatchUpFuture = new CompletableFuture<>(); - - ConfigurationStorageRevisionListener listener = listenerProvider.apply(configCatchUpFuture); - - CompletableFuture<Void> recoveryCompletionFuture = - configCatchUpFuture.thenRun(() -> clusterCfgMgr.configurationRegistry().stopListenUpdateStorageRevision(listener)); - - clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(listener); - - return recoveryCompletionFuture; - } -} diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java deleted file mode 100644 index 4e47a634e6..0000000000 --- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.configuration.storage; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyCollection; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import org.apache.ignite.configuration.RootKey; -import org.apache.ignite.configuration.annotation.ConfigurationRoot; -import org.apache.ignite.configuration.annotation.Value; -import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; -import org.apache.ignite.internal.configuration.TestConfigurationChanger; -import org.apache.ignite.internal.configuration.tree.ConfigurationSource; -import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode; -import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.metastorage.EntryEvent; -import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.metastorage.WatchEvent; -import org.apache.ignite.internal.metastorage.WatchListener; -import org.apache.ignite.internal.metastorage.dsl.Operation; -import org.apache.ignite.internal.metastorage.impl.EntryImpl; -import org.apache.ignite.internal.vault.VaultManager; -import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; -import org.apache.ignite.lang.ByteArray; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * Tests for the {@link DistributedConfigurationStorage}. - */ -public class DistributedConfigurationCatchUpTest { - private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService()); - - /** - * Before each. - */ - @BeforeEach - void start() { - vaultManager.start(); - } - - /** - * After each. - */ - @AfterEach - void stop() throws Exception { - vaultManager.stop(); - } - - /** - * Dummy configuration. - */ - @ConfigurationRoot(rootName = "someKey", type = DISTRIBUTED) - public static class DistributedTestConfigurationSchema { - @Value(hasDefault = true) - public final int foobar = 0; - } - - /** - * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision during recovery process. - * - * @throws Exception If failed. - */ - @Test - public void testMetaStorageRevisionDifferentFromConfigurationOnRestart() throws Exception { - RootKey<DistributedTestConfiguration, DistributedTestView> rootKey = DistributedTestConfiguration.KEY; - - ConfigurationTreeGenerator generator = new ConfigurationTreeGenerator(rootKey); - - MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper(); - - DistributedConfigurationStorage storage = storage(wrapper); - - try { - var validator = new ConfigurationValidatorImpl(generator, Set.of()); - var changer = new TestConfigurationChanger(List.of(rootKey), storage, generator, validator); - - try { - changer.start(); - - ConfigurationSource source = source( - rootKey, - (DistributedTestChange change) -> change.changeFoobar(1) - ); - - CompletableFuture<Void> change = changer.change(source); - - assertThat(change, willCompleteSuccessfully()); - } finally { - changer.stop(); - } - } finally { - storage.close(); - } - - // Put a value to the configuration, so we start on non-empty vault. - vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 3, 4}).get(); - - // This emulates a change in MetaStorage that is not related to the configuration. - when(wrapper.mock.appliedRevision()).thenReturn(2L); - - storage = storage(wrapper); - - try { - - var configurationValidator = new ConfigurationValidatorImpl(generator, Set.of()); - var changer = new TestConfigurationChanger(List.of(rootKey), storage, generator, configurationValidator); - - try { - changer.start(); - - // Should return last configuration change, not last MetaStorage change. - assertThat(storage.lastRevision(), willBe(1L)); - } finally { - changer.stop(); - } - } finally { - storage.close(); - } - } - - private DistributedConfigurationStorage storage(MetaStorageMockWrapper wrapper) { - return new DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager); - } - - /** - * This class stores data for {@link MetaStorageManager}'s mock. - */ - private static class MetaStorageMockWrapper { - private static final String DISTRIBUTED_PREFIX = "dst-cfg."; - - /** - * This and previous field are copy-pasted intentionally, so in case if something changes, this test should fail and be reviewed and - * re-written. - */ - private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key"); - - private static final ByteArray TEST_KEY = new ByteArray(DISTRIBUTED_PREFIX + "someKey.foobar"); - - /** MetaStorage mock. */ - private final MetaStorageManager mock; - - /** Captured MetaStorage listener. */ - private WatchListener lsnr; - - /** Current master key revision. */ - private final AtomicLong masterKeyRevision = new AtomicLong(); - - private MetaStorageMockWrapper() { - mock = mock(MetaStorageManager.class); - - setup(); - } - - private void setup() { - // Returns current master key revision. - when(mock.get(MASTER_KEY)).then(invocation -> { - return completedFuture(new EntryImpl(MASTER_KEY.bytes(), null, masterKeyRevision.get(), -1)); - }); - - // On any invocation - trigger storage listener. - when(mock.invoke(any(), anyCollection(), any())) - .then(invocation -> triggerStorageListener()); - - when(mock.invoke(any(), any(Operation.class), any())) - .then(invocation -> triggerStorageListener()); - - // This captures the listener. - doAnswer(invocation -> { - lsnr = invocation.getArgument(1); - - return null; - }).when(mock).registerPrefixWatch(any(), any()); - } - - /** - * Triggers MetaStorage listener incrementing master key revision. - */ - private CompletableFuture<Boolean> triggerStorageListener() { - return CompletableFuture.supplyAsync(() -> { - long newRevision = masterKeyRevision.incrementAndGet(); - - lsnr.onUpdate(new WatchEvent(List.of( - new EntryEvent(null, new EntryImpl(MASTER_KEY.bytes(), null, newRevision, -1)), - // Add a mock entry to simulate a configuration update. - new EntryEvent(null, new EntryImpl((DISTRIBUTED_PREFIX + "foobar").getBytes(UTF_8), null, newRevision, -1)) - ), newRevision, HybridTimestamp.MAX_VALUE)); - - return true; - }); - } - - private MetaStorageManager metaStorageManager() { - return mock; - } - } - - private static <CHANGET> ConfigurationSource source(RootKey<?, ? super CHANGET> rootKey, Consumer<CHANGET> changer) { - return new ConfigurationSource() { - @Override - public void descend(ConstructableTreeNode node) { - ConfigurationSource changerSrc = new ConfigurationSource() { - @Override - public void descend(ConstructableTreeNode node) { - changer.accept((CHANGET) node); - } - }; - - node.construct(rootKey.key(), changerSrc, true); - } - }; - } -} diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java index 17da06caf6..36d40052d8 100644 --- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java +++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java @@ -34,8 +34,6 @@ import org.apache.ignite.internal.metastorage.server.ExistenceCondition; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.RevisionCondition; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; -import org.apache.ignite.internal.vault.VaultManager; -import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.lang.ByteArray; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,8 +42,6 @@ import org.junit.jupiter.api.BeforeEach; * Tests for the {@link DistributedConfigurationStorage}. */ public class DistributedConfigurationStorageTest extends ConfigurationStorageTest { - private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService()); - private final KeyValueStorage metaStorage = new SimpleInMemoryKeyValueStorage("test"); private final MetaStorageManager metaStorageManager = mockMetaStorageManager(); @@ -55,7 +51,6 @@ public class DistributedConfigurationStorageTest extends ConfigurationStorageTes */ @BeforeEach void start() { - vaultManager.start(); metaStorage.start(); metaStorageManager.start(); } @@ -67,13 +62,12 @@ public class DistributedConfigurationStorageTest extends ConfigurationStorageTes void stop() throws Exception { metaStorageManager.stop(); metaStorage.close(); - vaultManager.stop(); } /** {@inheritDoc} */ @Override public ConfigurationStorage getStorage() { - return new DistributedConfigurationStorage(metaStorageManager, vaultManager); + return new DistributedConfigurationStorage(metaStorageManager); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 23ba896aa2..b534b2e831 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -633,6 +633,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } }); + // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 Probably should be reworked so that + // the future is returned along with createTableFut. Right now it will break some tests. writeTableAssignmentsToMetastore(tableId, assignments); return createTableFut; @@ -641,7 +643,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } } - private void writeTableAssignmentsToMetastore(int tableId, List<Set<Assignment>> assignments) { + private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(int tableId, List<Set<Assignment>> assignments) { assert !assignments.isEmpty(); List<Operation> partitionAssignments = new ArrayList<>(assignments.size()); @@ -655,7 +657,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp Condition condition = Conditions.notExists(new ByteArray(partitionAssignments.get(0).key())); - metaStorageMgr + return metaStorageMgr .invoke(condition, partitionAssignments, Collections.emptyList()) .exceptionally(e -> { LOG.error("Couldn't write assignments to metastore", e); @@ -1265,7 +1267,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp }); })); - createTablePartitionsLocally(causalityToken, assignments, zoneDescriptor.id(), table); + CompletableFuture<?> createPartsFut = createTablePartitionsLocally(causalityToken, assignments, zoneDescriptor.id(), table); pendingTables.put(tableId, table); startedTables.put(tableId, table); @@ -1279,7 +1281,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp // TODO should be reworked in IGNITE-16763 // We use the event notification future as the result so that dependent components can complete the schema updates. - return fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId)); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation. + return allOf(createPartsFut, fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId))); } /**