This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-20121 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 47414442113519484804a3dd4841551edbe9df98 Author: amashenkov <andrey.mashen...@gmail.com> AuthorDate: Mon Feb 19 17:24:26 2024 +0300 IGNITE-20680 --- .../catalog/commands/RemoveIndexCommand.java | 7 ++- .../commands/RemoveIndexCommandValidationTest.java | 9 ++++ .../handler/ClientPrimaryReplicaTracker.java | 25 +++------ .../CausalityDataNodesEngine.java | 4 +- .../ignite/internal/index/ItBuildIndexTest.java | 2 +- .../runner/app/PlatformTestNodeRunner.java | 3 ++ .../schemasync/ItSchemaSyncAndReplicationTest.java | 4 +- .../ignite/internal/schema/SchemaManager.java | 10 +++- .../ignite/internal/schema/SchemaManagerTest.java | 17 ++++--- .../internal/sql/engine/ItCreateTableDdlTest.java | 24 --------- .../internal/sql/sqllogic/ItSqlLogicTest.java | 11 ++++ .../engine/exec/ExecutableTableRegistryImpl.java | 6 +-- .../exec/ExecutableTableRegistrySelfTest.java | 2 +- .../apache/ignite/internal/table/TableImpl.java | 2 +- .../internal/table/distributed/TableManager.java | 59 ++++++++++------------ .../table/distributed/TableManagerTest.java | 19 ++++--- 16 files changed, 105 insertions(+), 99 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java index 2e34f847b7..ef547d58a5 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.catalog.commands; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow; import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING; import java.util.List; @@ -58,7 +57,11 @@ public class RemoveIndexCommand implements CatalogCommand { @Override public List<UpdateEntry> get(Catalog catalog) { - CatalogIndexDescriptor index = indexOrThrow(catalog, indexId); + CatalogIndexDescriptor index = catalog.index(indexId); + + if (index == null) { + return List.of(); + } if (index.status() != STOPPING) { throw new CatalogValidationException("Cannot remove index {} because its status is {}", indexId, index.status()); diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java index 2d37e6a21b..9cce80879e 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java @@ -22,6 +22,8 @@ import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus. import org.apache.ignite.internal.catalog.CatalogCommand; import org.apache.ignite.internal.catalog.CatalogValidationException; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; /** Tests to verify validation of {@link RemoveIndexCommand}. */ public class RemoveIndexCommandValidationTest extends AbstractChangeIndexStatusCommandValidationTest { @@ -44,4 +46,11 @@ public class RemoveIndexCommandValidationTest extends AbstractChangeIndexStatusC String expectedExceptionMessageSubstringForWrongStatus() { return "Cannot remove index"; } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20121") + @Test + @Override + void exceptionIsThrownIfIndexWithGivenIdNotFound() { + super.exceptionIsThrownIfIndexWithGivenIdNotFound(); + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java index a6a7b1d335..0fc1407e39 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; -import org.apache.ignite.internal.catalog.events.DropTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.event.EventParameters; import org.apache.ignite.internal.hlc.HybridClock; @@ -250,7 +250,7 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter maxStartTime.set(clock.nowLong()); placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (EventListener) this); - catalogService.listen(CatalogEvent.TABLE_DROP, (EventListener) this); + catalogService.listen(CatalogEvent.TABLE_DESTROY, (EventListener) this); } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -261,7 +261,7 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter busyLock.block(); - catalogService.removeListener(CatalogEvent.TABLE_DROP, (EventListener) this); + catalogService.removeListener(CatalogEvent.TABLE_DESTROY, (EventListener) this); placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (EventListener) this); primaryReplicas.clear(); } @@ -280,8 +280,8 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter } private CompletableFuture<Boolean> notifyInternal(EventParameters parameters) { - if (parameters instanceof DropTableEventParameters) { - removeTable((DropTableEventParameters) parameters); + if (parameters instanceof DestroyTableEventParameters) { + removeTable((DestroyTableEventParameters) parameters); return falseCompletedFuture(); } @@ -304,18 +304,9 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter return falseCompletedFuture(); // false: don't remove listener. } - private void removeTable(DropTableEventParameters dropTableEvent) { - // Use previous version of the catalog to get the dropped table. - int prevCatalogVersion = dropTableEvent.catalogVersion() - 1; - - CatalogTableDescriptor table = catalogService.table(dropTableEvent.tableId(), prevCatalogVersion); - assert table != null : "Table from DropTableEventParameters not found: " + dropTableEvent.tableId(); - - CatalogZoneDescriptor zone = catalogService.zone(table.zoneId(), prevCatalogVersion); - assert zone != null : "Zone from DropTableEventParameters not found: " + table.zoneId(); - - for (int partition = 0; partition < zone.partitions(); partition++) { - TablePartitionId tablePartitionId = new TablePartitionId(dropTableEvent.tableId(), partition); + private void removeTable(DestroyTableEventParameters event) { + for (int partition = 0; partition < event.partitions(); partition++) { + TablePartitionId tablePartitionId = new TablePartitionId(event.tableId(), partition); primaryReplicas.remove(tablePartitionId); } } diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java index c77539256e..40bd117016 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java @@ -319,8 +319,8 @@ public class CausalityDataNodesEngine { CatalogZoneDescriptor entryNewerCfg = null; // Iterate over zone configurations from newest to oldest. - for (int i = catalogVersion; i >= 0; i--) { - CatalogZoneDescriptor entryOlderCfg = catalogManager.catalog(i).zone(zoneId); + for (int i = catalogVersion; i >= catalogManager.earliestCatalogVersion(); i--) { + CatalogZoneDescriptor entryOlderCfg = catalogManager.zone(zoneId, i); if (entryOlderCfg == null) { break; diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java index 58220a4e66..fb0dc064dd 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java @@ -281,7 +281,7 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest { } private static void checkIndexBuild(int partitions, int replicas, String indexName) throws Exception { - // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes + // TODO: IGNITE-20121 We are waiting for schema synchronization to avoid races to create and destroy indexes Map<Integer, List<Ignite>> nodesWithBuiltIndexesByPartitionId = waitForIndexBuild(TABLE_NAME, indexName); // Check that the number of nodes with built indexes is equal to the number of replicas. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index 14627286b1..e54442ccf6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -63,6 +63,7 @@ import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; +import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.commands.DefaultValue; import org.apache.ignite.internal.client.proto.ColumnTypeConverter; @@ -567,6 +568,8 @@ public class PlatformTestNodeRunner { session.execute(null, "DROP TABLE " + tableName + ""); } + ((CatalogManagerImpl) ((IgniteImpl) context.ignite()).catalogManager()).compactCatalog(Long.MAX_VALUE); + return tableName; } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java index b1fbca2662..7a08ee79b3 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java @@ -149,8 +149,8 @@ class ItSchemaSyncAndReplicationTest extends ClusterPerTestIntegrationTest { } private static MvPartitionStorage solePartitionStorage(IgniteImpl node) { - // We use this api because there is no waiting for schemas to sync. - TableViewInternal table = ((TableManager) node.tables()).getTable(TABLE_NAME); + TableViewInternal table = ((TableManager) node.tables()).startedTables() + .stream().filter(t -> TABLE_NAME.equals(t.name())).findAny().orElse(null); assertNotNull(table); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index e8df5e5700..3855b3fe7f 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.TableEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -85,6 +86,7 @@ public class SchemaManager implements IgniteComponent { public CompletableFuture<Void> start() { catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated); catalogService.listen(CatalogEvent.TABLE_ALTER, this::onTableAltered); + catalogService.listen(CatalogEvent.TABLE_DESTROY, this::onTableDestroyed); registerExistingTables(); @@ -185,6 +187,12 @@ public class SchemaManager implements IgniteComponent { } } + private CompletableFuture<Boolean> onTableDestroyed(CatalogEventParameters event) { + DestroyTableEventParameters creationEvent = (DestroyTableEventParameters) event; + + return dropRegistry(creationEvent.causalityToken(), creationEvent.tableId()).thenApply(ignored -> false); + } + private void setColumnMapping(SchemaDescriptor schema, int tableId) throws ExecutionException, InterruptedException { if (schema.version() == CatalogTableDescriptor.INITIAL_TABLE_VERSION) { return; @@ -325,7 +333,7 @@ public class SchemaManager implements IgniteComponent { * @param causalityToken Causality token. * @param tableId Table id. */ - public CompletableFuture<?> dropRegistry(long causalityToken, int tableId) { + private CompletableFuture<?> dropRegistry(long causalityToken, int tableId) { if (!busyLock.enterBusy()) { throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException()); } diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java index fd155536fc..83013878a5 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.catalog.events.AddColumnEventParameters; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; @@ -102,7 +103,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_CREATE), tableCreatedListener.capture()); doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER), tableAlteredListener.capture()); - doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_DROP), tableDestroyedListener.capture()); + doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_DESTROY), tableDestroyedListener.capture()); schemaManager = new SchemaManager(registry, catalogService, metaStorageManager); schemaManager.start(); @@ -254,16 +255,20 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { void destroyTableMakesRegistryUnavailable() { createSomeTable(); - assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), willCompleteSuccessfully()); + DestroyTableEventParameters event = new DestroyTableEventParameters( + CAUSALITY_TOKEN_2, + CATALOG_VERSION_2, + TABLE_ID, + 1 + ); + + assertThat(tableDestroyedListener().notify(event), willBe(false)); completeCausalityToken(CAUSALITY_TOKEN_2); CompletableFuture<SchemaRegistry> future = schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID); assertThat(future, is(completedFuture())); - - SchemaRegistry schemaRegistry = future.join(); - - assertThat(schemaRegistry, is(nullValue())); + assertThat(future, willBe(nullValue())); } @Test diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java index 4ffbe37093..1df8ae00a1 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java @@ -21,7 +21,6 @@ import static org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SC import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -31,8 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.app.IgniteImpl; @@ -47,7 +44,6 @@ import org.apache.ignite.internal.type.NativeType; import org.apache.ignite.internal.type.NativeTypeSpec; import org.apache.ignite.lang.ErrorGroups.Sql; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -340,24 +336,4 @@ public class ItCreateTableDdlTest extends BaseSqlIntegrationTest { private static Stream<Arguments> reservedSchemaNames() { return SYSTEM_SCHEMAS.stream().map(Arguments::of); } - - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20680") - @Test - public void concurrentDrop() { - sql("CREATE TABLE test (key INT PRIMARY KEY)"); - - var stopFlag = new AtomicBoolean(); - - CompletableFuture<Void> selectFuture = CompletableFuture.runAsync(() -> { - while (!stopFlag.get()) { - sql("SELECT COUNT(*) FROM test"); - } - }); - - sql("DROP TABLE test"); - - stopFlag.set(true); - - assertThat(selectFuture, willCompleteSuccessfully()); - } } diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java index 581eba0f22..3f537bc816 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java @@ -42,6 +42,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.internal.IgniteIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.internal.logger.IgniteLogger; @@ -291,9 +293,18 @@ public class ItSqlLogicTest extends IgniteIntegrationTest { } } } + + compactCatalog(CLUSTER_NODES.get(0)); } } + private static void compactCatalog(Ignite node) { + IgniteImpl node0 = (IgniteImpl) node; + CatalogManagerImpl catalogManager = (CatalogManagerImpl) node0.catalogManager(); + + catalogManager.compactCatalog(Long.MAX_VALUE); + } + private static void config() { SqlLogicTestEnvironment env = ItSqlLogicTest.class.getAnnotation(SqlLogicTestEnvironment.class); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java index 4f99004ab0..cf335d8854 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java @@ -74,15 +74,13 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry { return tableCache.computeIfAbsent(cacheKey(tableId, sqlTable.version()), (k) -> loadTable(sqlTable)); } + //TODO Create a ticket to get rid of future because physical table must exists if we found catalog version, where table is alive. private CompletableFuture<ExecutableTable> loadTable(IgniteTable sqlTable) { - return tableManager.tableAsync(sqlTable.id()) + return CompletableFuture.completedFuture(tableManager.getTable(sqlTable.id())) .thenApply((table) -> { TableDescriptor tableDescriptor = sqlTable.descriptor(); SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(sqlTable.id()); - // TODO Can be removed after https://issues.apache.org/jira/browse/IGNITE-20680 - assert schemaRegistry != null : "SchemaRegistry does not exist: " + sqlTable.id(); - SchemaDescriptor schemaDescriptor = schemaRegistry.schema(sqlTable.version()); TableRowConverterFactory converterFactory = new TableRowConverterFactoryImpl( sqlTable.keyColumns(), schemaRegistry, schemaDescriptor diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java index f2cecfab0f..cd16f076d4 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java @@ -144,7 +144,7 @@ public class ExecutableTableRegistrySelfTest extends BaseIgniteAbstractTest { SchemaDescriptor schemaDescriptor = newDescriptor(schemaVersion); - when(tableManager.tableAsync(tableId)).thenReturn(CompletableFuture.completedFuture(table)); + when(tableManager.getTable(tableId)).thenReturn(table); when(schemaManager.schemaRegistry(tableId)).thenReturn(schemaRegistry); when(schemaRegistry.schema(tableVersion)).thenReturn(schemaDescriptor); when(descriptor.iterator()).thenReturn(Collections.emptyIterator()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index 6a9c6f631d..15c16db8c0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -320,7 +320,7 @@ public class TableImpl implements TableViewInternal { completeWaitIndex(indexId); - // TODO: IGNITE-19150 Also need to destroy the index storages + // TODO: IGNITE-20121 Also need to destroy the index storages } private void awaitIndexes() { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 204d251380..c22530e54b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -54,6 +54,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -96,7 +97,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; -import org.apache.ignite.internal.catalog.events.DropTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.RenameTableEventParameters; import org.apache.ignite.internal.causality.CompletionListener; import org.apache.ignite.internal.causality.IncrementalVersionedValue; @@ -568,8 +569,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return onTableCreate((CreateTableEventParameters) parameters).thenApply(unused -> false); }); - catalogService.listen(CatalogEvent.TABLE_DROP, parameters -> { - return onTableDelete(((DropTableEventParameters) parameters)).thenApply(unused -> false); + catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> { + return onTableDestroy(((DestroyTableEventParameters) parameters)).thenApply(unused -> false); }); catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> { @@ -714,17 +715,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private CompletableFuture<Void> onTableDelete(DropTableEventParameters parameters) { + private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); - - int tableId = parameters.tableId(); - - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, catalogVersion - 1); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion - 1); - - dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor); + dropTableLocally(parameters.causalityToken(), parameters); return nullCompletedFuture(); }); @@ -1393,12 +1386,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * Drops local structures for a table. * * @param causalityToken Causality token. - * @param tableDescriptor Catalog table descriptor. - * @param zoneDescriptor Catalog distributed zone descriptor. + * @param parameters Destroy table event parameters. */ - private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) { - int tableId = tableDescriptor.id(); - int partitions = zoneDescriptor.partitions(); + private void dropTableLocally(long causalityToken, DestroyTableEventParameters parameters) { + int tableId = parameters.tableId(); + int partitions = parameters.partitions(); localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { if (e != null) { @@ -1445,8 +1437,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { ).thenApply(v -> map); })); - schemaManager.dropRegistry(causalityToken, tableId); - startedTables.remove(tableId); Set<ByteArray> assignmentKeys = IntStream.range(0, partitions) @@ -2339,22 +2329,24 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } /** - * Returns a table instance if it exists, {@code null} otherwise. + * Returns started table instance by given id. * * @param tableId Table id. */ - public @Nullable TableViewInternal getTable(int tableId) { - return startedTables.get(tableId); + public TableViewInternal getTable(int tableId) { + TableImpl table = startedTables.get(tableId); + + assert table != null : tableId; + + return table; } /** - * Returns a table instance if it exists, {@code null} otherwise. - * - * @param name Table name. + * Returns started tables instances. */ @TestOnly - public @Nullable TableViewInternal getTable(String name) { - return findTableImplByName(startedTables.values(), name); + public Collection<TableViewInternal> startedTables() { + return Collections.unmodifiableCollection(startedTables.values()); } private CatalogTableDescriptor getTableDescriptor(int tableId, int catalogVersion) { @@ -2381,13 +2373,18 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private void startTables(long recoveryRevision) { sharedTxStateStorage.start(); - int catalogVersion = catalogService.latestCatalogVersion(); + int earliestCatalogVersion = catalogService.earliestCatalogVersion(); + int latestCatalogVersion = catalogService.latestCatalogVersion(); + var startedTables = new IntOpenHashSet(); List<CompletableFuture<?>> startTableFutures = new ArrayList<>(); // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones from volt and metastore - for (CatalogTableDescriptor tableDescriptor : catalogService.tables(catalogVersion)) { - startTableFutures.add(createTableLocally(recoveryRevision, catalogVersion, tableDescriptor, true)); + for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; ver--) { + int ver0 = ver; + catalogService.tables(ver).stream() + .filter(tbl -> startedTables.add(tbl.id())) + .forEach(tableDescriptor -> startTableFutures.add(createTableLocally(recoveryRevision, ver0, tableDescriptor, true))); } // Forces you to wait until recovery is complete before the metastore watches is deployed to avoid races with catalog listeners. diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 2953e4b096..6f8ecba8c6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -344,13 +345,18 @@ public class TableManagerTest extends IgniteAbstractTest { dropTable(DYNAMIC_TABLE_FOR_DROP_NAME); - verify(mvTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); - verify(txStateTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); - verify(replicaMgr, times(PARTITIONS)).stopReplica(any()); - assertNull(tableManager.table(DYNAMIC_TABLE_FOR_DROP_NAME)); - assertEquals(0, tableManager.tables().size()); + + verify(mvTableStorage, atMost(0)).destroy(); + verify(txStateTableStorage, atMost(0)).destroy(); + verify(replicaMgr, atMost(0)).stopReplica(any()); + + assertTrue(CatalogTestUtils.waitCatalogCompaction(catalogManager, Long.MAX_VALUE)); + + verify(mvTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); + verify(txStateTableStorage, timeout(TimeUnit.SECONDS.toMillis(10))).destroy(); + verify(replicaMgr, timeout(TimeUnit.SECONDS.toMillis(10)).times(PARTITIONS)).stopReplica(any()); } /** @@ -378,8 +384,7 @@ public class TableManagerTest extends IgniteAbstractTest { assertNotNull(table); assertNotEquals(oldTableId, table.tableId()); - // TODO IGNITE-20680 ensure old table is available - // assertNotNull(tableManager.getTable(oldTableId)); + assertNotNull(tableManager.getTable(oldTableId)); assertNotNull(tableManager.getTable(table.tableId())); assertNotSame(tableManager.getTable(oldTableId), tableManager.getTable(table.tableId())); }