This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-21585 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 17ff500007496d0c47e7b545c1e8483f19c9224e Author: amashenkov <[email protected]> AuthorDate: Tue Mar 5 19:41:44 2024 +0300 Switch TableManager and IndexManager to LWM events for table/index destruction purposes. --- .../internal/catalog/CatalogManagerImpl.java | 52 +---- .../ignite/internal/catalog/CatalogService.java | 1 + .../internal/catalog/events/CatalogEvent.java | 1 + .../internal/catalog/CatalogManagerSelfTest.java | 32 +-- .../ignite/internal/catalog/CatalogTestUtils.java | 27 --- .../ignite/client/handler/FakeCatalogService.java | 4 +- .../ignite/internal/index/ItIndexManagerTest.java | 1 + .../apache/ignite/internal/index/IndexManager.java | 133 +++++++++--- .../ignite/internal/index/IndexManagerTest.java | 16 +- .../runner/app/ItIgniteNodeRestartTest.java | 5 +- .../runner/app/PlatformTestNodeRunner.java | 3 - .../org/apache/ignite/internal/app/IgniteImpl.java | 5 +- .../ignite/internal/IgniteIntegrationTest.java | 12 -- .../ignite/internal/schema/SchemaManager.java | 38 +--- .../ignite/internal/schema/SchemaManagerTest.java | 3 + .../internal/sql/sqllogic/ItSqlLogicTest.java | 2 - .../rebalance/ItRebalanceDistributedTest.java | 3 +- .../internal/table/distributed/TableManager.java | 226 +++++++++++---------- .../table/distributed/index/IndexUtils.java | 27 ++- .../table/distributed/TableManagerTest.java | 6 +- 20 files changed, 297 insertions(+), 300 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index 00d99f8670..a790d9b3e5 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -25,8 +25,6 @@ import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParam import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; 
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import it.unimi.dsi.fastutil.ints.IntOpenHashSet; -import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,7 +38,6 @@ import java.util.concurrent.Flow.Publisher; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.function.Predicate; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; @@ -51,8 +48,6 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyIndexEvent; -import org.apache.ignite.internal.catalog.events.DestroyTableEvent; import org.apache.ignite.internal.catalog.storage.Fireable; import org.apache.ignite.internal.catalog.storage.SnapshotEntry; import org.apache.ignite.internal.catalog.storage.UpdateEntry; @@ -370,6 +365,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata private void truncateUpTo(Catalog catalog) { catalogByVer.headMap(catalog.version(), false).clear(); catalogByTs.headMap(catalog.time(), false).clear(); + + LOG.info("Catalog history was truncated up to version=" + catalog.version()); } private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) { @@ -460,53 +457,12 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata private CompletableFuture<Void> handle(SnapshotEntry event, long causalityToken) { Catalog catalog = event.snapshot(); - - 
// Use reverse order to find latest descriptors. - Collection<Catalog> droppedCatalogVersions = catalogByVer.headMap(catalog.version(), false).descendingMap().values(); - - List<Fireable> events = new ArrayList<>(); - IntSet objectToSkip = new IntOpenHashSet(); - Predicate<CatalogObjectDescriptor> filter = obj -> objectToSkip.add(obj.id()); - - // At first, add alive indexes to filter. - applyToAliveIndexesFrom(catalog.version(), filter::test); - - // Create destroy events for dropped indexes. - droppedCatalogVersions.forEach(oldCatalog -> oldCatalog.indexes().stream() - .filter(filter) - .forEach(idx -> events.add( - new DestroyIndexEvent(idx.id(), idx.tableId(), tableZoneDescriptor(oldCatalog, idx.tableId()).partitions())) - )); - - objectToSkip.clear(); - // At last, create destroy events for dropped tables. - droppedCatalogVersions.forEach(oldCatalog -> oldCatalog.tables().stream() - .filter(tbl -> catalog.table(tbl.id()) == null) - .filter(filter) - .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(), tableZoneDescriptor(oldCatalog, tbl.id()).partitions())))); - // On recovery phase, we must register catalog from the snapshot. // In other cases, it is ok to rewrite an existed version, because it's exactly the same. registerCatalog(catalog); + truncateUpTo(catalog); - List<CompletableFuture<?>> eventFutures = new ArrayList<>(events.size()); - - for (Fireable fireEvent : events) { - eventFutures.add(fireEvent( - fireEvent.eventType(), - fireEvent.createEventParameters(causalityToken, catalog.version()) - )); - } - - return allOf(eventFutures.toArray(CompletableFuture[]::new)) - .whenComplete((ignore, err) -> { - if (err != null) { - LOG.warn("Failed to compact catalog.", err); - // TODO: IGNITE-14611 Pass exception to an error handler? 
- } else { - truncateUpTo(catalog); - } - }); + return nullCompletedFuture(); } private CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index 1477ee7a3a..a570e01cfc 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -101,6 +101,7 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent int earliestCatalogVersion(); /** Returns the earliest registered version of the catalog, which is observable since given timestamp. */ + // TODO IGNITE-21608 Use method without timestamp instead? int earliestCatalogVersion(long timestamp); /** Returns the latest registered version of the catalog. */ diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java index ecc5e60c0a..e736f1c560 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java @@ -59,6 +59,7 @@ public enum CatalogEvent implements Event { INDEX_REMOVED, /** This event is fired when an index has been dropped from all catalog versions and can be destroyed. */ + @Deprecated INDEX_DESTROY, /** This event is fired, when a distribution zone was created in Catalog. 
*/ diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index 5bc20d6216..4511ae4100 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -47,6 +47,7 @@ import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus. import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -128,8 +129,6 @@ import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.DropColumnEventParameters; import org.apache.ignite.internal.catalog.events.DropTableEventParameters; import org.apache.ignite.internal.catalog.events.DropZoneEventParameters; @@ -1182,17 +1181,6 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest { verifyNoMoreInteractions(eventListener); clearInvocations(eventListener); - - // Got 'destroy' event only after Catalog compaction. - assertThat(CatalogTestUtils.waitCatalogCompaction(manager, clock.nowLong()), equalTo(true)); - verify(eventListener, times(2)).notify(any(DestroyTableEventParameters.class)); - - verifyNoMoreInteractions(eventListener); - clearInvocations(); - - // Expect no events if Catalog wasn't compacted. - assertThat(CatalogTestUtils.waitCatalogCompaction(manager, clock.nowLong()), equalTo(false)); - verifyNoMoreInteractions(eventListener); } @Test @@ -1209,7 +1197,6 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { manager.listen(CatalogEvent.INDEX_AVAILABLE, eventListener); manager.listen(CatalogEvent.INDEX_STOPPING, eventListener); manager.listen(CatalogEvent.INDEX_REMOVED, eventListener); - manager.listen(CatalogEvent.INDEX_DESTROY, eventListener); // Try to create index without table. assertThat(manager.execute(createIndexCmd), willThrow(TableNotFoundValidationException.class)); @@ -1258,16 +1245,6 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { verify(eventListener).notify(any(RemoveIndexEventParameters.class)); verifyNoMoreInteractions(eventListener); clearInvocations(eventListener); - - // Got 'destroy' event only after Catalog compaction. - assertThat(CatalogTestUtils.waitCatalogCompaction(manager, clock.nowLong()), equalTo(true)); - verify(eventListener, times(2 /* PK + secondary index */)).notify(any(DestroyIndexEventParameters.class)); - - clearInvocations(); - - // Expect no events if Catalog wasn't compacted. 
- assertThat(CatalogTestUtils.waitCatalogCompaction(manager, clock.nowLong()), equalTo(false)); - verifyNoMoreInteractions(eventListener); } @Test @@ -2482,7 +2459,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { } @Test - public void testCatalogCompaction() { + public void testCatalogCompaction() throws InterruptedException { assertThat(manager.execute(simpleTable(TABLE_NAME)), willBe(nullValue())); assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willBe(nullValue())); @@ -2493,9 +2470,8 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), willBe(nullValue())); assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME_2)), willBe(nullValue())); - assertThat(CatalogTestUtils.waitCatalogCompaction(manager, timestamp), equalTo(true)); - - assertEquals(catalog.version(), manager.earliestCatalogVersion()); + assertThat(manager.compactCatalog(timestamp), willBe(Boolean.TRUE)); + assertTrue(waitForCondition(() -> catalog.version() == manager.earliestCatalogVersion(), 3_000)); assertNull(manager.catalog(0)); assertNull(manager.catalog(catalog.version() - 1)); diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java index 10e6dd46eb..3f0c3596c8 100644 --- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java +++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java @@ -19,12 +19,10 @@ package org.apache.ignite.internal.catalog; import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.fail; import java.util.List; import java.util.Set; @@ -343,31 +341,6 @@ public class CatalogTestUtils { return AlterZoneCommand.builder().zoneName(zoneName); } - /** - * Starts catalog compaction and waits it finished locally. - * - * @param catalogManager Catalog manager. - * @param timestamp Timestamp catalog should be compacted up to. - * @return {@code True} if a new snapshot has been successfully written, {@code false} otherwise. - */ - public static boolean waitCatalogCompaction(CatalogManager catalogManager, long timestamp) { - int version = catalogManager.activeCatalogVersion(timestamp); - - CompletableFuture<Boolean> operationFuture = ((CatalogManagerImpl) catalogManager).compactCatalog(timestamp); - - try { - boolean result = operationFuture.get(); - - if (result) { - waitForCondition(() -> catalogManager.earliestCatalogVersion() == version, 3_000); - } - } catch (Exception e) { - fail(e); - } - - return operationFuture.join(); - } - private static class TestUpdateLog implements UpdateLog { private final HybridClock clock; diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java index bd22a7e675..8ef91d7e8e 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java @@ -150,12 +150,12 @@ public class FakeCatalogService implements CatalogService { } @Override - public int 
latestCatalogVersion() { + public int earliestCatalogVersion(long timestamp) { return 0; } @Override - public int earliestCatalogVersion(long timestamp) { + public int latestCatalogVersion() { return 0; } diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java index ca176498f9..fcb3b54e29 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java @@ -113,6 +113,7 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest { return future.join(); } + // TODO: validate this. private static List<Integer> collectIndexIdsFromCatalogForRecovery(IgniteImpl ignite, TableImpl table) { CatalogManager catalogManager = ignite.catalogManager(); diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 3bdbe766d5..537f8a3c71 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -17,14 +17,18 @@ package org.apache.ignite.internal.index; +import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; -import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY; +import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED; import static org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable; +import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,13 +36,15 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongFunction; +import java.util.stream.Collectors; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters; +import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; @@ -47,7 +53,9 @@ import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.DeferredEventsQueue; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.LowWatermark; import org.apache.ignite.internal.table.distributed.PartitionSet; 
import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -84,6 +92,12 @@ public class IndexManager implements IgniteComponent { /** Versioned value to prevent races when registering/unregistering indexes when processing metastore or catalog events. */ private final IncrementalVersionedValue<Void> handleMetastoreEventVv; + /** Low watermark. */ + private final LowWatermark lowWatermark; + + /** Deferred destruction queue. */ + private final DeferredEventsQueue<DestroyIndexEvent> deferredQueue = new DeferredEventsQueue<>(DestroyIndexEvent::catalogVersion); + /** * Constructor. * @@ -97,12 +111,14 @@ public class IndexManager implements IgniteComponent { TableManager tableManager, CatalogService catalogService, ExecutorService ioExecutor, - Consumer<LongFunction<CompletableFuture<?>>> registry + Consumer<LongFunction<CompletableFuture<?>>> registry, + LowWatermark lowWatermark ) { this.schemaManager = schemaManager; this.tableManager = tableManager; this.catalogService = catalogService; this.ioExecutor = ioExecutor; + this.lowWatermark = lowWatermark; handleMetastoreEventVv = new IncrementalVersionedValue<>(registry); } @@ -111,8 +127,11 @@ public class IndexManager implements IgniteComponent { public CompletableFuture<Void> start() { LOG.debug("Index manager is about to start"); + recoverDeferredQueue(); + catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters parameters) -> onIndexCreate(parameters)); - catalogService.listen(INDEX_DESTROY, (DestroyIndexEventParameters parameters) -> onIndexDestroy(parameters)); + catalogService.listen(INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters)); + lowWatermark.addUpdateListener(this::onLwmChanged); LOG.info("Index manager started"); @@ -152,27 +171,6 @@ public class IndexManager implements IgniteComponent { return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> 
table.internalTable().storage()); } - private CompletableFuture<Boolean> onIndexDestroy(DestroyIndexEventParameters parameters) { - int indexId = parameters.indexId(); - int tableId = parameters.tableId(); - - long causalityToken = parameters.causalityToken(); - - CompletableFuture<TableViewInternal> tableFuture = tableManager.tableAsync(causalityToken, tableId); - - return inBusyLockAsync(busyLock, () -> handleMetastoreEventVv.update( - causalityToken, - updater(unused -> tableFuture.thenApply(table -> inBusyLock(busyLock, () -> { - if (table != null) { - // In case of DROP TABLE the table will be removed first. - table.unregisterIndex(indexId); - } - - return null; - }))) - )).thenApply(unused -> false); - } - private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { CatalogIndexDescriptor index = parameters.indexDescriptor(); @@ -198,6 +196,51 @@ public class IndexManager implements IgniteComponent { }); } + private CompletableFuture<Boolean> onIndexRemoved(RemoveIndexEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> { + int indexId = parameters.indexId(); + int version = parameters.catalogVersion(); + int prevVersion = version - 1; + + // Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted. 
+ CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, prevVersion); + assert indexDescriptor != null : "index"; + + deferredQueue.enqueue(new DestroyIndexEvent(version, indexId, indexDescriptor.tableId())); + + return falseCompletedFuture(); + }); + } + + private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) { + return inBusyLockAsync(busyLock, () -> { + int earliestVersion = catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts)); + + List<DestroyIndexEvent> events = deferredQueue.drainUpTo(earliestVersion); + + if (events.isEmpty()) { + return nullCompletedFuture(); + } + + List<CompletableFuture<Void>> futures = events.stream() + .map(event -> destroyIndexAsync(event.indexId(), event.tableId())) + .collect(Collectors.toList()); + + return allOf(futures.toArray(CompletableFuture[]::new)); + }); + } + + private CompletableFuture<Void> destroyIndexAsync(int indexId, int tableId) { + return runAsync(() -> inBusyLock(busyLock, () -> { + TableViewInternal table = tableManager.cachedTable(tableId); + + if (table != null) { + // In case of DROP TABLE the table will be removed with all its indexes. + table.unregisterIndex(indexId); + } + }), ioExecutor); + } + private CompletableFuture<?> startIndexAsync( CatalogTableDescriptor table, CatalogIndexDescriptor index, @@ -249,4 +292,44 @@ public class IndexManager implements IgniteComponent { return updateFunction.apply(t); }; } + + /** Recover deferred destroy events. 
*/ + private void recoverDeferredQueue() { + int earliestVersion = catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark())); + int latestVersion = catalogService.latestCatalogVersion(); + + synchronized ((deferredQueue)) { + for (int version = latestVersion - 1; version >= earliestVersion; version--) { + int nextVersion = version + 1; + catalogService.indexes(version).stream() + .filter(idx -> catalogService.index(idx.id(), nextVersion) == null) + .forEach(idx -> deferredQueue.enqueue(new DestroyIndexEvent(nextVersion, idx.id(), idx.tableId()))); + } + } + } + + /** Internal event. */ + private static class DestroyIndexEvent { + final int catalogVersion; + final int indexId; + final int tableId; + + DestroyIndexEvent(int catalogVersion, int indexId, int tableId) { + this.catalogVersion = catalogVersion; + this.indexId = indexId; + this.tableId = tableId; + } + + public int catalogVersion() { + return catalogVersion; + } + + public int indexId() { + return indexId; + } + + public int tableId() { + return tableId; + } + } } diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java index 72e809b7d0..1c7d7f9032 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java @@ -22,7 +22,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager; -import static org.apache.ignite.internal.catalog.CatalogTestUtils.waitCatalogCompaction; import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION; import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME; @@ -34,7 +33,6 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; @@ -67,6 +65,7 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TestLowWatermark; import org.apache.ignite.internal.table.distributed.PartitionSet; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; @@ -102,6 +101,8 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { private final Map<Integer, TableViewInternal> tableViewInternalByTableId = new ConcurrentHashMap<>(); + private TestLowWatermark lowWatermark = new TestLowWatermark(); + @BeforeEach public void setUp() { mockTableManager = mock(TableManager.class); @@ -146,7 +147,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { int tableId = indexDescriptor.tableId(); dropIndex(INDEX_NAME); - assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); long causalityToken = 0L; // Use last token. 
MvTableStorage mvTableStorage = indexManager.getMvTableStorage(causalityToken, tableId).get(); @@ -164,7 +165,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { int tableId = indexDescriptor.tableId(); dropTable(TABLE_NAME); - assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); long causalityToken = 0L; // Use last token. MvTableStorage mvTableStorage = indexManager.getMvTableStorage(causalityToken, tableId).get(); @@ -220,7 +221,8 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { mockTableManager, catalogManager, ForkJoinPool.commonPool(), - (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply) + (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply), + lowWatermark ); assertThat(allOf(metaStorageManager.start(), catalogManager.start(), indexManager.start()), willCompleteSuccessfully()); @@ -245,4 +247,8 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { private void dropIndex(String indexName) { TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName); } + + private CompletableFuture<Void> fireDestroyEvent() { + return lowWatermark.updateAndNotify(clock.now()); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index c22f720f00..ce4aee0028 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var sqlRef = new AtomicReference<IgniteSqlImpl>(); - LowWatermarkImpl 
lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); + var lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); TableManager tableManager = new TableManager( name, @@ -566,7 +566,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { tableManager, catalogManager, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); var metricManager = new MetricManager(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index 1580bde38b..14627286b1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -61,7 +61,6 @@ import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; -import org.apache.ignite.internal.IgniteIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.catalog.commands.ColumnParams; @@ -568,8 +567,6 @@ public class PlatformTestNodeRunner { session.execute(null, "DROP TABLE " + tableName + ""); } - IgniteIntegrationTest.forceCleanupAbandonedResources(context.ignite()); - return tableName; } } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 210c08fcdd..faa1e19995 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -727,7 +727,8 @@ public class IgniteImpl implements Ignite { distributedTblMgr, catalogManager, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); indexBuildingManager = new IndexBuildingManager( @@ -1450,7 +1451,7 @@ public class IgniteImpl implements Ignite { return resourcesRegistry; } - /** Returns low watermark */ + /** Returns low watermark. */ @TestOnly public LowWatermarkImpl lowWatermark() { return lowWatermark; diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java index 0ac6d4ce3e..9fa5710043 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal; -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.junit.StopAllIgnitesAfterTests; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -31,13 +28,4 @@ import org.junit.jupiter.api.extension.ExtendWith; // The order is important here. @ExtendWith({WorkDirectoryExtension.class, StopAllIgnitesAfterTests.class}) public abstract class IgniteIntegrationTest extends BaseIgniteAbstractTest { - /** - * Forcibly destroys partitions for dropped tables and indexes via triggering catalog compaction to the latest catalog version. 
- */ - public static void forceCleanupAbandonedResources(Ignite node) { - IgniteImpl node0 = (IgniteImpl) node; - CatalogManagerImpl catalogManager = (CatalogManagerImpl) node0.catalogManager(); - - catalogManager.compactCatalog(Long.MAX_VALUE); - } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 3e9a6f7251..ccfb5a03e3 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import java.util.Collection; @@ -38,7 +39,6 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.TableEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -79,7 +79,8 @@ public class SchemaManager implements IgniteComponent { public CompletableFuture<Void> start() { catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated); catalogService.listen(CatalogEvent.TABLE_ALTER, 
this::onTableAltered); - catalogService.listen(CatalogEvent.TABLE_DESTROY, this::onTableDestroyed); + + // TODO IGNITE-21585 subscribe to LWM updates. registerExistingTables(); @@ -173,12 +174,6 @@ public class SchemaManager implements IgniteComponent { } } - private CompletableFuture<Boolean> onTableDestroyed(CatalogEventParameters event) { - DestroyTableEventParameters creationEvent = (DestroyTableEventParameters) event; - - return dropRegistry(creationEvent.causalityToken(), creationEvent.tableId()).thenApply(ignored -> false); - } - private void setColumnMapping(SchemaDescriptor schema, int tableId) throws ExecutionException, InterruptedException { if (schema.version() == CatalogTableDescriptor.INITIAL_TABLE_VERSION) { return; @@ -309,29 +304,16 @@ public class SchemaManager implements IgniteComponent { /** * Drops schema registry for the given table id (along with the corresponding schemas). * - * @param causalityToken Causality token. * @param tableId Table id. */ - private CompletableFuture<?> dropRegistry(long causalityToken, int tableId) { - if (!busyLock.enterBusy()) { - throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException()); - } + // TODO IGNITE-21585: subscribe to LWM updates. 
+ public CompletableFuture<?> dropRegistryAsync(int tableId) { + return inBusyLockAsync(busyLock, () -> { + SchemaRegistryImpl removedRegistry = registriesById.remove(tableId); + removedRegistry.close(); - try { - return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(new IgniteInternalException( - format("Cannot remove a schema registry for the table [tblId={}]", tableId), e)); - } - - SchemaRegistryImpl removedRegistry = registriesById.remove(tableId); - removedRegistry.close(); - - return nullCompletedFuture(); - })); - } finally { - busyLock.leaveBusy(); - } + return falseCompletedFuture(); + }); } @Override diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java index 16a19ec9e9..2d772b693a 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.sql.ColumnType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -251,6 +252,8 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { completeCausalityToken(CAUSALITY_TOKEN_2); } + // TODO IGNITE-21585: subscribe to LWM + @Disabled("IGNITE-21585") @Test void destroyTableMakesRegistryUnavailable() { createSomeTable(); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java index ad0123193d..581eba0f22 100644 
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java @@ -291,8 +291,6 @@ public class ItSqlLogicTest extends IgniteIntegrationTest { } } } - - forceCleanupAbandonedResources(CLUSTER_NODES.get(0)); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index aed0d5ce4c..ea44af1c06 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1247,7 +1247,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { tableManager, catalogManager, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 561ff10ddf..f1067d41d4 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -105,7 +105,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; -import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; +import org.apache.ignite.internal.catalog.events.DropTableEventParameters; import 
org.apache.ignite.internal.catalog.events.RenameTableEventParameters; import org.apache.ignite.internal.causality.CompletionListener; import org.apache.ignite.internal.causality.IncrementalVersionedValue; @@ -162,6 +162,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.engine.StorageEngine; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; +import org.apache.ignite.internal.table.DeferredEventsQueue; import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; @@ -292,6 +293,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Started tables. */ private final Map<Integer, TableImpl> startedTables = new ConcurrentHashMap<>(); + /** Deferred destruction queue. */ + private final DeferredEventsQueue<DestroyTableEvent> deferredQueue = new DeferredEventsQueue<>(DestroyTableEvent::catalogVersion); + /** Local partitions. 
*/ private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>(); @@ -561,7 +565,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { long recoveryRevision = recoveryFinishFuture.join(); - startTables(recoveryRevision); + startTables(recoveryRevision, lowWatermark.getLowWatermark()); processAssignmentsOnRecovery(recoveryRevision); @@ -569,14 +573,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), assignmentsSwitchRebalanceListener); - catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> { - return onTableCreate((CreateTableEventParameters) parameters).thenApply(unused -> false); - }); - - catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> { - return onTableDestroy(((DestroyTableEventParameters) parameters)).thenApply(unused -> false); - }); - + catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> onTableCreate((CreateTableEventParameters) parameters)); + catalogService.listen(CatalogEvent.TABLE_DROP, parameters -> onTableDrop(((DropTableEventParameters) parameters))); catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> { if (parameters instanceof RenameTableEventParameters) { return onTableRename((RenameTableEventParameters) parameters).thenApply(unused -> false); @@ -585,6 +583,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } }); + lowWatermark.addUpdateListener(this::onLwmChanged); + partitionReplicatorNodeRecovery.start(); return nullCompletedFuture(); @@ -644,8 +644,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } } - private CompletableFuture<?> onTableCreate(CreateTableEventParameters parameters) { - return createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor(), false); + private CompletableFuture<Boolean> onTableCreate(CreateTableEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> + createTableLocally(parameters.causalityToken(), parameters.catalogVersion(), parameters.tableDescriptor(), false)) + .thenApply(unused -> false); } /** @@ -719,11 +721,29 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters parameters) { + private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - dropTableLocally(parameters.causalityToken(), parameters); + deferredQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId())); - return nullCompletedFuture(); + return falseCompletedFuture(); + }); + } + + private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) { + return inBusyLockAsync(busyLock, () -> { + int earliestVersion = catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts)); + + List<DestroyTableEvent> events = deferredQueue.drainUpTo(earliestVersion); + + if (events.isEmpty()) { + return nullCompletedFuture(); + } + + List<CompletableFuture<Void>> futures = events.stream() + .map(event -> destroyTableLocally(event.tableId())) + .collect(Collectors.toList()); + + return allOf(futures.toArray(CompletableFuture[]::new)); }); } @@ -1177,45 +1197,44 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { CatalogTableDescriptor tableDescriptor, boolean onNodeRecovery ) { - return inBusyLockAsync(busyLock, () -> { - int tableId = tableDescriptor.id(); - int zoneId = tableDescriptor.zoneId(); + int tableId = tableDescriptor.id(); + int zoneId = tableDescriptor.zoneId(); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); + // Retrieve descriptor during synchronous 
call, before the previous catalog version could be concurrently compacted. + CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); - CompletableFuture<List<Assignments>> assignmentsFuture; + CompletableFuture<List<Assignments>> assignmentsFuture; - // Check if the table already has assignments in the meta storage locally. - // So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation - // of the new ones. - if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) { - assignmentsFuture = completedFuture( - tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken)); - } else { - assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId) - .thenApply(dataNodes -> AffinityUtils.calculateAssignments( - dataNodes, - zoneDescriptor.partitions(), - zoneDescriptor.replicas() - ).stream().map(Assignments::of).collect(Collectors.toList())); - - assignmentsFuture.thenAccept(assignmentsList -> { - LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, " - + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken)); - }); - } + // Check if the table already has assignments in the meta storage locally. + // So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation + // of the new ones. 
+ if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) { + assignmentsFuture = completedFuture( + tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken)); + } else { + assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId) + .thenApply(dataNodes -> AffinityUtils.calculateAssignments( + dataNodes, + zoneDescriptor.partitions(), + zoneDescriptor.replicas() + ).stream().map(Assignments::of).collect(Collectors.toList())); + + assignmentsFuture.thenAccept(assignmentsList -> { + LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, " + + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken)); + }); + } - CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke = - writeTableAssignmentsToMetastore(tableId, assignmentsFuture); + CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke = + writeTableAssignmentsToMetastore(tableId, assignmentsFuture); - return createTableLocally( - causalityToken, - tableDescriptor, - zoneDescriptor, - assignmentsFutureAfterInvoke, - onNodeRecovery - ); - }); + return createTableLocally( + causalityToken, + tableDescriptor, + zoneDescriptor, + assignmentsFutureAfterInvoke, + onNodeRecovery + ); } /** @@ -1300,15 +1319,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return failedFuture(e); } - startedTables.put(tableId, table); - return allOf(localPartsUpdateFuture, tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> { if (onNodeRecovery) { SchemaRegistry schemaRegistry = table.schemaView(); PartitionSet partitionSet = localPartsByTableId.get(tableId); + HybridTimestamp lwm = lowWatermark.getLowWatermark(); - registerIndexesToTableOnNodeRecovery(table, catalogService, partitionSet, schemaRegistry); + registerIndexesToTableOnNodeRecovery(table, 
catalogService, partitionSet, schemaRegistry, lwm); } + + startedTables.put(tableId, table); + return startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id()); } ), ioExecutor); @@ -1377,63 +1398,43 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Drops local structures for a table. * - * @param causalityToken Causality token. - * @param parameters Destroy table event parameters. + * @param tableId Table id to destroy. */ - private void dropTableLocally(long causalityToken, DestroyTableEventParameters parameters) { - int tableId = parameters.tableId(); - // TODO Drop partitions from parameters and use from storage. - int partitions = parameters.partitions(); - - localPartitionsVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - localPartsByTableId.remove(tableId); - - return nullCompletedFuture(); - })); - - tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - TableImpl table = tables.get(tableId); - - assert table != null : tableId; - - InternalTable internalTable = table.internalTable(); - - CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; - - // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change - // event triggered by zone drop or alter. Stop replica asynchronously, out of metastorage event pipeline. 
- for (int partitionId = 0; partitionId < partitions; partitionId++) { - var replicationGroupId = new TablePartitionId(tableId, partitionId); - - stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table); - } + private CompletableFuture<Void> destroyTableLocally(int tableId) { + TableImpl table = startedTables.remove(tableId); + localPartsByTableId.remove(tableId); - // TODO: IGNITE-18703 Destroy raft log and meta - return allOf(stopReplicaFutures) - .thenComposeAsync( - unused -> allOf( - internalTable.storage().destroy(), - runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor) - ), - ioExecutor - ).thenAccept(ignore0 -> tables.remove(tableId)); - })); + assert table != null; - startedTables.remove(tableId); + InternalTable internalTable = table.internalTable(); + int partitions = internalTable.partitions(); + // TODO https://issues.apache.org/jira/browse/IGNITE-18991 Move assigment manipulations to Distribution zones. Set<ByteArray> assignmentKeys = IntStream.range(0, partitions) .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p))) .collect(toSet()); - metaStorageMgr.removeAll(assignmentKeys); + + CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; + + // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change + // event triggered by zone drop or alter. Stop replica asynchronously, out of metastorage event pipeline. 
+ for (int partitionId = 0; partitionId < partitions; partitionId++) { + var replicationGroupId = new TablePartitionId(tableId, partitionId); + + stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table); + } + + // TODO: IGNITE-18703 Destroy raft log and meta + return allOf(stopReplicaFutures) + .thenComposeAsync( + unused -> inBusyLockAsync(busyLock, () -> allOf( + internalTable.storage().destroy(), + runAsync(() -> inBusyLock(busyLock, () -> internalTable.txStateStorage().destroy()), ioExecutor) + )), + ioExecutor) + .thenAccept(ignore0 -> tables.remove(tableId)) + .thenAcceptAsync(ignore0 -> schemaManager.dropRegistryAsync(tableId), ioExecutor); } @Override @@ -2398,10 +2399,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return tables.stream().filter(table -> table.name().equals(name)).findAny().orElse(null); } - private void startTables(long recoveryRevision) { + private void startTables(long recoveryRevision, @Nullable HybridTimestamp lwm) { sharedTxStateStorage.start(); - int earliestCatalogVersion = catalogService.earliestCatalogVersion(); + int earliestCatalogVersion = catalogService.activeCatalogVersion(HybridTimestamp.hybridTimestampToLong(lwm)); int latestCatalogVersion = catalogService.latestCatalogVersion(); var startedTables = new IntOpenHashSet(); @@ -2440,4 +2441,23 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return anyOf(future, stopManagerFuture).thenApply(o -> (T) o); } + + /** Internal event. 
*/ + private static class DestroyTableEvent { + final int catalogVersion; + final int tableId; + + DestroyTableEvent(int catalogVersion, int tableId) { + this.catalogVersion = catalogVersion; + this.tableId = tableId; + } + + public int catalogVersion() { + return catalogVersion; + } + + public int tableId() { + return tableId; + } + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java index f9793904d8..eac28e464b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.table.distributed.index; -import java.util.HashSet; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageIndexDescriptor; @@ -28,6 +30,7 @@ import org.apache.ignite.internal.storage.index.StorageIndexDescriptor.StorageCo import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.PartitionSet; +import org.jetbrains.annotations.Nullable; /** Auxiliary class for working with indexes that can contain useful methods and constants. 
*/ public class IndexUtils { @@ -71,38 +74,40 @@ public class IndexUtils { } /** - * Registers indexes to a table on node recovery.. + * Registers indexes to a table on node recovery. * * @param table Table into which the index will be registered. * @param catalogService Catalog service. * @param partitionSet Partitions for which index storages will need to be created if they are missing. * @param schemaRegistry Table schema register. + * @param lwm Low watermark. */ public static void registerIndexesToTableOnNodeRecovery( TableViewInternal table, CatalogService catalogService, PartitionSet partitionSet, - SchemaRegistry schemaRegistry + SchemaRegistry schemaRegistry, + @Nullable HybridTimestamp lwm ) { - int earliestCatalogVersion = catalogService.earliestCatalogVersion(); + int earliestCatalogVersion = catalogService.activeCatalogVersion(HybridTimestamp.hybridTimestampToLong(lwm)); int latestCatalogVersion = catalogService.latestCatalogVersion(); int tableId = table.tableId(); - var indexIds = new HashSet<Integer>(); + var indexIds = new IntOpenHashSet(); - for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) { + for (int catalogVersion = latestCatalogVersion; catalogVersion >= earliestCatalogVersion; catalogVersion--) { CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, catalogVersion); if (tableDescriptor == null) { continue; } - for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion, tableId)) { - if (indexIds.add(indexDescriptor.id())) { - registerIndexToTable(table, tableDescriptor, indexDescriptor, partitionSet, schemaRegistry); - } - } + int ver0 = catalogVersion; + catalogService.indexes(catalogVersion, tableId).stream() + .filter(idx -> ver0 == latestCatalogVersion || idx.status() == CatalogIndexStatus.AVAILABLE) // Alive index + .filter(idx -> indexIds.add(idx.id())) // Filter duplicates + .forEach(idx -> registerIndexToTable(table, tableDescriptor, 
idx, partitionSet, schemaRegistry)); } } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 90eb159423..3f277f204d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -359,7 +359,7 @@ public class TableManagerTest extends IgniteAbstractTest { verify(txStateTableStorage, atMost(0)).destroy(); verify(replicaMgr, atMost(0)).stopReplica(any()); - assertTrue(CatalogTestUtils.waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); verify(mvTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); verify(txStateTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); @@ -867,4 +867,8 @@ public class TableManagerTest extends IgniteAbstractTest { private Collection<CatalogTableDescriptor> allTableDescriptors() { return catalogManager.tables(catalogManager.latestCatalogVersion()); } + + private CompletableFuture<Void> fireDestroyEvent() { + return lowWatermark.updateAndNotify(clock.now()); + } }
