This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 37ab4c7ab3 IGNITE-21584 Make ExecutableTableRegistry method
synchronous (#4926)
37ab4c7ab3 is described below
commit 37ab4c7ab33c586edfa5d7eb3077ef0d8c0e408e
Author: korlov42 <[email protected]>
AuthorDate: Thu Dec 19 13:06:07 2024 +0200
IGNITE-21584 Make ExecutableTableRegistry method synchronous (#4926)
---
.../internal/sql/engine/SqlQueryProcessor.java | 2 +-
.../sql/engine/exec/ExecutableTableRegistry.java | 6 +-
.../engine/exec/ExecutableTableRegistryImpl.java | 53 +++++----
.../engine/exec/ExecutionDependencyResolver.java | 3 +-
.../exec/ExecutionDependencyResolverImpl.java | 20 +---
.../sql/engine/exec/ExecutionServiceImpl.java | 9 +-
.../sql/engine/prepare/KeyValueGetPlan.java | 127 ++++++++++-----------
.../sql/engine/prepare/KeyValueModifyPlan.java | 19 ++-
.../sql/engine/prepare/SelectCountPlan.java | 6 +-
.../exec/ExecutableTableRegistrySelfTest.java | 20 ++--
.../exec/ExecutionDependencyResolverSelfTest.java | 41 +++----
.../sql/engine/exec/ExecutionServiceImplTest.java | 29 +----
.../engine/exec/NoOpExecutableTableRegistry.java | 102 ++++++++++++-----
.../sql/engine/framework/TestBuilders.java | 6 +-
14 files changed, 211 insertions(+), 232 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index f6fad24987..7ae2a7eab4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -298,7 +298,7 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
);
var executableTableRegistry = new ExecutableTableRegistryImpl(
- tableManager, schemaManager, sqlSchemaManager, replicaService,
clockService, TABLE_CACHE_SIZE
+ tableManager, schemaManager, sqlSchemaManager, replicaService,
clockService, TABLE_CACHE_SIZE, CACHE_FACTORY
);
var tableFunctionRegistry = new TableFunctionRegistryImpl();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistry.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistry.java
index af0c11cb61..ea5428eacc 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistry.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistry.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.util.concurrent.CompletableFuture;
-
/**
* Provides access to read/write APIs for SQL engine.
*/
@@ -29,7 +27,7 @@ public interface ExecutableTableRegistry {
*
* @param catalogVersion Version of the catalog.
* @param tableId Table Id.
- * @return An operation that returns executable table.
+ * @return An executable table.
*/
- CompletableFuture<ExecutableTable> getTable(int catalogVersion, int
tableId);
+ ExecutableTable getTable(int catalogVersion, int tableId);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 71a4c74db3..964a4287b7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -17,10 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
-import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -31,7 +28,10 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.cache.Cache;
+import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
/**
@@ -50,7 +50,7 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
private final ClockService clockService;
/** Executable tables cache. */
- final ConcurrentMap<CacheKey, CompletableFuture<ExecutableTable>>
tableCache;
+ final Cache<CacheKey, ExecutableTable> tableCache;
/** Constructor. */
public ExecutableTableRegistryImpl(
@@ -59,7 +59,8 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
SqlSchemaManager sqlSchemaManager,
ReplicaService replicaService,
ClockService clockService,
- int cacheSize
+ int cacheSize,
+ CacheFactory cacheFactory
) {
this.sqlSchemaManager = sqlSchemaManager;
@@ -67,40 +68,38 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry {
this.schemaManager = schemaManager;
this.replicaService = replicaService;
this.clockService = clockService;
- this.tableCache = Caffeine.newBuilder()
- .maximumSize(cacheSize)
- .<CacheKey, ExecutableTable>buildAsync().asMap();
+ this.tableCache = cacheFactory.create(cacheSize);
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<ExecutableTable> getTable(int catalogVersion, int
tableId) {
+ public ExecutableTable getTable(int catalogVersion, int tableId) {
IgniteTable sqlTable = sqlSchemaManager.table(catalogVersion, tableId);
- return tableCache.computeIfAbsent(cacheKey(tableId,
sqlTable.version()), (k) -> loadTable(sqlTable));
+ return tableCache.get(cacheKey(tableId, sqlTable.version()), (k) ->
loadTable(sqlTable));
}
- // TODO https://issues.apache.org/jira/browse/IGNITE-21584 Remove future.
- private CompletableFuture<ExecutableTable> loadTable(IgniteTable sqlTable)
{
- return
CompletableFuture.completedFuture(tableManager.cachedTable(sqlTable.id()))
- .thenApply((table) -> {
- TableDescriptor tableDescriptor = sqlTable.descriptor();
+ private ExecutableTable loadTable(IgniteTable sqlTable) {
+ TableViewInternal table = tableManager.cachedTable(sqlTable.id());
- SchemaRegistry schemaRegistry =
schemaManager.schemaRegistry(sqlTable.id());
- SchemaDescriptor schemaDescriptor =
schemaRegistry.schema(sqlTable.version());
- TableRowConverterFactory converterFactory = new
TableRowConverterFactoryImpl(
- tableDescriptor, schemaRegistry, schemaDescriptor
- );
+ assert table != null : "Table not found: tableId=" + sqlTable.id();
- InternalTable internalTable = table.internalTable();
- ScannableTable scannableTable = new
ScannableTableImpl(internalTable, converterFactory);
- TableRowConverter rowConverter =
converterFactory.create(null);
+ TableDescriptor tableDescriptor = sqlTable.descriptor();
- UpdatableTableImpl updatableTable = new
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
- internalTable, replicaService, clockService,
rowConverter);
+ SchemaRegistry schemaRegistry =
schemaManager.schemaRegistry(sqlTable.id());
+ SchemaDescriptor schemaDescriptor =
schemaRegistry.schema(sqlTable.version());
+ TableRowConverterFactory converterFactory = new
TableRowConverterFactoryImpl(
+ tableDescriptor, schemaRegistry, schemaDescriptor
+ );
- return new ExecutableTableImpl(scannableTable,
updatableTable, sqlTable.partitionCalculator());
- });
+ InternalTable internalTable = table.internalTable();
+ ScannableTable scannableTable = new ScannableTableImpl(internalTable,
converterFactory);
+ TableRowConverter rowConverter = converterFactory.create(null);
+
+ UpdatableTableImpl updatableTable = new
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
+ internalTable, replicaService, clockService, rowConverter);
+
+ return new ExecutableTableImpl(scannableTable, updatableTable,
sqlTable.partitionCalculator());
}
private static final class ExecutableTableImpl implements ExecutableTable {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolver.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolver.java
index 35def703ea..e989b41cf0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolver.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolver.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
/**
@@ -29,5 +28,5 @@ public interface ExecutionDependencyResolver {
/**
* Resolves dependencies required to execute the given list of relations.
*/
- CompletableFuture<ResolvedDependencies>
resolveDependencies(Iterable<IgniteRel> rels, int catalogVersion);
+ ResolvedDependencies resolveDependencies(Iterable<IgniteRel> rels, int
catalogVersion);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
index 135441e809..3a1ccb159d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
@@ -17,13 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -59,8 +54,8 @@ public class ExecutionDependencyResolverImpl implements
ExecutionDependencyResol
* {@inheritDoc}
*/
@Override
- public CompletableFuture<ResolvedDependencies>
resolveDependencies(Iterable<IgniteRel> rels, int catalogVersion) {
- Map<Integer, CompletableFuture<ExecutableTable>> tableMap = new
HashMap<>();
+ public ResolvedDependencies resolveDependencies(Iterable<IgniteRel> rels,
int catalogVersion) {
+ Map<Integer, ExecutableTable> tableMap = new HashMap<>();
Map<Integer, ScannableDataSource> dataSources = new HashMap<>();
IgniteRelShuttle shuttle = new IgniteRelShuttle() {
@@ -139,15 +134,6 @@ public class ExecutionDependencyResolverImpl implements
ExecutionDependencyResol
shuttle.visit(rel);
}
- List<CompletableFuture<ExecutableTable>> fs = new
ArrayList<>(tableMap.values());
-
- return CompletableFuture.allOf(fs.toArray(new CompletableFuture<?>[0]))
- .thenApply(r -> {
- Map<Integer, ExecutableTable> map = tableMap.entrySet()
- .stream()
- .collect(Collectors.toMap(Entry::getKey, e ->
e.getValue().join()));
-
- return new ResolvedDependencies(map, dataSources);
- });
+ return new ResolvedDependencies(tableMap, dataSources);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index bc40d88a52..a78d2f6fd6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -49,7 +49,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -965,15 +964,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
TxAttributes txAttributes
) {
try {
- // Because fragment execution runs on specific thread selected
by taskExecutor,
- // we should complete dependency resolution on the same thread
- // that is going to be used for fragment execution.
ExecutionContext<RowT> context = createContext(initiatorNode,
desc, txAttributes);
- Executor exec = (r) -> context.execute(r::run, err ->
handleError(err, initiatorNode, desc.fragmentId()));
IgniteRel treeRoot =
relationalTreeFromJsonString(catalogVersion, fragmentString);
- dependencyResolver.resolveDependencies(List.of(treeRoot),
catalogVersion)
- .thenComposeAsync(deps -> executeFragment(treeRoot,
deps, context), exec)
+ ResolvedDependencies resolvedDependencies =
dependencyResolver.resolveDependencies(List.of(treeRoot), catalogVersion);
+ executeFragment(treeRoot, resolvedDependencies, context)
.exceptionally(ex -> {
handleError(ex, initiatorNode, desc.fragmentId());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index 1c7f95238a..c866e095ae 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
@@ -134,70 +135,68 @@ public class KeyValueGetPlan implements ExplainablePlan,
ExecutablePlan {
@Nullable QueryPrefetchCallback firstPageReadyCallback
) {
IgniteTable sqlTable = table();
-
- CompletableFuture<Iterator<InternalSqlRow>> result =
tableRegistry.getTable(catalogVersion, sqlTable.id())
- .thenCompose(execTable -> {
-
- ImmutableBitSet requiredColumns =
lookupNode.requiredColumns();
- RexNode filterExpr = lookupNode.condition();
- List<RexNode> projectionExpr = lookupNode.projects();
- List<RexNode> keyExpressions = lookupNode.keyExpressions();
-
- RelDataType rowType =
sqlTable.getRowType(Commons.typeFactory(), requiredColumns);
-
- Supplier<RowT> keySupplier = ctx.expressionFactory()
- .rowSource(keyExpressions);
- Predicate<RowT> filter = filterExpr == null ? null :
ctx.expressionFactory()
- .predicate(filterExpr, rowType);
- Function<RowT, RowT> projection = projectionExpr == null ?
null : ctx.expressionFactory()
- .project(projectionExpr, rowType);
-
- RowHandler<RowT> rowHandler = ctx.rowHandler();
- RowSchema rowSchema =
TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
- RowFactory<RowT> rowFactory =
rowHandler.factory(rowSchema);
-
- RelDataType resultType = lookupNode.getRowType();
- BiFunction<Integer, Object, Object> internalTypeConverter
= TypeUtils.resultTypeConverter(ctx, resultType);
-
- ScannableTable scannableTable = execTable.scannableTable();
- Function<RowT, Iterator<InternalSqlRow>> postProcess = row
-> {
- if (row == null) {
- return Collections.emptyIterator();
- }
-
- if (filter != null && !filter.test(row)) {
- return Collections.emptyIterator();
- }
-
- if (projection != null) {
- row = projection.apply(row);
- }
-
- return List.<InternalSqlRow>of(
- new InternalSqlRowImpl<>(row, rowHandler,
internalTypeConverter)
- ).iterator();
- };
-
- CompletableFuture<RowT> lookupResult =
scannableTable.primaryKeyLookup(
- ctx, tx, rowFactory, keySupplier.get(),
requiredColumns.toBitSet()
- );
-
- if (projection == null && filter == null) {
- // no arbitrary computations, should be safe to
proceed execution on
- // thread that completes the future
- return lookupResult.thenApply(postProcess);
- } else {
- Executor executor = task -> ctx.execute(task::run,
error -> {
- // this executor is used to process future chain,
so any unhandled exception
- // should be wrapped with CompletionException and
returned as a result, implying
- // no error handler should be called.
- // But just in case there is error in future
processing pipeline let's log error
- LOG.error("Unexpected error", error);
- });
-
- return lookupResult.thenApplyAsync(postProcess,
executor);
- }
- });
+ ExecutableTable execTable = tableRegistry.getTable(catalogVersion,
sqlTable.id());
+
+ ImmutableBitSet requiredColumns = lookupNode.requiredColumns();
+ RexNode filterExpr = lookupNode.condition();
+ List<RexNode> projectionExpr = lookupNode.projects();
+ List<RexNode> keyExpressions = lookupNode.keyExpressions();
+
+ RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(),
requiredColumns);
+
+ Supplier<RowT> keySupplier = ctx.expressionFactory()
+ .rowSource(keyExpressions);
+ Predicate<RowT> filter = filterExpr == null ? null :
ctx.expressionFactory()
+ .predicate(filterExpr, rowType);
+ Function<RowT, RowT> projection = projectionExpr == null ? null :
ctx.expressionFactory()
+ .project(projectionExpr, rowType);
+
+ RowHandler<RowT> rowHandler = ctx.rowHandler();
+ RowSchema rowSchema =
TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+ RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);
+
+ RelDataType resultType = lookupNode.getRowType();
+ BiFunction<Integer, Object, Object> internalTypeConverter =
TypeUtils.resultTypeConverter(ctx, resultType);
+
+ ScannableTable scannableTable = execTable.scannableTable();
+ Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
+ if (row == null) {
+ return Collections.emptyIterator();
+ }
+
+ if (filter != null && !filter.test(row)) {
+ return Collections.emptyIterator();
+ }
+
+ if (projection != null) {
+ row = projection.apply(row);
+ }
+
+ return List.<InternalSqlRow>of(
+ new InternalSqlRowImpl<>(row, rowHandler,
internalTypeConverter)
+ ).iterator();
+ };
+
+ CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
+ ctx, tx, rowFactory, keySupplier.get(),
requiredColumns.toBitSet()
+ );
+
+ CompletableFuture<Iterator<InternalSqlRow>> result;
+ if (projection == null && filter == null) {
+ // no arbitrary computations, should be safe to proceed execution
on
+ // thread that completes the future
+ result = lookupResult.thenApply(postProcess);
+ } else {
+ Executor executor = task -> ctx.execute(task::run, error -> {
+ // this executor is used to process future chain, so any
unhandled exception
+ // should be wrapped with CompletionException and returned as
a result, implying
+ // no error handler should be called.
+ // But just in case there is error in future processing
pipeline let's log error
+ LOG.error("Unexpected error", error);
+ });
+
+ result = lookupResult.thenApplyAsync(postProcess, executor);
+ }
if (firstPageReadyCallback != null) {
result.whenComplete((res, err) ->
firstPageReadyCallback.onPrefetchComplete(err));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
index e2dd818b30..be69317f3c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.sql.engine.InternalSqlRowSingleLong;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
@@ -119,20 +120,18 @@ public class KeyValueModifyPlan implements
ExplainablePlan, ExecutablePlan {
@Nullable QueryPrefetchCallback firstPageReadyCallback
) {
IgniteTable sqlTable = table();
+ ExecutableTable execTable = tableRegistry.getTable(catalogVersion,
sqlTable.id());
- CompletableFuture<Iterator<InternalSqlRow>> result =
tableRegistry.getTable(catalogVersion, sqlTable.id())
- .thenCompose(execTable -> {
- List<RexNode> expressions = modifyNode.expressions();
+ List<RexNode> expressions = modifyNode.expressions();
- Supplier<RowT> rowSupplier = ctx.expressionFactory()
- .rowSource(expressions);
+ Supplier<RowT> rowSupplier = ctx.expressionFactory()
+ .rowSource(expressions);
- UpdatableTable updatableTable = execTable.updatableTable();
+ UpdatableTable updatableTable = execTable.updatableTable();
- return updatableTable.insert(
- tx, ctx, rowSupplier.get()
- ).thenApply(none -> List.<InternalSqlRow>of(new
InternalSqlRowSingleLong(1L)).iterator());
- });
+ CompletableFuture<Iterator<InternalSqlRow>> result =
updatableTable.insert(
+ tx, ctx, rowSupplier.get()
+ ).thenApply(none -> List.<InternalSqlRow>of(new
InternalSqlRowSingleLong(1L)).iterator());
if (firstPageReadyCallback != null) {
result.whenComplete((res, err) ->
firstPageReadyCallback.onPrefetchComplete(err));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/SelectCountPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/SelectCountPlan.java
index 266f9ad59b..6bbc02afc2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/SelectCountPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/SelectCountPlan.java
@@ -37,6 +37,7 @@ import
org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
@@ -102,8 +103,9 @@ public class SelectCountPlan implements ExplainablePlan,
ExecutablePlan {
IgniteTable igniteTable = optTable.unwrap(IgniteTable.class);
assert igniteTable != null;
- CompletableFuture<Long> countFut =
tableRegistry.getTable(catalogVersion, igniteTable.id())
- .thenCompose(execTable ->
execTable.scannableTable().estimatedSize());
+ ExecutableTable execTable = tableRegistry.getTable(catalogVersion,
igniteTable.id());
+
+ CompletableFuture<Long> countFut =
execTable.scannableTable().estimatedSize();
Executor resultExecutor = task -> ctx.execute(task::run, error -> {
LOG.error("Unexpected error", error);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
index aabe02c006..89f7faac20 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import java.util.Spliterators;
-import java.util.concurrent.CompletableFuture;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -40,6 +39,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.IgniteTableImpl;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -94,8 +94,7 @@ public class ExecutableTableRegistrySelfTest extends
BaseIgniteAbstractTest {
int tableId = 1;
- CompletableFuture<ExecutableTable> f = tester.getTable(tableId);
- ExecutableTable executableTable = f.join();
+ ExecutableTable executableTable = tester.getTable(tableId);
assertNotNull(executableTable.scannableTable());
assertNotNull(executableTable.updatableTable());
@@ -107,13 +106,9 @@ public class ExecutableTableRegistrySelfTest extends
BaseIgniteAbstractTest {
int cacheSize = 2;
Tester tester = new Tester(cacheSize);
- CompletableFuture<ExecutableTable> f1 = tester.getTable(1);
- CompletableFuture<ExecutableTable> f2 = tester.getTable(2);
- CompletableFuture<ExecutableTable> f3 = tester.getTable(3);
-
- f1.join();
- f2.join();
- f3.join();
+ tester.getTable(1);
+ tester.getTable(2);
+ tester.getTable(3);
boolean done = IgniteTestUtils.waitForCondition(() ->
tester.registry.tableCache.size() == cacheSize, 15_000);
assertTrue(done, "Failed to clear the cache");
@@ -142,11 +137,12 @@ public class ExecutableTableRegistrySelfTest extends
BaseIgniteAbstractTest {
sqlSchemaManager,
replicaService,
new TestClockService(clock),
- cacheSize
+ cacheSize,
+ CaffeineCacheFactory.INSTANCE
);
}
- CompletableFuture<ExecutableTable> getTable(int tableId) {
+ ExecutableTable getTable(int tableId) {
int schemaVersion = 1;
int tableVersion = 10;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
index 3ea585858f..0ca941510c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -29,8 +27,6 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -42,6 +38,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.type.NativeTypes;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -87,9 +84,7 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
tester.setDependencies(t1Id, table1, update1);
tester.setDependencies(t2Id, table2, update2);
- CompletableFuture<ResolvedDependencies> f =
tester.resolveDependencies("SELECT * FROM test1 JOIN test2 ON test1.id =
test2.id");
-
- ResolvedDependencies deps = f.join();
+ ResolvedDependencies deps = tester.resolveDependencies("SELECT * FROM
test1 JOIN test2 ON test1.id = test2.id");
tester.checkDependencies(deps, t1Id);
tester.checkDependencies(deps, t2Id);
@@ -109,8 +104,8 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
Tester tester = new Tester(createSchema(table));
tester.setDependencies(t1Id, table1, update1);
- CompletableFuture<ResolvedDependencies> f =
tester.resolveDependencies("SELECT * FROM test1 WHERE id=1");
- tester.checkDependencies(f.join(), t1Id);
+ ResolvedDependencies deps = tester.resolveDependencies("SELECT * FROM
test1 WHERE id=1");
+ tester.checkDependencies(deps, t1Id);
verify(registry, times(1)).getTable(anyInt(), eq(t1Id));
}
@@ -131,10 +126,8 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
tester.setDependencies(t1Id, table1, update1);
tester.setDependencies(t2Id, table2, update2);
- CompletableFuture<ResolvedDependencies> f = tester.resolveDependencies(
+ ResolvedDependencies deps = tester.resolveDependencies(
"MERGE INTO test2 dst USING test1 src ON dst.id = src.id WHEN
MATCHED THEN UPDATE SET val = src.val");
-
- ResolvedDependencies deps = f.join();
tester.checkDependencies(deps, t1Id);
tester.checkDependencies(deps, t2Id);
@@ -155,9 +148,7 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
tester.setDependencies(t1Id, table1, update1);
- CompletableFuture<ResolvedDependencies> f =
tester.resolveDependencies("SELECT (SELECT id FROM test1) FROM test1");
-
- ResolvedDependencies deps = f.join();
+ ResolvedDependencies deps = tester.resolveDependencies("SELECT (SELECT
id FROM test1) FROM test1");
tester.checkDependencies(deps, t1Id);
verify(registry, times(1)).getTable(anyInt(), anyInt());
@@ -177,9 +168,12 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
RuntimeException err = new RuntimeException("Broken");
tester.setError(t1Id, err);
- CompletableFuture<ResolvedDependencies> f =
tester.resolveDependencies("SELECT * FROM test1");
- CompletionException wrapped = assertThrows(CompletionException.class,
f::join);
- assertSame(err, wrapped.getCause());
+ //noinspection ThrowableNotThrown
+ IgniteTestUtils.assertThrows(
+ RuntimeException.class,
+ () -> tester.resolveDependencies("SELECT * FROM test1"),
+ err.getMessage()
+ );
}
private class Tester {
@@ -197,19 +191,14 @@ public class ExecutionDependencyResolverSelfTest extends
AbstractPlannerTest {
deps.put(tableId, executableTable);
- CompletableFuture<ExecutableTable> f =
CompletableFuture.completedFuture(executableTable);
-
- when(registry.getTable(anyInt(), eq(tableId))).thenReturn(f);
+ when(registry.getTable(anyInt(),
eq(tableId))).thenReturn(executableTable);
}
void setError(int tableId, Throwable err) {
- CompletableFuture<ExecutableTable> f = new CompletableFuture<>();
- f.completeExceptionally(err);
-
- when(registry.getTable(anyInt(), eq(tableId))).thenReturn(f);
+ when(registry.getTable(anyInt(), eq(tableId))).thenThrow(err);
}
- CompletableFuture<ResolvedDependencies> resolveDependencies(String
sql) {
+ ResolvedDependencies resolveDependencies(String sql) {
ExecutionDependencyResolver resolver = new
ExecutionDependencyResolverImpl(registry, null);
IgniteRel rel;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 8c0959ebb5..ec189755bb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -45,8 +45,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -887,8 +885,6 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
@Test
public void testTimeoutKvGet() {
- int deadlineMillis = 500;
-
// Use a separate context, so planning won't timeout.
SqlOperationContext planCtx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl WHERE id = 1",
planCtx);
@@ -896,12 +892,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertInstanceOf(KeyValueGetPlan.class, plan);
ExecutionServiceImpl<?> execService = executionServices.get(0);
- NoOpExecutableTableRegistry tableRegistry =
(NoOpExecutableTableRegistry) execService.tableRegistry();
- Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
- tableRegistry.setGetTableDelay(delay);
-
- awaitExecutionTimeout(execService, plan, deadlineMillis,
SqlException.class);
+ awaitExecutionTimeout(execService, plan, 500, SqlException.class);
}
@Test
@@ -912,13 +904,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertInstanceOf(KeyValueModifyPlan.class, plan);
- int deadlineMillis = 500;
-
ExecutionServiceImpl<?> execService = executionServices.get(0);
- NoOpExecutableTableRegistry tableRegistry =
(NoOpExecutableTableRegistry) execService.tableRegistry();
-
- Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
- tableRegistry.setGetTableDelay(delay);
awaitExecutionTimeout(execService, plan, 500, SqlException.class);
}
@@ -931,15 +917,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertInstanceOf(DdlPlan.class, plan);
- int deadlineMillis = 500;
-
ExecutionServiceImpl<?> execService = executionServices.get(0);
- NoOpExecutableTableRegistry tableRegistry =
(NoOpExecutableTableRegistry) execService.tableRegistry();
-
- Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
- tableRegistry.setGetTableDelay(delay);
-
DdlCommandHandler ddlCommandHandler = execService.ddlCommandHandler();
when(ddlCommandHandler.handle(any(CatalogCommand.class))).thenReturn(new
CompletableFuture<>());
@@ -990,13 +969,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertInstanceOf(SelectCountPlan.class, plan);
- int deadlineMillis = 500;
-
ExecutionServiceImpl<?> execService = executionServices.get(0);
- NoOpExecutableTableRegistry tableRegistry =
(NoOpExecutableTableRegistry) execService.tableRegistry();
-
- Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
- tableRegistry.setGetTableDelay(delay);
Function<QueryCancel, SqlOperationContext> implicitTx = (cancel) ->
operationContext()
.cancel(cancel)
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
index 15e472a1f5..3b700add39 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java
@@ -17,42 +17,26 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.time.Duration;
-import java.util.Objects;
+import java.util.BitSet;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Flow.Publisher;
import java.util.function.Supplier;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.jetbrains.annotations.Nullable;
/** Stub implementation for {@link ExecutableTableRegistry}. */
public final class NoOpExecutableTableRegistry implements
ExecutableTableRegistry {
-
- private static final IgniteLogger LOG =
Loggers.forClass(NoOpExecutableTableRegistry.class);
-
- private volatile Duration delay = Duration.ZERO;
-
- /** Sets a delay for {@link #getTable(int, int)} operation. */
- public void setGetTableDelay(Duration delay) {
- this.delay = Objects.requireNonNull(delay, "delay");
- }
-
/** {@inheritDoc} */
@Override
- public CompletableFuture<ExecutableTable> getTable(int catalogVersion, int
tableId) {
- Duration delay = this.delay;
-
- CompletableFuture<ExecutableTable> f = new CompletableFuture<>();
-
- LOG.info("Requested tableId={} in catalogVersion={} with delay={}",
tableId, catalogVersion, delay);
-
- f.completeOnTimeout(new NoOpExecutableTable(tableId),
delay.toMillis(), TimeUnit.MILLISECONDS).thenRun(() -> {
- LOG.info("Return tableId={} in catalogVersion={} after delay={}",
tableId, catalogVersion, delay);
- });
-
- return f;
+ public ExecutableTable getTable(int catalogVersion, int tableId) {
+ return new NoOpExecutableTable(tableId);
}
private static final class NoOpExecutableTable implements ExecutableTable {
@@ -66,13 +50,73 @@ public final class NoOpExecutableTableRegistry implements
ExecutableTableRegistr
/** {@inheritDoc} */
@Override
public ScannableTable scannableTable() {
- throw noDependency();
+ return new ScannableTable() {
+ @Override
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory, @Nullable BitSet
requiredColumns) {
+ return SubscriptionUtils.fromIterable(new
CompletableFuture<>());
+ }
+
+ @Override
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken
partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId,
+ List<String> columns, @Nullable RangeCondition<RowT>
cond, @Nullable BitSet requiredColumns) {
+ return SubscriptionUtils.fromIterable(new
CompletableFuture<>());
+ }
+
+ @Override
+ public <RowT> Publisher<RowT>
indexLookup(ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken
partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId,
+ List<String> columns, RowT key, @Nullable BitSet
requiredColumns) {
+ return SubscriptionUtils.fromIterable(new
CompletableFuture<>());
+ }
+
+ @Override
+ public <RowT> CompletableFuture<@Nullable RowT>
primaryKeyLookup(ExecutionContext<RowT> ctx,
+ @Nullable InternalTransaction explicitTx,
RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet requiredColumns) {
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ public CompletableFuture<Long> estimatedSize() {
+ return new CompletableFuture<>();
+ }
+ };
}
/** {@inheritDoc} */
@Override
public UpdatableTable updatableTable() {
- throw noDependency();
+ return new UpdatableTable() {
+ @Override
+ public TableDescriptor descriptor() {
+ return null;
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
insertAll(ExecutionContext<RowT> ectx, List<RowT> rows,
+ ColocationGroup colocationGroup) {
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<Void> insert(@Nullable
InternalTransaction explicitTx, ExecutionContext<RowT> ectx,
+ RowT row) {
+ return new CompletableFuture<>();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
upsertAll(ExecutionContext<RowT> ectx, List<RowT> rows,
+ ColocationGroup colocationGroup) {
+ return null;
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
deleteAll(ExecutionContext<RowT> ectx, List<RowT> rows,
+ ColocationGroup colocationGroup) {
+ return new CompletableFuture<>();
+ }
+ };
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 422ec89914..baeb93cf38 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -1457,12 +1457,12 @@ public class TestBuilders {
}
@Override
- public CompletableFuture<ExecutableTable> getTable(int catalogVersion,
int tableId) {
+ public ExecutableTable getTable(int catalogVersion, int tableId) {
IgniteTable table = schemaManager.table(catalogVersion, tableId);
assert table != null;
- return CompletableFuture.completedFuture(new ExecutableTable() {
+ return new ExecutableTable() {
@Override
public ScannableTable scannableTable() {
ScannableTable scannableTable =
tablesByName.apply(table.name());
@@ -1490,7 +1490,7 @@ public class TestBuilders {
public Supplier<PartitionCalculator> partitionCalculator() {
return table.partitionCalculator();
}
- });
+ };
}
}