This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f8b9bd1c56 IGNITE-21396: NPE SqlQueryProcessor during concurrent SELECT and DROP TABLE. (#3131) f8b9bd1c56 is described below commit f8b9bd1c560f996af78cf49a09e2e1505d6138a4 Author: Max Zhuravkov <shh...@gmail.com> AuthorDate: Fri Feb 2 15:18:55 2024 +0200 IGNITE-21396: NPE SqlQueryProcessor during concurrent SELECT and DROP TABLE. (#3131) --- .../internal/sql/engine/ItCreateTableDdlTest.java | 24 ++++++++++ .../internal/sql/engine/SqlQueryProcessor.java | 20 ++------ .../engine/exec/ExecutableTableRegistryImpl.java | 3 ++ .../internal/sql/engine/schema/IgniteTable.java | 7 +++ .../sql/engine/schema/IgniteTableImpl.java | 11 ++++- .../sql/engine/schema/SqlSchemaManagerImpl.java | 27 ++++++++--- .../exec/ExecutableTableRegistrySelfTest.java | 2 +- .../internal/sql/engine/framework/TestTable.java | 2 +- .../sql/engine/prepare/TypeCoercionTest.java | 5 ++ .../engine/schema/SqlSchemaManagerImplTest.java | 56 +++++++++++++++++++++- 10 files changed, 131 insertions(+), 26 deletions(-) diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java index 1df8ae00a1..4ffbe37093 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SC import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -30,6 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.app.IgniteImpl; @@ -44,6 +47,7 @@ import org.apache.ignite.internal.type.NativeType; import org.apache.ignite.internal.type.NativeTypeSpec; import org.apache.ignite.lang.ErrorGroups.Sql; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -336,4 +340,24 @@ public class ItCreateTableDdlTest extends BaseSqlIntegrationTest { private static Stream<Arguments> reservedSchemaNames() { return SYSTEM_SCHEMAS.stream().map(Arguments::of); } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20680") + @Test + public void concurrentDrop() { + sql("CREATE TABLE test (key INT PRIMARY KEY)"); + + var stopFlag = new AtomicBoolean(); + + CompletableFuture<Void> selectFuture = CompletableFuture.runAsync(() -> { + while (!stopFlag.get()) { + sql("SELECT COUNT(*) FROM test"); + } + }); + + sql("DROP TABLE test"); + + stopFlag.set(true); + + assertThat(selectFuture, willCompleteSuccessfully()); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 01db78c4c4..4187870568 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -37,7 +37,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -54,10 +53,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.Frameworks; -import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogManager; -import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; -import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -310,7 +306,7 @@ public class SqlQueryProcessor implements QueryProcessor { var executionTargetProvider = new ExecutionTargetProvider() { @Override public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) { - return primaryReplicas(table.id()) + return primaryReplicas(table) .thenApply(replicas -> factory.partitioned(replicas)); } @@ -371,16 +367,8 @@ public class SqlQueryProcessor implements QueryProcessor { // need to be refactored after TODO: https://issues.apache.org/jira/browse/IGNITE-20925 /** Get primary replicas. */ - private CompletableFuture<List<NodeWithConsistencyToken>> primaryReplicas(int tableId) { - int catalogVersion = catalogManager.latestCatalogVersion(); - - Catalog catalog = catalogManager.catalog(catalogVersion); - - CatalogTableDescriptor tblDesc = Objects.requireNonNull(catalog.table(tableId), "table"); - - CatalogZoneDescriptor zoneDesc = Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone"); - - int partitions = zoneDesc.partitions(); + private CompletableFuture<List<NodeWithConsistencyToken>> primaryReplicas(IgniteTable table) { + int partitions = table.partitions(); List<CompletableFuture<NodeWithConsistencyToken>> result = new ArrayList<>(partitions); @@ -389,7 +377,7 @@ public class SqlQueryProcessor implements QueryProcessor { // no need to wait all partitions after pruning was implemented. for (int partId = 0; partId < partitions; ++partId) { int partitionId = partId; - ReplicationGroupId partGroupId = new TablePartitionId(tableId, partitionId); + ReplicationGroupId partGroupId = new TablePartitionId(table.id(), partitionId); CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica( partGroupId, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java index 03cfc6e7ff..4bc47473ca 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java @@ -80,6 +80,9 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry { TableDescriptor tableDescriptor = sqlTable.descriptor(); SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(sqlTable.id()); + // TODO Can be removed after https://issues.apache.org/jira/browse/IGNITE-20680 + assert schemaRegistry != null : "SchemaRegistry does not exist: " + sqlTable.id(); + SchemaDescriptor schemaDescriptor = schemaRegistry.schema(sqlTable.version()); TableRowConverterFactory converterFactory = new TableRowConverterFactoryImpl(schemaRegistry, schemaDescriptor); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java index 4a2efe0b10..56959335b9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTable.java @@ -30,4 +30,11 @@ public interface IgniteTable extends IgniteDataSource { * @return Indexes for the current table. */ Map<String, IgniteIndex> indexes(); + + /** + * Returns the number of partitions for this table. + * + * @return Number of partitions. + */ + int partitions(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java index 83982657ae..1e412f468b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java @@ -34,12 +34,15 @@ public class IgniteTableImpl extends AbstractIgniteDataSource implements IgniteT private final Map<String, IgniteIndex> indexMap; + private final int partitions; + /** Constructor. */ public IgniteTableImpl(String name, int id, int version, TableDescriptor desc, - Statistic statistic, Map<String, IgniteIndex> indexMap) { + Statistic statistic, Map<String, IgniteIndex> indexMap, int partitions) { super(name, id, version, desc, statistic); this.indexMap = indexMap; + this.partitions = partitions; } /** {@inheritDoc} */ @@ -48,6 +51,12 @@ public class IgniteTableImpl extends AbstractIgniteDataSource implements IgniteT return indexMap; } + /** {@inheritDoc} */ + @Override + public int partitions() { + return partitions; + } + /** {@inheritDoc} */ @Override protected TableScan toRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable relOptTbl, List<RelHint> hints) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java index 8c83c9b88b..30c088513d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescripto import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor.SystemViewType; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.schema.DefaultValueGenerator; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; @@ -143,7 +144,14 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { throw new IgniteInternalException(Common.INTERNAL_ERR, "Table with given id not found: " + tableId); } - return createTable(tableDescriptor, createTableDescriptorForTable(tableDescriptor), Map.of()); + int zoneId = tableDescriptor.zoneId(); + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (zoneDescriptor == null) { + throw new IgniteInternalException(Common.INTERNAL_ERR, "Zone with given id not found: " + zoneId); + } + + return createTable(tableDescriptor, createTableDescriptorForTable(tableDescriptor), Map.of(), zoneDescriptor.partitions()); }); } @@ -157,14 +165,15 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { SchemaPlus rootSchema = Frameworks.createRootSchema(false); for (CatalogSchemaDescriptor schemaDescriptor : catalog.schemas()) { - IgniteSchema igniteSchema = createSqlSchema(catalog.version(), schemaDescriptor); + IgniteSchema igniteSchema = createSqlSchema(catalog, schemaDescriptor); rootSchema.add(igniteSchema.getName(), igniteSchema); } return rootSchema; } - private static IgniteSchema createSqlSchema(int catalogVersion, CatalogSchemaDescriptor schemaDescriptor) { + private static IgniteSchema createSqlSchema(Catalog catalog, CatalogSchemaDescriptor schemaDescriptor) { + int catalogVersion = catalog.version(); String schemaName = schemaDescriptor.name(); int numTables = schemaDescriptor.tables().length; @@ -208,7 +217,11 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { Map<String, IgniteIndex> tableIndexMap = schemaTableIndexes.getOrDefault(tableId, Collections.emptyMap()); - IgniteTable schemaTable = createTable(tableDescriptor, descriptor, tableIndexMap); + int zoneId = tableDescriptor.zoneId(); + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + assert zoneDescriptor != null : "Zone is not found in schema: " + zoneId; + + IgniteTable schemaTable = createTable(tableDescriptor, descriptor, tableIndexMap, zoneDescriptor.partitions()); schemaDataSources.add(schemaTable); } @@ -358,7 +371,8 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { private static IgniteTable createTable( CatalogTableDescriptor catalogTableDescriptor, TableDescriptor tableDescriptor, - Map<String, IgniteIndex> indexes + Map<String, IgniteIndex> indexes, + int parititions ) { int tableId = catalogTableDescriptor.id(); String tableName = catalogTableDescriptor.name(); @@ -373,7 +387,8 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { catalogTableDescriptor.tableVersion(), tableDescriptor, statistic, - indexes + indexes, + parititions ); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java index ac7f6f3514..b9f34d6477 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java @@ -147,7 +147,7 @@ public class ExecutableTableRegistrySelfTest extends BaseIgniteAbstractTest { when(schemaRegistry.schema(tableVersion)).thenReturn(schemaDescriptor); IgniteTable sqlTable = new IgniteTableImpl( - table.name(), tableId, tableVersion, descriptor, new TestStatistic(1_000.0), Map.of() + table.name(), tableId, tableVersion, descriptor, new TestStatistic(1_000.0), Map.of(), 1 ); when(sqlSchemaManager.table(schemaVersion, tableId)).thenReturn(sqlTable); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java index 50ab6d1f42..6a16c2a48e 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java @@ -61,7 +61,7 @@ public class TestTable extends IgniteTableImpl { Map<String, DataProvider<?>> dataProviders ) { super(name, tableId, 1, descriptor, new TestStatistic(rowCnt), - indexList.stream().collect(Collectors.toUnmodifiableMap(IgniteIndex::name, Function.identity()))); + indexList.stream().collect(Collectors.toUnmodifiableMap(IgniteIndex::name, Function.identity())), 1); this.dataProviders = dataProviders; } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java index 55c07a6faf..4c51b57e9a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/TypeCoercionTest.java @@ -650,6 +650,11 @@ public class TypeCoercionTest extends AbstractPlannerTest { return Map.of(); } + @Override + public int partitions() { + return 1; + } + @Override public String name() { return name; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java index 5a8f5d5f62..8cabfe34be 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java @@ -48,12 +48,14 @@ import org.apache.calcite.schema.Table; import org.apache.ignite.internal.catalog.CatalogCommand; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; +import org.apache.ignite.internal.catalog.commands.CatalogUtils; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand; import org.apache.ignite.internal.catalog.commands.CreateSortedIndexCommand; import org.apache.ignite.internal.catalog.commands.CreateSystemViewCommand; import org.apache.ignite.internal.catalog.commands.CreateTableCommand; import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder; +import org.apache.ignite.internal.catalog.commands.CreateZoneCommand; import org.apache.ignite.internal.catalog.commands.DefaultValue; import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand; @@ -64,6 +66,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescripto import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor.SystemViewType; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.schema.DefaultValueGenerator; @@ -174,10 +177,61 @@ public class SqlSchemaManagerImplTest extends BaseIgniteAbstractTest { assertThat(schema.getName(), equalTo(schemaDescriptor.name())); for (CatalogTableDescriptor tableDescriptor : schemaDescriptor.tables()) { - assertThat(schema.getTable(tableDescriptor.name()), notNullValue()); + int zoneId = tableDescriptor.zoneId(); + CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneId, versionAfter); + assertNotNull(zoneDescriptor, "Zone does not exist: " + zoneId); + + Table table = schema.getTable(tableDescriptor.name()); + assertThat(table, notNullValue()); + + IgniteTable igniteTable = assertInstanceOf(IgniteTable.class, table); + + assertEquals(zoneDescriptor.partitions(), igniteTable.partitions(), + "Number of partitions is not correct: " + tableDescriptor.name()); + + assertEquals(CatalogUtils.DEFAULT_PARTITION_COUNT, igniteTable.partitions(), + "Number of partitions is not correct: " + tableDescriptor.name()); } } + /** Create a table with a zone. */ + @Test + public void testTableWithZone() { + int partitions = 10; + + await(catalogManager.execute(CreateZoneCommand.builder() + .partitions(partitions) + .zoneName("ABC") + .build()) + ); + + await(catalogManager.execute(CreateTableCommand.builder() + .schemaName(PUBLIC_SCHEMA_NAME) + .tableName("T") + .columns(List.of( + ColumnParams.builder().name("ID").type(ColumnType.INT32).nullable(false).build(), + ColumnParams.builder().name("VAL").type(ColumnType.INT32).build() + )) + .primaryKeyColumns(List.of("ID")) + .zone("ABC") + .build())); + + int version = catalogManager.latestCatalogVersion(); + + SchemaPlus rootSchema = sqlSchemaManager.schema(version); + assertNotNull(rootSchema); + + SchemaPlus schemaPlus = rootSchema.getSubSchema(PUBLIC_SCHEMA_NAME); + assertNotNull(schemaPlus); + + IgniteSchema schema = unwrapSchema(schemaPlus); + Table table = schema.getTable("T"); + assertNotNull(table); + + IgniteTable igniteTable = assertInstanceOf(IgniteTable.class, table); + assertEquals(partitions, igniteTable.partitions()); + } + /** * Table column types. */