This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 407419d900 IGNITE-20391 Return future metastore event processing from UpdateListener#onUpdate (#2570) 407419d900 is described below commit 407419d900014d272df0755884d6b0c884975b9f Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Mon Sep 11 17:18:12 2023 +0300 IGNITE-20391 Return future metastore event processing from UpdateListener#onUpdate (#2570) --- .../internal/catalog/CatalogManagerImpl.java | 5 +-- .../ignite/internal/catalog/storage/UpdateLog.java | 8 ++--- .../internal/catalog/storage/UpdateLogImpl.java | 24 ++++++++++---- .../catalog/storage/UpdateLogImplTest.java | 37 ++++++++++++++++++++-- 4 files changed, 57 insertions(+), 17 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index ad709ccf39..eaf0cf8df9 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.catalog; +import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateAlterZoneParams; @@ -511,7 +512,7 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam class OnUpdateHandlerImpl implements OnUpdateHandler { @Override - public void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { + public CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { int version = update.version(); Catalog catalog = catalogByVer.get(version - 1); @@ -538,7 +539,7 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam } } - CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new)) + return allOf(eventFutures.toArray(CompletableFuture[]::new)) .whenComplete((ignore, err) -> { if (err != null) { LOG.warn("Failed to apply catalog update.", err); diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java index eb17046103..c2d24086e3 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java @@ -58,10 +58,7 @@ public interface UpdateLog extends IgniteComponent { */ @Override void start() throws IgniteInternalException; - /** - * An interface describing a handler that will receive notification - * when a new update is added to the log. - */ + /** An interface describing a handler that will receive notification when a new update is added to the log. */ @FunctionalInterface interface OnUpdateHandler { /** @@ -70,7 +67,8 @@ public interface UpdateLog extends IgniteComponent { * @param update A new update. * @param metaStorageUpdateTimestamp Timestamp assigned to the update by the Metastorage. * @param causalityToken Causality token. + * @return Handler future. */ - void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken); + CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken); } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java index 4d0e00198c..062e5f07f4 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.catalog.storage; +import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; @@ -27,9 +28,13 @@ import static org.apache.ignite.internal.metastorage.dsl.Statements.iif; import static org.apache.ignite.internal.util.ByteUtils.fromBytes; import static org.apache.ignite.internal.util.ByteUtils.intToBytes; +import java.util.ArrayList; +import java.util.Collection; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.EntryEvent; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -52,12 +57,16 @@ import org.jetbrains.annotations.Nullable; * Metastore-based implementation of UpdateLog. */ public class UpdateLogImpl implements UpdateLog { + private static final IgniteLogger LOG = Loggers.forClass(UpdateLogImpl.class); + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + private final AtomicBoolean stopGuard = new AtomicBoolean(); private final MetaStorageManager metastore; private volatile OnUpdateHandler onUpdateHandler; + private volatile @Nullable UpdateListener listener; /** @@ -197,25 +206,26 @@ public class UpdateLogImpl implements UpdateLog { @Override public CompletableFuture<Void> onUpdate(WatchEvent event) { - for (EntryEvent eventEntry : event.entryEvents()) { - assert eventEntry.newEntry() != null; - assert !eventEntry.newEntry().empty(); + Collection<EntryEvent> entryEvents = event.entryEvents(); + + var handleFutures = new ArrayList<CompletableFuture<Void>>(entryEvents.size()); + for (EntryEvent eventEntry : entryEvents) { byte[] payload = eventEntry.newEntry().value(); - assert payload != null; + assert payload != null : eventEntry; VersionedUpdate update = fromBytes(payload); - onUpdateHandler.handle(update, event.timestamp(), event.revision()); + handleFutures.add(onUpdateHandler.handle(update, event.timestamp(), event.revision())); } - return CompletableFuture.completedFuture(null); + return allOf(handleFutures.toArray(CompletableFuture[]::new)); } @Override public void onError(Throwable e) { - assert false; + LOG.warn("Unable to process catalog event", e); } } } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java index 27c1b524c2..414974e159 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java @@ -17,18 +17,21 @@ package org.apache.ignite.internal.catalog.storage; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler; @@ -49,7 +52,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; /** Tests to verify {@link UpdateLogImpl}. */ -@SuppressWarnings("ConstantConditions") class UpdateLogImplTest extends BaseIgniteAbstractTest { private KeyValueStorage keyValueStorage; @@ -66,6 +68,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { metastore = StandaloneMetaStorageManager.create(vault, keyValueStorage); vault.start(); + keyValueStorage.start(); metastore.start(); } @@ -73,6 +76,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { public void tearDown() throws Exception { IgniteUtils.closeAll( metastore == null ? null : metastore::stop, + keyValueStorage == null ? null : keyValueStorage::close, vault == null ? null : vault::stop ); } @@ -80,7 +84,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { @Test void logReplayedOnStart() throws Exception { // First, let's append a few entries to the update log. - UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts, causalityToken) -> {/* no-op */}); + UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts, causalityToken) -> completedFuture(null)); assertThat(metastore.deployWatches(), willCompleteSuccessfully()); @@ -95,7 +99,11 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { var actualUpdates = new ArrayList<VersionedUpdate>(); - createAndStartUpdateLogImpl((update, ts, causalityToken) -> actualUpdates.add(update)); + createAndStartUpdateLogImpl((update, ts, causalityToken) -> { + actualUpdates.add(update); + + return completedFuture(null); + }); // Let's check that we have recovered to the latest version. assertThat(actualUpdates, equalTo(expectedUpdates)); @@ -162,6 +170,8 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { updateLog.registerUpdateHandler((update, ts, causalityToken) -> { appliedVersions.add(update.version()); causalityTokens.add(causalityToken); + + return completedFuture(null); }); long revisionBefore = metastore.appliedRevision(); @@ -203,6 +213,27 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest { assertThat(causalityTokens, equalTo(expectedTokens)); } + @Test + void testUpdateMetastoreRevisionAfterUpdateHandlerComplete() throws Exception { + CompletableFuture<Void> onUpdateHandlerFuture = new CompletableFuture<>(); + + UpdateLog updateLog = createAndStartUpdateLogImpl((update, metaStorageUpdateTimestamp, causalityToken) -> onUpdateHandlerFuture); + + assertThat(metastore.deployWatches(), willCompleteSuccessfully()); + + long metastoreRevision = metastore.appliedRevision(); + + assertThat(updateLog.append(singleEntryUpdateOfVersion(1)), willCompleteSuccessfully()); + + // Let's make sure that the metastore revision will not increase until onUpdateHandlerFuture is completed. + assertFalse(waitForCondition(() -> metastore.appliedRevision() > metastoreRevision, 200)); + + // Let's make sure that the metastore revision increases after completing onUpdateHandlerFuture. + onUpdateHandlerFuture.complete(null); + + assertTrue(waitForCondition(() -> metastore.appliedRevision() > metastoreRevision, 200)); + } + private static VersionedUpdate singleEntryUpdateOfVersion(int version) { return new VersionedUpdate(version, 1, List.of(new TestUpdateEntry("foo_" + version))); }