This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-21585 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 043661bf69ac13c2f214e1cab0a227f25aeca0ea Author: amashenkov <andrey.mashen...@gmail.com> AuthorDate: Tue Mar 5 13:42:32 2024 +0300 LowWatermark refactored, interface extracted, added dummy implementation. Subscribe IndexManager and TableManager on LWM updates. --- .../internal/catalog/CatalogManagerImpl.java | 7 + .../ignite/internal/catalog/CatalogService.java | 3 + .../ignite/internal/catalog/CatalogTestUtils.java | 27 --- .../ignite/client/handler/FakeCatalogService.java | 5 + .../ignite/internal/index/ItIndexManagerTest.java | 5 +- .../apache/ignite/internal/index/IndexManager.java | 111 +++++++-- .../ignite/internal/index/IndexManagerTest.java | 28 ++- .../runner/app/ItIgniteNodeRestartTest.java | 7 +- .../runner/app/PlatformTestNodeRunner.java | 3 - .../org/apache/ignite/internal/app/IgniteImpl.java | 15 +- .../ignite/internal/IgniteIntegrationTest.java | 12 - .../ignite/internal/schema/SchemaManager.java | 18 +- .../internal/sql/sqllogic/ItSqlLogicTest.java | 2 - .../rebalance/ItRebalanceDistributedTest.java | 9 +- .../internal/table/distributed/LowWatermark.java | 261 +-------------------- .../distributed/LowWatermarkChangedListener.java | 2 +- .../{LowWatermark.java => LowWatermarkImpl.java} | 12 +- .../internal/table/distributed/TableManager.java | 146 +++++++----- .../table/distributed/LowWatermarkTest.java | 8 +- .../table/distributed/TableManagerTest.java | 15 +- .../ignite/internal/table/TestLowWatermark.java | 69 ++++++ 21 files changed, 334 insertions(+), 431 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index 293b1376c6..00d99f8670 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -298,6 +298,13 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata return catalogAt(timestamp).version(); } + @Override + public int earliestCatalogVersion(long timestamp) { + Entry<Long, Catalog> earliestEntry = catalogByTs.floorEntry(timestamp); + + return (earliestEntry == null) ? earliestCatalogVersion() : earliestEntry.getValue().version(); + } + @Override public int earliestCatalogVersion() { return catalogByVer.firstEntry().getKey(); diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index d2c167c863..1477ee7a3a 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -100,6 +100,9 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent /** Returns the earliest registered version of the catalog. */ int earliestCatalogVersion(); + /** Returns the earliest registered version of the catalog, which is observable since given timestamp. */ + int earliestCatalogVersion(long timestamp); + /** Returns the latest registered version of the catalog. 
*/ int latestCatalogVersion(); diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java index 10e6dd46eb..3f0c3596c8 100644 --- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java +++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java @@ -19,12 +19,10 @@ package org.apache.ignite.internal.catalog; import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.fail; import java.util.List; import java.util.Set; @@ -343,31 +341,6 @@ public class CatalogTestUtils { return AlterZoneCommand.builder().zoneName(zoneName); } - /** - * Starts catalog compaction and waits it finished locally. - * - * @param catalogManager Catalog manager. - * @param timestamp Timestamp catalog should be compacted up to. - * @return {@code True} if a new snapshot has been successfully written, {@code false} otherwise. 
- */ - public static boolean waitCatalogCompaction(CatalogManager catalogManager, long timestamp) { - int version = catalogManager.activeCatalogVersion(timestamp); - - CompletableFuture<Boolean> operationFuture = ((CatalogManagerImpl) catalogManager).compactCatalog(timestamp); - - try { - boolean result = operationFuture.get(); - - if (result) { - waitForCondition(() -> catalogManager.earliestCatalogVersion() == version, 3_000); - } - } catch (Exception e) { - fail(e); - } - - return operationFuture.join(); - } - private static class TestUpdateLog implements UpdateLog { private final HybridClock clock; diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java index 2e74e02311..bd22a7e675 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java @@ -154,6 +154,11 @@ public class FakeCatalogService implements CatalogService { return 0; } + @Override + public int earliestCatalogVersion(long timestamp) { + return 0; + } + @Override public CompletableFuture<Void> catalogReadyFuture(int version) { return null; diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java index 208cd96d5e..be7ec779ba 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import 
org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.table.Table; import org.junit.jupiter.api.AfterEach; @@ -118,7 +119,9 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest { IntList list = new IntArrayList(); int tableId = table.tableId(); - acceptAliveIndexes(ignite.catalogManager(), (tbl, idx) -> { + HybridTimestamp lowWatermark = ignite.lowWatermark().getLowWatermark(); + + acceptAliveIndexes(ignite.catalogManager(), lowWatermark, (tbl, idx) -> { if (tableId == idx.tableId()) { list.add(idx.id()); } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 069161857f..70c315830a 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -17,10 +17,14 @@ package org.apache.ignite.internal.index; +import static java.util.Comparator.comparingInt; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY; +import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED; +import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; @@ -30,6 +34,8 @@ import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; 
import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -43,10 +49,14 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters; +import org.apache.ignite.internal.catalog.events.IndexEventParameters; +import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; @@ -67,9 +77,11 @@ import org.apache.ignite.internal.storage.index.StorageIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageIndexDescriptor.StorageColumnDescriptor; import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.LowWatermark; import org.apache.ignite.internal.table.distributed.PartitionSet; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.jetbrains.annotations.Nullable; /** * An Ignite component that is responsible for handling index-related commands like 
CREATE or DROP @@ -111,6 +123,13 @@ public class IndexManager implements IgniteComponent { /** Table storages by ID for which indexes were created. */ private final Map<Integer, MvTableStorage> tableStoragesById = new ConcurrentHashMap<>(); + /** Low watermark. */ + private final LowWatermark lowWatermark; + + /** Deferred destruction queue. */ + private final Queue<DestroyIndexEventParameters> deferredQueue = + new PriorityQueue<>(comparingInt(IndexEventParameters::catalogVersion)); + /** * Constructor. * @@ -125,13 +144,15 @@ public class IndexManager implements IgniteComponent { CatalogService catalogService, MetaStorageManager metaStorageManager, ExecutorService ioExecutor, - Consumer<LongFunction<CompletableFuture<?>>> registry + Consumer<LongFunction<CompletableFuture<?>>> registry, + LowWatermark lowWatermark ) { this.schemaManager = schemaManager; this.tableManager = tableManager; this.catalogService = catalogService; this.metaStorageManager = metaStorageManager; this.ioExecutor = ioExecutor; + this.lowWatermark = lowWatermark; startVv = new IncrementalVersionedValue<>(registry); tableStoragesVv = new IncrementalVersionedValue<>(registry); @@ -141,10 +162,12 @@ public class IndexManager implements IgniteComponent { public CompletableFuture<Void> start() { LOG.debug("Index manager is about to start"); - startIndexes(); + startIndexes(lowWatermark.getLowWatermark()); catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters parameters) -> onIndexCreate(parameters)); + catalogService.listen(INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters)); catalogService.listen(INDEX_DESTROY, (DestroyIndexEventParameters parameters) -> onIndexDestroy(parameters)); + lowWatermark.addUpdateListener(this::onLwmChanged); LOG.info("Index manager started"); @@ -184,25 +207,63 @@ public class IndexManager implements IgniteComponent { return tableStoragesVv.get(causalityToken).thenApply(ignore -> tableStoragesById.get(tableId)); } + private 
CompletableFuture<Boolean> onIndexRemoved(RemoveIndexEventParameters parameters) { + return inBusyLock(busyLock, () -> { + int indexId = parameters.indexId(); + int version = parameters.catalogVersion() - 1; + + CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, version); + assert indexDescriptor != null : "index"; + + CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), version); + assert tableDescriptor != null : "table"; + + + CatalogZoneDescriptor zoneDescriptor = catalogService.zone(tableDescriptor.zoneId(), version); + assert zoneDescriptor != null : "zone"; + + int tableId = tableDescriptor.id(); + + synchronized (deferredQueue) { + deferredQueue.offer(new DestroyIndexEventParameters(-1L, -1, indexId, tableId, zoneDescriptor.partitions())); + } + + return falseCompletedFuture(); + }); + } + + private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) { + return runAsync(() -> { + int earliestVersion = catalogService.activeCatalogVersion(ts.longValue()); + + // TODO: any hint if there is smth to clean ??? + + synchronized ((deferredQueue)) { + DestroyIndexEventParameters next; + + while ((next = deferredQueue.peek()) != null && next.catalogVersion() < earliestVersion) { + next = deferredQueue.poll(); + + onIndexDestroy(next); + } + } + }, ioExecutor); + } + private CompletableFuture<Boolean> onIndexDestroy(DestroyIndexEventParameters parameters) { int indexId = parameters.indexId(); int tableId = parameters.tableId(); - long causalityToken = parameters.causalityToken(); - - CompletableFuture<TableViewInternal> tableFuture = tableManager.tableAsync(causalityToken, tableId); + TableViewInternal table = tableManager.cachedTable(tableId); - return inBusyLockAsync(busyLock, () -> tableStoragesVv.update( - causalityToken, - updater(ignore -> tableFuture.thenAccept(table -> inBusyLock(busyLock, () -> { - if (table != null) { - // In case of DROP TABLE the table will be removed first. 
- table.unregisterIndex(indexId); - } else { - tableStoragesById.remove(tableId); - } - }))) - )).thenApply(unused -> false); + return runAsync(() -> inBusyLock(busyLock, () -> { + if (table != null) { + // In case of DROP TABLE the table will be removed first. + table.unregisterIndex(indexId); + } else { + tableStoragesById.remove(tableId); + } + }), ioExecutor).thenApply(unused -> false); } private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) { @@ -315,7 +376,7 @@ public class IndexManager implements IgniteComponent { } } - private void startIndexes() { + private void startIndexes(@Nullable HybridTimestamp lwm) { CompletableFuture<Long> recoveryFinishedFuture = metaStorageManager.recoveryFinishedFuture(); assert recoveryFinishedFuture.isDone(); @@ -324,7 +385,7 @@ public class IndexManager implements IgniteComponent { List<CompletableFuture<?>> startIndexFutures = new ArrayList<>(); - acceptAliveIndexes(catalogService, (table, index) -> startIndexFutures.add(startIndexAsync(table, index, causalityToken))); + acceptAliveIndexes(catalogService, lwm, (table, index) -> startIndexFutures.add(startIndexAsync(table, index, causalityToken))); // Forces to wait until recovery is complete before the metastore watches are deployed to avoid races with other components. startVv.update(causalityToken, (unused, throwable) -> allOf(startIndexFutures.toArray(CompletableFuture[]::new))) @@ -416,14 +477,20 @@ public class IndexManager implements IgniteComponent { } /** - * Collects indexes (including deleted ones) for tables (tables from the latest version of the catalog) from the earliest to the latest - * version of the catalog that need to be started on node recovery. + * Collects indexes for tables from the earliest to the latest observable catalog version, which need to be started on node recovery. + * If low watermark is not set, then earliest catalog version will be used instead. * * @param catalogService Catalog service. 
+ * @param lowWatermark Low watermark or {@code null} for calculating earliest available catalog version. + * @param consumer A consumer that accepts alive index' descriptor. */ - static void acceptAliveIndexes(CatalogService catalogService, BiConsumer<CatalogTableDescriptor, CatalogIndexDescriptor> consumer) { - int earliestCatalogVersion = catalogService.earliestCatalogVersion(); + static void acceptAliveIndexes( + CatalogService catalogService, + @Nullable HybridTimestamp lowWatermark, + BiConsumer<CatalogTableDescriptor, CatalogIndexDescriptor> consumer + ) { int latestCatalogVersion = catalogService.latestCatalogVersion(); + int earliestCatalogVersion = catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark)); IntSet processedObjects = new IntOpenHashSet(); catalogService.indexes(latestCatalogVersion).stream() diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java index 202f4a8267..bfcbcc8d30 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java @@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager; -import static org.apache.ignite.internal.catalog.CatalogTestUtils.waitCatalogCompaction; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; @@ -47,7 +46,6 @@ import static 
org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; @@ -79,6 +77,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.impl.MetaStorageService; @@ -92,6 +91,7 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TestLowWatermark; import org.apache.ignite.internal.table.distributed.PartitionSet; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; @@ -133,6 +133,8 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { private final Map<Integer, TableViewInternal> tableViewInternalByTableId = new ConcurrentHashMap<>(); + private TestLowWatermark lowWatermark = new TestLowWatermark(); + @BeforeEach public void setUp() { mockTableManager = mock(TableManager.class); @@ -230,7 +232,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { int tableId = indexDescriptor.tableId(); dropIndex(INDEX_NAME); - 
assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); long causalityToken = 0L; // Use last token. MvTableStorage mvTableStorage = indexManager.getMvTableStorage(causalityToken, tableId).get(); @@ -248,7 +250,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { int tableId = indexDescriptor.tableId(); dropTable(TABLE_NAME); - assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); long causalityToken = 0L; // Use last token. MvTableStorage mvTableStorage = indexManager.getMvTableStorage(causalityToken, tableId).get(); @@ -256,7 +258,6 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { verify(mvTableStorage).destroyIndex(indexId); } - @Test void testCollectIndexesForRecoveryForCreatedTables() { createTable(OTHER_TABLE_NAME); @@ -294,7 +295,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { createTable(OTHER_TABLE_NAME); dropTable(OTHER_TABLE_NAME); - assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> collectedIndexes = collectIndexesForRecovery(); @@ -370,12 +371,12 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { dropIndexes(indexName3, indexName4, indexName5); removeIndex(catalogManager, removedIndexId); - assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + IgniteUtils.stopAll(indexManager, catalogManager, metaStorageManager); TableViewInternal tableViewInternal = tableViewInternalByTableId.get(tableId()); clearInvocations(tableViewInternal); + lowWatermark.update(clock.now()); - IgniteUtils.stopAll(indexManager, catalogManager, metaStorageManager); createAndStartComponents(); ArgumentCaptor<StorageHashIndexDescriptor> captor = ArgumentCaptor.forClass(StorageHashIndexDescriptor.class); @@ -461,7 
+462,8 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { catalogManager, metaStorageManager, ForkJoinPool.commonPool(), - (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply) + (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply), + lowWatermark ); assertThat(allOf(metaStorageManager.start(), catalogManager.start(), indexManager.start()), willCompleteSuccessfully()); @@ -549,8 +551,14 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { private Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> collectIndexesForRecovery() { Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> res = new HashMap<>(); - IndexManager.acceptAliveIndexes(catalogManager, (k, v) -> res.computeIfAbsent(k, ignore -> new ArrayList<>()).add(v)); + HybridTimestamp lwm = lowWatermark.getLowWatermark(); + + IndexManager.acceptAliveIndexes(catalogManager, lwm, (k, v) -> res.computeIfAbsent(k, ignore -> new ArrayList<>()).add(v)); return res; } + + private CompletableFuture<Void> fireDestroyEvent() { + return lowWatermark.updateAndNotify(clock.now()); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index f083095297..10204f5337 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -161,7 +161,7 @@ import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableViewInternal; -import 
org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var sqlRef = new AtomicReference<IgniteSqlImpl>(); - LowWatermark lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); + LowWatermarkImpl lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor); TableManager tableManager = new TableManager( name, @@ -567,7 +567,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { catalogManager, metaStorageMgr, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); var metricManager = new MetricManager(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index 1580bde38b..14627286b1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -61,7 +61,6 @@ import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; -import org.apache.ignite.internal.IgniteIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.catalog.commands.ColumnParams; @@ -568,8 
+567,6 @@ public class PlatformTestNodeRunner { session.execute(null, "DROP TABLE " + tableName + ""); } - IgniteIntegrationTest.forceCleanupAbandonedResources(context.ignite()); - return tableName; } } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 5dc2b7ea5f..b4dc4bccb5 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -177,7 +177,7 @@ import org.apache.ignite.internal.storage.engine.StorageEngine; import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.systemview.api.SystemViewManager; -import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -345,7 +345,7 @@ public class IgniteImpl implements Ignite { private final ClockWaiter clockWaiter; - private final LowWatermark lowWatermark; + private final LowWatermarkImpl lowWatermark; private final OutgoingSnapshotsManager outgoingSnapshotsManager; @@ -686,7 +686,7 @@ public class IgniteImpl implements Ignite { StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); - lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, txManager, vaultMgr, failureProcessor); + lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), clock, txManager, vaultMgr, failureProcessor); distributedTblMgr = new TableManager( name, @@ -728,7 +728,8 @@ public class 
IgniteImpl implements Ignite { catalogManager, metaStorageMgr, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); indexBuildingManager = new IndexBuildingManager( @@ -1450,4 +1451,10 @@ public class IgniteImpl implements Ignite { public RemotelyTriggeredResourceRegistry resourcesRegistry() { return resourcesRegistry; } + + /** Returns low watermark */ + @TestOnly + public LowWatermarkImpl lowWatermark() { + return lowWatermark; + } } diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java index 0ac6d4ce3e..9fa5710043 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal; -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.junit.StopAllIgnitesAfterTests; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -31,13 +28,4 @@ import org.junit.jupiter.api.extension.ExtendWith; // The order is important here. @ExtendWith({WorkDirectoryExtension.class, StopAllIgnitesAfterTests.class}) public abstract class IgniteIntegrationTest extends BaseIgniteAbstractTest { - /** - * Forcibly destroys partitions for dropped tables and indexes via triggering catalog compaction to the latest catalog version. 
- */ - public static void forceCleanupAbandonedResources(Ignite node) { - IgniteImpl node0 = (IgniteImpl) node; - CatalogManagerImpl catalogManager = (CatalogManagerImpl) node0.catalogManager(); - - catalogManager.compactCatalog(Long.MAX_VALUE); - } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 3e9a6f7251..a824585df2 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -176,7 +176,7 @@ public class SchemaManager implements IgniteComponent { private CompletableFuture<Boolean> onTableDestroyed(CatalogEventParameters event) { DestroyTableEventParameters creationEvent = (DestroyTableEventParameters) event; - return dropRegistry(creationEvent.causalityToken(), creationEvent.tableId()).thenApply(ignored -> false); + return dropRegistry(creationEvent.tableId()).thenApply(ignored -> false); } private void setColumnMapping(SchemaDescriptor schema, int tableId) throws ExecutionException, InterruptedException { @@ -309,26 +309,18 @@ public class SchemaManager implements IgniteComponent { /** * Drops schema registry for the given table id (along with the corresponding schemas). * - * @param causalityToken Causality token. * @param tableId Table id. 
*/ - private CompletableFuture<?> dropRegistry(long causalityToken, int tableId) { + private CompletableFuture<?> dropRegistry(int tableId) { if (!busyLock.enterBusy()) { throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException()); } try { - return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(new IgniteInternalException( - format("Cannot remove a schema registry for the table [tblId={}]", tableId), e)); - } - - SchemaRegistryImpl removedRegistry = registriesById.remove(tableId); - removedRegistry.close(); + SchemaRegistryImpl removedRegistry = registriesById.remove(tableId); + removedRegistry.close(); - return nullCompletedFuture(); - })); + return nullCompletedFuture(); } finally { busyLock.leaveBusy(); } diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java index ad0123193d..581eba0f22 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java @@ -291,8 +291,6 @@ public class ItSqlLogicTest extends IgniteIntegrationTest { } } } - - forceCleanupAbandonedResources(CLUSTER_NODES.get(0)); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 6c25e6a32f..31325fe6d8 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -168,7 +168,7 @@ import 
org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableRaftService; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -955,7 +955,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { private final NetworkAddress networkAddress; - private final LowWatermark lowWatermark; + private final LowWatermarkImpl lowWatermark; /** The future have to be complete after the node start and all Meta storage watches are deployd. */ private CompletableFuture<Void> deployWatchesFut; @@ -1172,7 +1172,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); HybridClockImpl clock = new HybridClockImpl(); - lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, txManager, vaultManager, failureProcessor); + lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), clock, txManager, vaultManager, failureProcessor); tableManager = new TableManager( name, @@ -1248,7 +1248,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { catalogManager, metaStorageManager, threadPoolsManager.tableIoExecutor(), - registry + registry, + lowWatermark ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java index 8e5cc3cf20..6b77076d8f 100644 --- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java @@ -17,264 +17,19 @@ package org.apache.ignite.internal.table.distributed; -import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.internal.failure.FailureContext; -import org.apache.ignite.internal.failure.FailureProcessor; -import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.lang.NodeStoppingException; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.manager.IgniteComponent; -import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; -import org.apache.ignite.internal.thread.NamedThreadFactory; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.util.ByteUtils; -import org.apache.ignite.internal.util.IgniteSpinBusyLock; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultEntry; -import org.apache.ignite.internal.vault.VaultManager; import org.jetbrains.annotations.Nullable; /** - * Class to manage the low watermark. 
- * - * <p>Low watermark is the node's local time, which ensures that read-only transactions have completed by this time, and new read-only - * transactions will only be created after this time, and we can safely delete obsolete/garbage data such as: obsolete versions of table - * rows, remote indexes, remote tables, etc. - * - * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a> + * An interface for tracking Low watermark. */ -public class LowWatermark implements IgniteComponent { - private static final IgniteLogger LOG = Loggers.forClass(LowWatermark.class); - - static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark"); - - private final LowWatermarkConfiguration lowWatermarkConfig; - - private final HybridClock clock; - - private final TxManager txManager; - - private final VaultManager vaultManager; - - private final List<LowWatermarkChangedListener> updateListeners = new CopyOnWriteArrayList<>(); - - private final ScheduledExecutorService scheduledThreadPool; - - private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); - - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - private volatile @Nullable HybridTimestamp lowWatermark; - - private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>(); - - private final FailureProcessor failureProcessor; - - /** - * Constructor. - * - * @param nodeName Node name. - * @param lowWatermarkConfig Low watermark configuration. - * @param clock A hybrid logical clock. - * @param txManager Transaction manager. - * @param vaultManager Vault manager. - * @param failureProcessor Failure processor tha is used to handle critical errors. 
- */ - public LowWatermark( - String nodeName, - LowWatermarkConfiguration lowWatermarkConfig, - HybridClock clock, - TxManager txManager, - VaultManager vaultManager, - FailureProcessor failureProcessor - ) { - this.lowWatermarkConfig = lowWatermarkConfig; - this.clock = clock; - this.txManager = txManager; - this.vaultManager = vaultManager; - this.failureProcessor = failureProcessor; - - scheduledThreadPool = Executors.newSingleThreadScheduledExecutor( - NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG) - ); - } - - /** - * Starts the watermark manager. - */ - @Override - public CompletableFuture<Void> start() { - inBusyLock(busyLock, () -> { - lowWatermark = readLowWatermarkFromVault(); - }); - - return nullCompletedFuture(); - } - - /** - * Schedule watermark updates. - */ - public void scheduleUpdates() { - inBusyLock(busyLock, () -> { - HybridTimestamp lowWatermarkCandidate = lowWatermark; - - if (lowWatermarkCandidate == null) { - LOG.info("Previous value of the low watermark was not found, will schedule to update it"); - - scheduleUpdateLowWatermarkBusy(); - - return; - } - - LOG.info("Low watermark has been scheduled to be updated: {}", lowWatermarkCandidate); - - txManager.updateLowWatermark(lowWatermarkCandidate) - .thenComposeAsync(unused -> inBusyLock(busyLock, () -> notifyListeners(lowWatermarkCandidate)), scheduledThreadPool) - .whenComplete((unused, throwable) -> { - if (throwable == null) { - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } else if (!(throwable instanceof NodeStoppingException)) { - LOG.error("Error during the Watermark manager start", throwable); - - failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable)); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } - }); - }); - } - - private @Nullable HybridTimestamp readLowWatermarkFromVault() { - VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY); - - return vaultEntry == null ? 
null : ByteUtils.fromBytes(vaultEntry.value()); - } - - @Override - public void stop() { - if (!closeGuard.compareAndSet(false, true)) { - return; - } - - busyLock.block(); - - ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get(); - - if (lastScheduledTaskFuture != null) { - lastScheduledTaskFuture.cancel(true); - } - - IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS); - } - - /** - * Returns the current low watermark, {@code null} means no low watermark has been assigned yet. - */ - public @Nullable HybridTimestamp getLowWatermark() { - return lowWatermark; - } - - void updateLowWatermark() { - inBusyLock(busyLock, () -> { - HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate(); - - // Wait until all the read-only transactions are finished before the new candidate, since no new RO transactions could be - // created, then we can safely promote the candidate as a new low watermark, store it in vault, and we can safely start cleaning - // up the stale/junk data in the tables. 
- txManager.updateLowWatermark(lowWatermarkCandidate) - .thenComposeAsync(unused -> inBusyLock(busyLock, () -> { - vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)); - - lowWatermark = lowWatermarkCandidate; - - return notifyListeners(lowWatermarkCandidate); - }), scheduledThreadPool) - .whenComplete((unused, throwable) -> { - if (throwable != null) { - if (!(throwable instanceof NodeStoppingException)) { - LOG.error("Failed to update low watermark, will schedule again: {}", throwable, lowWatermarkCandidate); - - inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy); - } - } else { - LOG.info("Successful low watermark update: {}", lowWatermarkCandidate); - - scheduleUpdateLowWatermarkBusy(); - } - }); - }); - } - - public void addUpdateListener(LowWatermarkChangedListener listener) { - updateListeners.add(listener); - } - - public void removeUpdateListener(LowWatermarkChangedListener listener) { - updateListeners.remove(listener); - } - - private CompletableFuture<Void> notifyListeners(HybridTimestamp lowWatermark) { - if (updateListeners.isEmpty()) { - return nullCompletedFuture(); - } - - ArrayList<CompletableFuture<?>> res = new ArrayList<>(); - for (LowWatermarkChangedListener updateListener : updateListeners) { - res.add(updateListener.onLwmChanged(lowWatermark)); - } - - return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new)); - } - - private void scheduleUpdateLowWatermarkBusy() { - ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get(); - - assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished"; - - ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule( - this::updateLowWatermark, - lowWatermarkConfig.updateFrequency().value(), - TimeUnit.MILLISECONDS - ); - - boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture); - - assert casResult : "only one 
scheduled task is expected"; - } - - HybridTimestamp createNewLowWatermarkCandidate() { - HybridTimestamp now = clock.now(); - - HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime( - -lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew() - ); - - HybridTimestamp lowWatermark = this.lowWatermark; - - assert lowWatermark == null || lowWatermarkCandidate.compareTo(lowWatermark) > 0 : - "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + lowWatermarkCandidate; +public interface LowWatermark { + /** Returns the current low watermark, {@code null} means no low watermark has been assigned yet. */ + @Nullable HybridTimestamp getLowWatermark(); - return lowWatermarkCandidate; - } + /** Subscribes to watermark changes. */ + void addUpdateListener(LowWatermarkChangedListener listener); - private long getMaxClockSkew() { - // TODO: IGNITE-19287 Add Implementation - return HybridTimestamp.CLOCK_SKEW; - } + /** Unsubscribes from watermark changes. */ + void removeUpdateListener(LowWatermarkChangedListener listener); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java index 2a738a7c46..72e395bbaf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java @@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; /** - * LWM event listener interface. + * Low watermark event listener interface.
* * @see LowWatermark */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java similarity index 98% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java index 8e5cc3cf20..177ee9d652 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java @@ -59,8 +59,8 @@ import org.jetbrains.annotations.Nullable; * * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a> */ -public class LowWatermark implements IgniteComponent { - private static final IgniteLogger LOG = Loggers.forClass(LowWatermark.class); +public class LowWatermarkImpl implements IgniteComponent, LowWatermark { + private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkImpl.class); static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark"); @@ -96,7 +96,7 @@ public class LowWatermark implements IgniteComponent { * @param vaultManager Vault manager. * @param failureProcessor Failure processor tha is used to handle critical errors. */ - public LowWatermark( + public LowWatermarkImpl( String nodeName, LowWatermarkConfiguration lowWatermarkConfig, HybridClock clock, @@ -183,9 +183,7 @@ public class LowWatermark implements IgniteComponent { IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS); } - /** - * Returns the current low watermark, {@code null} means no low watermark has been assigned yet. 
- */ + @Override public @Nullable HybridTimestamp getLowWatermark() { return lowWatermark; } @@ -221,10 +219,12 @@ public class LowWatermark implements IgniteComponent { }); } + @Override public void addUpdateListener(LowWatermarkChangedListener listener) { updateListeners.add(listener); } + @Override public void removeUpdateListener(LowWatermarkChangedListener listener) { updateListeners.remove(listener); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 5b1820fc91..78e753ea5b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableMap; +import static java.util.Comparator.comparingInt; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.anyOf; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -71,6 +72,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -104,6 +107,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; +import org.apache.ignite.internal.catalog.events.DropTableEventParameters; import 
org.apache.ignite.internal.catalog.events.RenameTableEventParameters; import org.apache.ignite.internal.causality.CompletionListener; import org.apache.ignite.internal.causality.IncrementalVersionedValue; @@ -288,6 +292,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Started tables. */ private final Map<Integer, TableImpl> startedTables = new ConcurrentHashMap<>(); + /** Deferred destruction queue. */ + private final Queue<DestroyTableEventParameters> deferredQueue = + new PriorityQueue<>(comparingInt(DestroyTableEventParameters::catalogVersion)); + /** Local partitions. */ private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>(); @@ -557,7 +565,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { long recoveryRevision = recoveryFinishFuture.join(); - startTables(recoveryRevision); + startTables(recoveryRevision, lowWatermark.getLowWatermark()); processAssignmentsOnRecovery(recoveryRevision); @@ -565,13 +573,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), assignmentsSwitchRebalanceListener); - catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> { - return onTableCreate((CreateTableEventParameters) parameters).thenApply(unused -> false); - }); - - catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> { - return onTableDestroy(((DestroyTableEventParameters) parameters)).thenApply(unused -> false); - }); + catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> onTableCreate((CreateTableEventParameters) parameters)); + catalogService.listen(CatalogEvent.TABLE_DROP, parameters -> onTableDrop(((DropTableEventParameters) parameters))); + catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> 
onTableDestroy(((DestroyTableEventParameters) parameters))); + lowWatermark.addUpdateListener(this::onLwmChanged); catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> { if (parameters instanceof RenameTableEventParameters) { @@ -640,8 +645,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } } - private CompletableFuture<?> onTableCreate(CreateTableEventParameters parameters) { - return createTableLocally(parameters.causalityToken(), parameters.catalogVersion(), parameters.tableDescriptor()); + private CompletableFuture<Boolean> onTableCreate(CreateTableEventParameters parameters) { + return createTableLocally(parameters.causalityToken(), parameters.catalogVersion(), parameters.tableDescriptor()) + .thenApply(unused -> false); } /** @@ -715,14 +721,49 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters parameters) { + private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - dropTableLocally(parameters.causalityToken(), parameters); + int tableId = parameters.tableId(); + int version = parameters.catalogVersion() - 1; - return nullCompletedFuture(); + CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, version); + assert tableDescriptor != null; + + CatalogZoneDescriptor zoneDescriptor = catalogService.zone(tableDescriptor.zoneId(), version); + assert zoneDescriptor != null; + + int partitions = zoneDescriptor.partitions(); + + synchronized (deferredQueue) { + deferredQueue.offer(new DestroyTableEventParameters(-1L, version, tableId, partitions)); + } + + return falseCompletedFuture(); }); } + private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) { + return runAsync(() -> { + int earliestVersion = catalogService.activeCatalogVersion(ts.longValue()); + + // TODO: any hint if there is smth to clean ??? 
+ + synchronized ((deferredQueue)) { + DestroyTableEventParameters next; + + while ((next = deferredQueue.peek()) != null && next.catalogVersion() < earliestVersion) { + next = deferredQueue.poll(); + + onTableDestroy(next); + } + } + }, ioExecutor); + } + + private CompletableFuture<Boolean> onTableDestroy(DestroyTableEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> destroyTableLocally(parameters).thenApply(unused -> false)); + } + private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> tablesVv.update( parameters.causalityToken(), @@ -1349,63 +1390,44 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Drops local structures for a table. * - * @param causalityToken Causality token. * @param parameters Destroy table event parameters. */ - private void dropTableLocally(long causalityToken, DestroyTableEventParameters parameters) { + private CompletableFuture<Void> destroyTableLocally(DestroyTableEventParameters parameters) { int tableId = parameters.tableId(); - // TODO Drop partitions from parameters and use from storage. 
- int partitions = parameters.partitions(); - localPartitionsVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } + TableImpl table = startedTables.remove(tableId); + localPartsByTableId.remove(tableId); - localPartsByTableId.remove(tableId); + assert table != null; - return nullCompletedFuture(); - })); - - tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - TableImpl table = tables.get(tableId); - - assert table != null : tableId; - - InternalTable internalTable = table.internalTable(); - - CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; - - // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change - // event triggered by zone drop or alter. Stop replica asynchronously, out of metastorage event pipeline. - for (int partitionId = 0; partitionId < partitions; partitionId++) { - var replicationGroupId = new TablePartitionId(tableId, partitionId); - - stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table); - } - - // TODO: IGNITE-18703 Destroy raft log and meta - return allOf(stopReplicaFutures) - .thenComposeAsync( - unused -> allOf( - internalTable.storage().destroy(), - runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor) - ), - ioExecutor - ).thenAccept(ignore0 -> tables.remove(tableId)); - })); - - startedTables.remove(tableId); + InternalTable internalTable = table.internalTable(); + int partitions = internalTable.partitions(); + // TODO https://issues.apache.org/jira/browse/IGNITE-18991 Move assigment manipulations to Distribution zones. 
Set<ByteArray> assignmentKeys = IntStream.range(0, partitions) .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p))) .collect(toSet()); - metaStorageMgr.removeAll(assignmentKeys); + + CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; + + // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change + // event triggered by zone drop or alter. Stop replica asynchronously, out of metastorage event pipeline. + for (int partitionId = 0; partitionId < partitions; partitionId++) { + var replicationGroupId = new TablePartitionId(tableId, partitionId); + + stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table); + } + + // TODO: IGNITE-18703 Destroy raft log and meta + return allOf(stopReplicaFutures) + .thenComposeAsync( + unused -> allOf( + internalTable.storage().destroy(), + runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor) + ), + ioExecutor + ).thenAccept(ignore0 -> tables.remove(tableId)); } @Override @@ -2336,11 +2358,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return tables.stream().filter(table -> table.name().equals(name)).findAny().orElse(null); } - private void startTables(long recoveryRevision) { + private void startTables(long recoveryRevision, @Nullable HybridTimestamp lwm) { sharedTxStateStorage.start(); - int earliestCatalogVersion = catalogService.earliestCatalogVersion(); int latestCatalogVersion = catalogService.latestCatalogVersion(); + int earliestCatalogVersion = (lwm == null) + ? 
catalogService.earliestCatalogVersion() + : catalogService.activeCatalogVersion(lwm.longValue()); var startedTables = new IntOpenHashSet(); List<CompletableFuture<?>> startTableFutures = new ArrayList<>(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java index c35e203fdb..a21ab858b7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.table.distributed; -import static org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY; +import static org.apache.ignite.internal.table.distributed.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,7 +58,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; /** - * For {@link LowWatermark} testing. + * For {@link LowWatermarkImpl} testing. 
*/ @ExtendWith(ConfigurationExtension.class) public class LowWatermarkTest extends BaseIgniteAbstractTest { @@ -73,14 +73,14 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { private LowWatermarkChangedListener listener; - private LowWatermark lowWatermark; + private LowWatermarkImpl lowWatermark; @BeforeEach void setUp() { listener = mock(LowWatermarkChangedListener.class); when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture()); - lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, mock(FailureProcessor.class)); + lowWatermark = new LowWatermarkImpl("test", lowWatermarkConfig, clock, txManager, vaultManager, mock(FailureProcessor.class)); lowWatermark.addUpdateListener(listener); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 7b041ea1a6..3f277f204d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -118,6 +118,7 @@ import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TestLowWatermark; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService; import org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -129,7 +130,6 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import 
org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.CursorUtils; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; @@ -243,8 +243,11 @@ public class TableManagerTest extends IgniteAbstractTest { private ExecutorService partitionOperationsExecutor; + private TestLowWatermark lowWatermark; + @BeforeEach void before() throws NodeStoppingException { + lowWatermark = new TestLowWatermark(); catalogMetastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(NODE_NAME)); catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, catalogMetastore); @@ -356,7 +359,7 @@ public class TableManagerTest extends IgniteAbstractTest { verify(txStateTableStorage, atMost(0)).destroy(); verify(replicaMgr, atMost(0)).stopReplica(any()); - assertTrue(CatalogTestUtils.waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + assertThat(fireDestroyEvent(), willCompleteSuccessfully()); verify(mvTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); verify(txStateTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); @@ -744,8 +747,6 @@ public class TableManagerTest extends IgniteAbstractTest { */ private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator, Consumer<TxStateTableStorage> txStateTableStorageDecorator) { - VaultManager vaultManager = mock(VaultManager.class); - TableManager tableManager = new TableManager( NODE_NAME, revisionUpdater, @@ -777,7 +778,7 @@ public class TableManagerTest extends IgniteAbstractTest { () -> mock(IgniteSql.class), new RemotelyTriggeredResourceRegistry(), mock(ScheduledExecutorService.class), - new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, tm, vaultManager, 
mock(FailureProcessor.class)) + lowWatermark ) { @Override @@ -866,4 +867,8 @@ public class TableManagerTest extends IgniteAbstractTest { private Collection<CatalogTableDescriptor> allTableDescriptors() { return catalogManager.tables(catalogManager.latestCatalogVersion()); } + + private CompletableFuture<Void> fireDestroyEvent() { + return lowWatermark.updateAndNotify(clock.now()); + } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java new file mode 100644 index 0000000000..320c776ffe --- /dev/null +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.table.distributed.LowWatermark; +import org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener; +import org.jetbrains.annotations.Nullable; + +/** + * Low watermark dummy implementation, which requires explicit {@link #updateAndNotify(HybridTimestamp)} method call to notify listeners. + * This implementation has no persistent state and notifies listeners instantly in the same thread. + */ +public class TestLowWatermark implements LowWatermark { + private final List<LowWatermarkChangedListener> listeners = new CopyOnWriteArrayList<>(); + private volatile HybridTimestamp ts; + + @Override + public @Nullable HybridTimestamp getLowWatermark() { + return ts; + } + + @Override + public void addUpdateListener(LowWatermarkChangedListener listener) { + this.listeners.add(listener); + } + + @Override + public void removeUpdateListener(LowWatermarkChangedListener listener) { + this.listeners.remove(listener); + } + + /** + * Updates the low watermark and notifies listeners. + * + * @param newTs New timestamp. + * @return Listener notification future. + */ + public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) { + assert ts == null || ts.longValue() < newTs.longValue(); + + this.ts = newTs; + + return CompletableFuture.allOf(listeners.stream().map(l -> l.onLwmChanged(newTs)).toArray(CompletableFuture[]::new)); + } + + /** Sets the low watermark without notifying listeners. */ + public void update(HybridTimestamp newTs) { + this.ts = newTs; + } +}