This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 8ebd9e842a IGNITE-20763 Sql. Clean up query execution (#2770) 8ebd9e842a is described below commit 8ebd9e842a2aaec5e1ae7e3b5e1cb36a040dc565 Author: korlov42 <kor...@gridgain.com> AuthorDate: Thu Nov 2 13:41:09 2023 +0200 IGNITE-20763 Sql. Clean up query execution (#2770) --- .../org/apache/ignite/internal/logger/Loggers.java | 2 +- .../apache/ignite/internal/logger/VoidLogger.java | 4 +- .../internal/benchmark/SqlOneNodeBenchmark.java | 25 ++-- .../internal/sql/engine/SqlQueryProcessor.java | 7 +- .../internal/sql/engine/exec/ExecutionContext.java | 21 +--- .../sql/engine/exec/ExecutionServiceImpl.java | 139 +++++++++++---------- .../sql/engine/exec/exp/ExpressionFactoryImpl.java | 37 +++--- .../sql/engine/exec/rel/AsyncRootNode.java | 14 ++- .../internal/sql/engine/prepare/IgnitePlanner.java | 2 +- .../sql/engine/prepare/PlanningContext.java | 2 +- .../internal/sql/engine/rex/IgniteRexBuilder.java | 4 +- .../sql/engine/schema/SqlSchemaManagerImpl.java | 5 + .../sql/engine/type/IgniteTypeFactory.java | 34 ++--- .../sql/engine/util/AbstractQueryContext.java | 40 ------ .../internal/sql/engine/util/BaseQueryContext.java | 103 ++++----------- .../ignite/internal/sql/engine/util/Commons.java | 7 ++ .../sql/engine/exec/ExecutionServiceImplTest.java | 8 +- .../sql/engine/exec/RuntimeSortedIndexTest.java | 4 - .../engine/exec/exp/ExpressionFactoryImplTest.java | 2 +- .../sql/engine/exec/rel/AbstractExecutionTest.java | 4 - .../engine/exec/rel/MergeJoinExecutionTest.java | 8 +- .../sql/engine/framework/TestBuilders.java | 2 - .../internal/sql/engine/framework/TestNode.java | 2 + .../sql/engine/planner/AbstractPlannerTest.java | 2 +- .../internal/sql/engine/planner/PlannerTest.java | 3 +- .../ddl/AbstractDdlSqlToCommandConverterTest.java | 2 + 26 files changed, 201 insertions(+), 282 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java b/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java index 33e64d839d..5cbe8aec7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java @@ -82,6 +82,6 @@ public final class Loggers { * @return Void logger. */ public static IgniteLogger voidLogger() { - return new VoidLogger(); + return VoidLogger.INSTANCE; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java index bb7b901b78..233473d73c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java @@ -24,10 +24,12 @@ import org.jetbrains.annotations.Nullable; * Logger which does not output anything. */ class VoidLogger extends IgniteLogger { + static final VoidLogger INSTANCE = new VoidLogger(); + /** * Creates null logger. */ - VoidLogger() { + private VoidLogger() { super(System.getLogger(VoidLogger.class.getName())); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java index 711ef16845..0762242cbd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java @@ -45,12 +45,12 @@ import org.openjdk.jmh.runner.options.OptionsBuilder; * Benchmark that runs sql queries via embedded client on single node cluster. */ @State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@BenchmarkMode(Mode.AverageTime) -@Warmup(iterations = 3, time = 5) -@Measurement(iterations = 5, time = 5) -@Threads(1) @Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 2) +@Measurement(iterations = 20, time = 2) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) @SuppressWarnings({"WeakerAccess", "unused"}) public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark { private static final int TABLE_SIZE = 30_000; @@ -122,8 +122,6 @@ public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark { /** Benchmark that measures performance of `SELECT *` query over entire table. */ @Benchmark - @Warmup(iterations = 3, time = 5) - @Measurement(iterations = 5, time = 5) public void selectAll(Blackhole bh) { try (var rs = session.execute(null, "SELECT * FROM usertable")) { while (rs.hasNext()) { @@ -132,6 +130,19 @@ public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark { } } + /** + * Benchmark to measure overhead of query initialisation. + */ + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void selectAllFromSystemRange(Blackhole bh) { + try (var rs = session.execute(null, "SELECT * FROM TABLE(system_range(0, 1))")) { + while (rs.hasNext()) { + bh.consume(rs.next()); + } + } + } + /** * Benchmark's entry point. */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 59b17b5c7d..d0f43740f5 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -45,8 +46,6 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.SchemaManager; @@ -113,8 +112,6 @@ import org.jetbrains.annotations.TestOnly; * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class SqlQueryProcessor implements QueryProcessor { - private static final IgniteLogger LOG = Loggers.forClass(SqlQueryProcessor.class); - /** Size of the cache for query plans. */ private static final int PLAN_CACHE_SIZE = 1024; @@ -453,7 +450,7 @@ public class SqlQueryProcessor implements QueryProcessor { .thenCompose(schema -> { BaseQueryContext ctx = BaseQueryContext.builder() .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) - .logger(LOG) + .queryId(UUID.randomUUID()) .cancel(queryCancel) .parameters(params) .build(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 91c5bdfc83..6f883d516d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.engine.exec; +import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import java.lang.reflect.Type; @@ -41,8 +42,6 @@ import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; -import org.apache.ignite.internal.sql.engine.util.AbstractQueryContext; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -50,7 +49,7 @@ import org.jetbrains.annotations.Nullable; /** * Runtime context allowing access to the tables in a database. */ -public class ExecutionContext<RowT> extends AbstractQueryContext implements DataContext { +public class ExecutionContext<RowT> implements DataContext { private static final IgniteLogger LOG = Loggers.forClass(ExecutionContext.class); private static final TimeZone TIME_ZONE = TimeZone.getDefault(); // TODO DistributedSqlConfiguration#timeZone @@ -60,8 +59,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data */ private static final Locale LOCALE = Locale.ENGLISH; - private final BaseQueryContext qctx; - private final QueryTaskExecutor executor; private final UUID qryId; @@ -94,7 +91,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data * Constructor. * * @param executor Task executor. - * @param qctx Base query context. * @param qryId Query ID. * @param description Partitions information. * @param handler Row handler. @@ -102,7 +98,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public ExecutionContext( - BaseQueryContext qctx, QueryTaskExecutor executor, UUID qryId, ClusterNode localNode, @@ -112,10 +107,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data Map<String, Object> params, TxAttributes txAttributes ) { - super(qctx); - this.executor = executor; - this.qctx = qctx; this.qryId = qryId; this.description = description; this.handler = handler; @@ -126,8 +118,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data expressionFactory = new ExpressionFactoryImpl<>( this, - this.qctx.typeFactory(), - this.qctx.config().getParserConfig().conformance() + FRAMEWORK_CONFIG.getParserConfig().conformance() ); long ts = System.currentTimeMillis(); @@ -215,19 +206,19 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data /** {@inheritDoc} */ @Override public SchemaPlus getRootSchema() { - throw new AssertionError("getRootSchema"); + throw new AssertionError("should not be called"); } /** {@inheritDoc} */ @Override public IgniteTypeFactory getTypeFactory() { - return qctx.typeFactory(); + return IgniteTypeFactory.INSTANCE; } /** {@inheritDoc} */ @Override public QueryProvider getQueryProvider() { - return null; // TODO + throw new AssertionError("should not be called"); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 1eafd66386..8f8103c6d4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -35,9 +35,9 @@ import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -228,7 +228,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve BaseQueryContext ctx, MultiStepPlan plan ) { - DistributedQueryManager queryManager = new DistributedQueryManager(true, ctx); + DistributedQueryManager queryManager = new DistributedQueryManager(localNode.name(), true, ctx); DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), queryManager); @@ -248,7 +248,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve .defaultSchema(sqlSchemaManager.schema(schemaVersion)) .build() ) - .logger(LOG) .build(); } @@ -435,22 +434,22 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve } private void submitFragment(String nodeName, QueryStartRequest msg) { - DistributedQueryManager queryManager = getOrCreateQueryManager(msg); + DistributedQueryManager queryManager = getOrCreateQueryManager(nodeName, msg); queryManager.submitFragment(nodeName, msg.schemaVersion(), msg.root(), msg.fragmentDescription(), msg.txAttributes()); } private void handleError(Throwable ex, String nodeName, QueryStartRequest msg) { - DistributedQueryManager queryManager = getOrCreateQueryManager(msg); + DistributedQueryManager queryManager = getOrCreateQueryManager(nodeName, msg); queryManager.handleError(ex, nodeName, msg.fragmentDescription().fragmentId()); } - private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest msg) { + private DistributedQueryManager getOrCreateQueryManager(String coordinatorNodeName, QueryStartRequest msg) { return queryManagerMap.computeIfAbsent(msg.queryId(), key -> { BaseQueryContext ctx = createQueryContext(key, msg.schemaVersion(), msg.parameters()); - return new DistributedQueryManager(ctx); + return new DistributedQueryManager(coordinatorNodeName, ctx); }); } @@ -460,6 +459,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private class DistributedQueryManager { private final boolean coordinator; + private final String coordinatorNodeName; + private final BaseQueryContext ctx; private final CompletableFuture<Void> cancelFut = new CompletableFuture<>(); @@ -468,29 +469,38 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private final Map<RemoteFragmentKey, CompletableFuture<Void>> remoteFragmentInitCompletion = new ConcurrentHashMap<>(); - private final Queue<AbstractNode<RowT>> localFragments = new LinkedBlockingQueue<>(); + private final Queue<AbstractNode<RowT>> localFragments = new ConcurrentLinkedQueue<>(); - private final CompletableFuture<AsyncRootNode<RowT, List<Object>>> root; + private final @Nullable CompletableFuture<AsyncRootNode<RowT, List<Object>>> root; private volatile Long rootFragmentId = null; - private DistributedQueryManager(boolean coordinator, BaseQueryContext ctx) { + private DistributedQueryManager( + String coordinatorNodeName, + boolean coordinator, + BaseQueryContext ctx + ) { this.ctx = ctx; this.coordinator = coordinator; + this.coordinatorNodeName = coordinatorNodeName; - var root = new CompletableFuture<AsyncRootNode<RowT, List<Object>>>(); + if (coordinator) { + var root = new CompletableFuture<AsyncRootNode<RowT, List<Object>>>(); - root.exceptionally(t -> { - this.close(true); + root.exceptionally(t -> { + this.close(true); - return null; - }); + return null; + }); - this.root = root; + this.root = root; + } else { + this.root = null; + } } - private DistributedQueryManager(BaseQueryContext ctx) { - this(false, ctx); + private DistributedQueryManager(String coordinatorNodeName, BaseQueryContext ctx) { + this(coordinatorNodeName, false, ctx); } private List<AbstractNode<?>> localFragments() { @@ -604,7 +614,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private ExecutionContext<RowT> createContext(String initiatorNodeName, FragmentDescription desc, TxAttributes txAttributes) { return new ExecutionContext<>( - ctx, taskExecutor, ctx.queryId(), localNode, @@ -623,25 +632,25 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve FragmentDescription desc, TxAttributes txAttributes ) { - CompletableFuture<?> start = new CompletableFuture<>(); // Because fragment execution runs on specific thread selected by taskExecutor, // we should complete dependency resolution on the same thread // that is going to be used for fragment execution. ExecutionContext<RowT> context = createContext(initiatorNode, desc, txAttributes); Executor exec = (r) -> context.execute(r::run, err -> handleError(err, initiatorNode, desc.fragmentId())); - start.thenCompose(none -> { + try { IgniteRel treeRoot = relationalTreeFromJsonString(schemaVersion, fragmentString, ctx); - return dependencyResolver.resolveDependencies(List.of(treeRoot), schemaVersion) - .thenComposeAsync(deps -> executeFragment(treeRoot, deps, context), exec); - }).exceptionally(ex -> { - handleError(ex, initiatorNode, desc.fragmentId()); - - return null; - }); + dependencyResolver.resolveDependencies(List.of(treeRoot), schemaVersion) + .thenComposeAsync(deps -> executeFragment(treeRoot, deps, context), exec) + .exceptionally(ex -> { + handleError(ex, initiatorNode, desc.fragmentId()); - start.complete(null); + return null; + }); + } catch (Exception ex) { + handleError(ex, initiatorNode, desc.fragmentId()); + } } private void handleError(Throwable ex, String initiatorNode, long fragmentId) { @@ -664,6 +673,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve } private AsyncCursor<List<Object>> execute(InternalTransaction tx, MultiStepPlan multiStepPlan) { + assert root != null; + mappingService.map(multiStepPlan).whenCompleteAsync((mappedFragments, mappingErr) -> { if (mappingErr != null) { if (!root.completeExceptionally(mappingErr)) { @@ -863,35 +874,37 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return cancelFut; } - CompletableFuture<Void> start = closeExecNode(cancel); + CompletableFuture<Void> start = new CompletableFuture<>(); - start - .thenCompose(tmp -> { - CompletableFuture<Void> cancelResult = coordinator - ? awaitFragmentInitialisationAndClose() - : closeLocalFragments(); + CompletableFuture<Void> stage; - var finalStepFut = cancelResult.whenComplete((r, e) -> { - if (e != null) { - Throwable ex = ExceptionUtils.unwrapCause(e); + if (coordinator) { + stage = start.thenCompose(ignored -> closeRootNode(cancel)) + .thenCompose(ignored -> awaitFragmentInitialisationAndClose()); + } else { + stage = start.thenCompose(ignored -> messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage() + .queryId(ctx.queryId()) + .build())) + .thenCompose(ignored -> closeLocalFragments()); + } - LOG.warn("Fragment closing processed with errors: [queryId={}]", ex, ctx.queryId()); - } + stage.whenComplete((r, e) -> { + if (e != null) { + Throwable ex = ExceptionUtils.unwrapCause(e); - queryManagerMap.remove(ctx.queryId()); + LOG.warn("Fragment closing processed with errors: [queryId={}]", ex, ctx.queryId()); + } - try { - ctx.cancel().cancel(); - } catch (Exception th) { - LOG.debug("Exception raised while cancel", th); - } + queryManagerMap.remove(ctx.queryId()); - cancelFut.complete(null); - }); + try { + ctx.cancel().cancel(); + } catch (Exception th) { + LOG.debug("Exception raised while cancel", th); + } - return cancelResult.thenCombine(finalStepFut, (none1, none2) -> null); - }) - .thenRun(() -> localFragments.forEach(f -> f.context().cancel())); + cancelFut.complete(null); + }).thenRun(() -> localFragments.forEach(f -> f.context().cancel())); start.completeAsync(() -> null, taskExecutor); @@ -899,14 +912,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve } private CompletableFuture<Void> closeLocalFragments() { - QueryCancelledException ex = new QueryCancelledException(); - List<CompletableFuture<?>> localFragmentCompletions = new ArrayList<>(); for (AbstractNode<?> node : localFragments) { assert !node.context().isCancelled() : "node context is cancelled, but node still processed"; localFragmentCompletions.add( - node.context().submit(() -> node.onError(ex), node::onError) + node.context().submit(node::close, node::onError) ); } @@ -960,22 +971,24 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve * @param cancel Forces execution to terminate with {@link QueryCancelledException}. * @return Completable future that should run asynchronously. */ - private CompletableFuture<Void> closeExecNode(boolean cancel) { - CompletableFuture<Void> start = new CompletableFuture<>(); + private CompletableFuture<Void> closeRootNode(boolean cancel) { + assert root != null; - if (!root.completeExceptionally(new QueryCancelledException()) && !root.isCompletedExceptionally()) { - AsyncRootNode<RowT, List<Object>> node = root.getNow(null); + if (!root.isDone()) { + root.completeExceptionally(new QueryCancelledException()); + } - if (!cancel) { - CompletableFuture<Void> closeFut = node.closeAsync(); + if (!root.isCompletedExceptionally()) { + AsyncRootNode<RowT, List<Object>> node = root.getNow(null); - return start.thenCompose(v -> closeFut); + if (cancel) { + node.onError(new QueryCancelledException()); } - node.onError(new QueryCancelledException()); + return node.closeAsync(); } - return start; + return Commons.completedFuture(); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java index 7b4b57acc9..8a0560ea1b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java @@ -98,30 +98,25 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { .<String, Scalar>build() .asMap(); - private final IgniteTypeFactory typeFactory; + private static final IgniteTypeFactory TYPE_FACTORY = IgniteTypeFactory.INSTANCE; + private static final RexBuilder REX_BUILDER = IgniteRexBuilder.INSTANCE; + private static final RelDataType NULL_TYPE = TYPE_FACTORY.createSqlType(SqlTypeName.NULL); + private static final RelDataType EMPTY_TYPE = new RelDataTypeFactory.Builder(TYPE_FACTORY).build(); private final SqlConformance conformance; - private final RexBuilder rexBuilder; - - private final RelDataType emptyType; - - private final RelDataType nullType; - private final ExecutionContext<RowT> ctx; /** * Constructor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ - public ExpressionFactoryImpl(ExecutionContext<RowT> ctx, IgniteTypeFactory typeFactory, SqlConformance conformance) { + public ExpressionFactoryImpl( + ExecutionContext<RowT> ctx, + SqlConformance conformance + ) { this.ctx = ctx; - this.typeFactory = typeFactory; this.conformance = conformance; - - rexBuilder = new IgniteRexBuilder(this.typeFactory); - emptyType = new RelDataTypeFactory.Builder(this.typeFactory).build(); - nullType = typeFactory.createSqlType(SqlTypeName.NULL); } /** {@inheritDoc} */ @@ -254,7 +249,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { /** {@inheritDoc} */ @Override public Supplier<RowT> rowSource(List<RexNode> values) { - List<RelDataType> typeList = Commons.transform(values, v -> v != null ? v.getType() : nullType); + List<RelDataType> typeList = Commons.transform(values, v -> v != null ? v.getType() : NULL_TYPE); RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(typeList); return new ValuesImpl(scalar(values, null), ctx.rowHandler().factory(rowSchema)); @@ -292,7 +287,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { List<Class<?>> types = new ArrayList<>(columns); for (RelDataType type : RelOptUtil.getFieldTypeList(rowType)) { - types.add(Primitives.wrap((Class<?>) typeFactory.getJavaClass(type))); + types.add(Primitives.wrap((Class<?>) TYPE_FACTORY.getJavaClass(type))); } List<RowT> rows = new ArrayList<>(values.size() / columns); @@ -517,10 +512,10 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { private Scalar compile(List<RexNode> nodes, RelDataType type, boolean biInParams) { if (type == null) { - type = emptyType; + type = EMPTY_TYPE; } - RexProgramBuilder programBuilder = new RexProgramBuilder(type, rexBuilder); + RexProgramBuilder programBuilder = new RexProgramBuilder(type, REX_BUILDER); for (RexNode node : nodes) { assert node != null : "unexpected nullable node"; @@ -556,7 +551,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { Function1<String, InputGetter> correlates = new CorrelatesBuilder(builder, ctx, hnd).build(nodes); - List<Expression> projects = RexToLixTranslator.translateProjects(program, typeFactory, conformance, + List<Expression> projects = RexToLixTranslator.translateProjects(program, TYPE_FACTORY, conformance, builder, null, null, ctx, inputGetter, correlates); for (int i = 0; i < projects.size(); i++) { @@ -655,8 +650,8 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { this.scalar = scalar; hnd = ctx.rowHandler(); - RelDataType booleanType = typeFactory.createSqlType(SqlTypeName.BOOLEAN); - RelDataType nullableType = typeFactory.createTypeWithNullability(booleanType, true); + RelDataType booleanType = TYPE_FACTORY.createSqlType(SqlTypeName.BOOLEAN); + RelDataType nullableType = TYPE_FACTORY.createTypeWithNullability(booleanType, true); RowSchema schema = TypeUtils.rowSchemaFromRelTypes(List.of(nullableType)); out = hnd.factory(schema).create(); @@ -992,7 +987,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> { public Expression field(BlockBuilder list, int index, Type desiredType) { Expression fldExpression = fillExpressions(list, index); - Type fieldType = typeFactory.getJavaClass(rowType.getFieldList().get(index).getType()); + Type fieldType = TYPE_FACTORY.getJavaClass(rowType.getFieldList().get(index).getType()); if (desiredType == null) { desiredType = fieldType; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java index b1350a8227..acd4a13864 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java @@ -151,14 +151,16 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async if (!closed) { Throwable th = ex.get(); - if (th == null) { - th = new QueryCancelledException(); - } + if (!pendingRequests.isEmpty()) { + if (th == null) { + th = new QueryCancelledException(); + } - Throwable th0 = th; + Throwable th0 = th; - pendingRequests.forEach(req -> req.fut.completeExceptionally(th0)); - pendingRequests.clear(); + pendingRequests.forEach(req -> req.fut.completeExceptionally(th0)); + pendingRequests.clear(); + } source.context().execute(() -> { try { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java index 61f9a5db3e..2870c97cca 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java @@ -156,7 +156,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { rexExecutor = frameworkCfg.getExecutor(); traitDefs = frameworkCfg.getTraitDefs(); - rexBuilder = new IgniteRexBuilder(typeFactory); + rexBuilder = IgniteRexBuilder.INSTANCE; } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java index 050921eff4..c74d75191e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java @@ -129,7 +129,7 @@ public final class PlanningContext implements Context { /** Get type factory. */ public IgniteTypeFactory typeFactory() { - return unwrap(BaseQueryContext.class).typeFactory(); + return IgniteTypeFactory.INSTANCE; } /** Get new catalog reader. */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java index 8885fd025d..c9914c787d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java @@ -22,19 +22,21 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.sql.engine.type.IgniteCustomType; +import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.jetbrains.annotations.Nullable; /** * {@link RexBuilder} that provides support for {@link IgniteCustomType custom data types}. */ public class IgniteRexBuilder extends RexBuilder { + public static final IgniteRexBuilder INSTANCE = new IgniteRexBuilder(IgniteTypeFactory.INSTANCE); /** * Creates a RexBuilder. * * @param typeFactory Type factory */ - public IgniteRexBuilder(RelDataTypeFactory typeFactory) { + private IgniteRexBuilder(RelDataTypeFactory typeFactory) { super(typeFactory); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java index 19f9d5e707..5c6ac58a02 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.schema.DefaultValueGenerator; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; +import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.cache.Cache; import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory; import org.apache.ignite.lang.ErrorGroups.Common; @@ -96,6 +97,10 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { @Override public CompletableFuture<Void> schemaReadyFuture(int version) { // SqlSchemaManager creates SQL schema lazily on-demand, thus waiting for Catalog version is enough. + if (catalogManager.latestCatalogVersion() >= version) { + return Commons.completedFuture(); + } + return catalogManager.catalogReadyFuture(version); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java index a0a2b90192..fac70f4244 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java @@ -69,30 +69,30 @@ public class IgniteTypeFactory extends JavaTypeFactoryImpl { private static final SqlIntervalQualifier INTERVAL_QUALIFIER_DAY_TIME = new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO); + public static final IgniteTypeFactory INSTANCE = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE); + + /** Contains java types internally mapped into appropriate rel types. */ + private final Map<Class<?>, Supplier<RelDataType>> implementedJavaTypes = new IdentityHashMap<>(); + /** Default charset. */ private final Charset charset; /** A registry that contains custom data types. **/ private final CustomDataTypes customDataTypes; - /** Contains java types internally mapped into appropriate rel types. */ - private static final Map<Class<?>, Supplier<RelDataType>> implementedJavaTypes = new IdentityHashMap<>(); - { - { - implementedJavaTypes.put(LocalDate.class, () -> - createTypeWithNullability(createSqlType(SqlTypeName.DATE), true)); - implementedJavaTypes.put(LocalTime.class, () -> - createTypeWithNullability(createSqlType(SqlTypeName.TIME), true)); - implementedJavaTypes.put(LocalDateTime.class, () -> - createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP), true)); - implementedJavaTypes.put(Instant.class, () -> - createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE), true)); - implementedJavaTypes.put(Duration.class, () -> - createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_DAY_TIME), true)); - implementedJavaTypes.put(Period.class, () -> - createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_YEAR_MONTH), true)); - } + implementedJavaTypes.put(LocalDate.class, () -> + createTypeWithNullability(createSqlType(SqlTypeName.DATE), true)); + implementedJavaTypes.put(LocalTime.class, () -> + createTypeWithNullability(createSqlType(SqlTypeName.TIME), true)); + implementedJavaTypes.put(LocalDateTime.class, () -> + createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP), true)); + implementedJavaTypes.put(Instant.class, () -> + createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE), true)); + implementedJavaTypes.put(Duration.class, () -> + createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_DAY_TIME), true)); + implementedJavaTypes.put(Period.class, () -> + createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_YEAR_MONTH), true)); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java deleted file mode 100644 index 34e986bbf3..0000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine.util; - -import org.apache.calcite.plan.Context; - -/** - * Abstract query context. - */ -public class AbstractQueryContext implements Context { - private final Context parentCtx; - - public AbstractQueryContext(Context parentCtx) { - this.parentCtx = parentCtx; - } - - /** {@inheritDoc} */ - @Override public <C> C unwrap(Class<C> cls) { - if (cls == getClass()) { - return cls.cast(this); - } - - return parentCtx.unwrap(cls); - } -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java index bf5eb82144..e27f2a0c86 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.sql.engine.util; import static org.apache.calcite.tools.Frameworks.createRootSchema; +import static org.apache.ignite.internal.sql.engine.util.Commons.DISTRIBUTED_TRAITS_SET; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import com.google.common.collect.Multimap; @@ -30,6 +31,7 @@ import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.config.CalciteConnectionProperty; import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptSchema; @@ -42,13 +44,10 @@ import org.apache.calcite.rel.metadata.MetadataDef; import org.apache.calcite.rel.metadata.MetadataHandler; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.UnboundMetadata; -import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory; @@ -56,21 +55,18 @@ import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.util.ArrayUtils; +import org.jetbrains.annotations.Nullable; /** * Base query context. */ -public final class BaseQueryContext extends AbstractQueryContext { - public static final CalciteConnectionConfig CALCITE_CONNECTION_CONFIG; +public final class BaseQueryContext implements Context { + private static final CalciteConnectionConfig CALCITE_CONNECTION_CONFIG; public static final RelOptCluster CLUSTER; - private static final IgniteTypeFactory TYPE_FACTORY; - private static final IgniteCostFactory COST_FACTORY = new IgniteCostFactory(); - private static final BaseQueryContext EMPTY_CONTEXT; - static { Properties props = new Properties(); @@ -83,15 +79,10 @@ public final class BaseQueryContext extends AbstractQueryContext { CALCITE_CONNECTION_CONFIG = new CalciteConnectionConfigImpl(props); - RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, FRAMEWORK_CONFIG.getTypeSystem()); - - TYPE_FACTORY = createTypeFactory(typeSys); - - RexBuilder defaultRexBuilder = createRexBuilder(TYPE_FACTORY); + RexBuilder defaultRexBuilder = IgniteRexBuilder.INSTANCE; - EMPTY_CONTEXT = builder().build(); - - VolcanoPlanner planner = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT) { + BaseQueryContext emptyContext = builder().queryId(new UUID(0L, 0L)).build(); + VolcanoPlanner planner = new VolcanoPlanner(COST_FACTORY, emptyContext) { @Override public void registerSchema(RelOptSchema schema) { // This method in VolcanoPlanner stores schema in hash map. It can be invoked during relational @@ -101,7 +92,7 @@ public final class BaseQueryContext extends AbstractQueryContext { }; // Dummy planner must contain all trait definitions to create singleton cluster with all default traits. - for (RelTraitDef<?> def : EMPTY_CONTEXT.config().getTraitDefs()) { + for (RelTraitDef<?> def : DISTRIBUTED_TRAITS_SET) { planner.addRelTraitDef(def); } @@ -139,13 +130,9 @@ public final class BaseQueryContext extends AbstractQueryContext { CLUSTER = cluster; } - private final FrameworkConfig cfg; - - private final IgniteLogger log; - - private final IgniteTypeFactory typeFactory; + private final Context parentCtx; - private final RexBuilder rexBuilder; + private final FrameworkConfig cfg; private final QueryCancel cancel; @@ -165,33 +152,30 @@ public final class BaseQueryContext extends AbstractQueryContext { FrameworkConfig cfg, QueryCancel cancel, Object[] parameters, - IgniteLogger log, QueryPrefetchCallback prefetchCallback ) { - super(Contexts.chain(cfg.getContext())); + this.parentCtx = Contexts.chain(cfg.getContext()); // link frameworkConfig#context() to this. this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build(); this.queryId = queryId; - this.log = log; this.cancel = cancel; this.parameters = parameters; this.prefetchCallback = prefetchCallback; - - typeFactory = TYPE_FACTORY; - - assert TYPE_FACTORY.getTypeSystem() == cfg.getTypeSystem(); - - rexBuilder = createRexBuilder(typeFactory); } public static Builder builder() { return new Builder(); } - public static BaseQueryContext empty() { - return EMPTY_CONTEXT; + /** {@inheritDoc} */ + @Override public <C> @Nullable C unwrap(Class<C> cls) { + if (cls == getClass()) { + return cls.cast(this); + } + + return parentCtx.unwrap(cls); } public UUID queryId() { @@ -206,22 +190,10 @@ public final class BaseQueryContext extends AbstractQueryContext { return cfg; } - public IgniteLogger logger() { - return log; - } - public SchemaPlus schema() { return cfg.getDefaultSchema(); } - public IgniteTypeFactory typeFactory() { - return typeFactory; - } - - public RexBuilder rexBuilder() { - return rexBuilder; - } - public int schemaVersion() { return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version(); } @@ -248,27 +220,13 @@ public final class BaseQueryContext extends AbstractQueryContext { return catalogReader = new CalciteCatalogReader( CalciteSchema.from(rootSchema), CalciteSchema.from(dfltSchema).path(null), - typeFactory(), CALCITE_CONNECTION_CONFIG); + IgniteTypeFactory.INSTANCE, CALCITE_CONNECTION_CONFIG); } public QueryCancel cancel() { return cancel; } - /** - * Creates a builder object filled with current context attributes. - * - * @return Prefilled builder. - */ - public Builder toBuilder() { - return builder() - .queryId(queryId) - .frameworkConfig(cfg) - .logger(log) - .cancel(cancel) - .parameters(parameters); - } - /** * Query context builder. */ @@ -277,16 +235,14 @@ public final class BaseQueryContext extends AbstractQueryContext { private static final FrameworkConfig EMPTY_CONFIG = Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(createRootSchema(false)) - .traitDefs(Commons.DISTRIBUTED_TRAITS_SET) + .traitDefs(DISTRIBUTED_TRAITS_SET) .build(); private FrameworkConfig frameworkCfg = EMPTY_CONFIG; private QueryCancel cancel = new QueryCancel(); - private IgniteLogger log = Loggers.voidLogger(); - - private UUID queryId = UUID.randomUUID(); + private UUID queryId; private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY; @@ -302,11 +258,6 @@ public final class BaseQueryContext extends AbstractQueryContext { return this; } - public Builder logger(IgniteLogger log) { - this.log = Objects.requireNonNull(log); - return this; - } - public Builder queryId(UUID queryId) { this.queryId = Objects.requireNonNull(queryId); return this; @@ -323,15 +274,7 @@ public final class BaseQueryContext extends AbstractQueryContext { } public BaseQueryContext build() { - return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, prefetchCallback); + return new BaseQueryContext(Objects.requireNonNull(queryId, "queryId"), frameworkCfg, cancel, parameters, prefetchCallback); } } - - private static IgniteTypeFactory createTypeFactory(RelDataTypeSystem typeSystem) { - return new IgniteTypeFactory(typeSystem); - } - - private static IgniteRexBuilder createRexBuilder(IgniteTypeFactory typeFactory) { - return new IgniteRexBuilder(typeFactory); - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java index 85bab1af85..950968be67 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.calcite.DataContexts; @@ -116,6 +117,8 @@ import org.jetbrains.annotations.Nullable; * Utility methods. */ public final class Commons { + private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null); + public static final String IMPLICIT_PK_COL_NAME = "__p_key"; public static final int IN_BUFFER_SIZE = 512; @@ -176,6 +179,10 @@ public final class Commons { private Commons() { } + public static CompletableFuture<Void> completedFuture() { + return COMPLETED_FUTURE; + } + private static SqlTypeCoercionRule standardCompatibleCoercionRules() { return SqlTypeCoercionRule.instance(IgniteCustomAssigmentsRules.instance().getTypeMapping()); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index b2fbfd294d..c8a84e9d13 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -408,7 +408,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { await(batchFut.exceptionally(ex -> { assertInstanceOf(CompletionException.class, ex); - assertInstanceOf(RemoteFragmentExecutionException.class, ex.getCause()); + assertInstanceOf(QueryCancelledException.class, ex.getCause()); assertNull(ex.getCause().getCause()); return null; @@ -604,6 +604,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { ExecutionService execService = executionServices.get(0); Function<QueryPrefetchCallback, BaseQueryContext> createCtx = (callback) -> BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .cancel(new QueryCancel()) .prefetchCallback(callback) .frameworkConfig( @@ -611,7 +612,6 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { .defaultSchema(wrap(schema)) .build() ) - .logger(log) .build(); QueryPrefetchCallback prefetchListener = new QueryPrefetchCallback() { @@ -663,6 +663,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { IgniteInternalException expectedException = new IgniteInternalException(Common.INTERNAL_ERR, "Expected exception"); BaseQueryContext ctx = BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .cancel(new QueryCancel()) .prefetchCallback(prefetchFut::completeExceptionally) .frameworkConfig( @@ -670,7 +671,6 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { .defaultSchema(wrap(schema)) .build() ) - .logger(log) .build(); testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, original) -> { @@ -784,13 +784,13 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { private BaseQueryContext createContext() { return BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .cancel(new QueryCancel()) .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(wrap(schema)) .build() ) - .logger(log) .build(); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java index 2266e87547..fcc1734e2d 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java @@ -36,7 +36,6 @@ import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -114,9 +113,6 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest { private RuntimeSortedIndex<Object[]> generate(RelDataType rowType, final List<Integer> idxCols, int notUnique) { RuntimeSortedIndex<Object[]> idx = new RuntimeSortedIndex<>( new ExecutionContext<>( - BaseQueryContext.builder() - .logger(log) - .build(), null, UUID.randomUUID(), new ClusterNodeImpl("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java index 46d1c20084..b9716646cb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java @@ -93,7 +93,7 @@ public class ExpressionFactoryImplTest extends BaseIgniteAbstractTest { .executor(Mockito.mock(QueryTaskExecutor.class)) .build(); - expFactory = new ExpressionFactoryImpl<>(ctx, typeFactory, SqlConformanceEnum.DEFAULT); + expFactory = new ExpressionFactoryImpl<>(ctx, SqlConformanceEnum.DEFAULT); } @Test diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java index fb3a07f4b2..c24276cfcf 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java @@ -49,7 +49,6 @@ import org.apache.ignite.internal.sql.engine.exec.TxAttributes; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler; @@ -112,9 +111,6 @@ public abstract class AbstractExecutionTest<T> extends IgniteAbstractTest { FragmentDescription fragmentDesc = new FragmentDescription(0, true, Long2ObjectMaps.emptyMap(), null, null); return new ExecutionContext<>( - BaseQueryContext.builder() - .logger(log) - .build(), taskExecutor, UUID.randomUUID(), new ClusterNodeImpl("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java index 76ed7d7a99..fe45def647 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java @@ -45,8 +45,6 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl; import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.type.NativeTypes; import org.junit.jupiter.params.ParameterizedTest; @@ -485,13 +483,11 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest<Object[]> { RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING)); ScanNode<Object[]> rightNode = new ScanNode<>(ctx, Arrays.asList(right)); - IgniteTypeFactory typeFactory = Commons.typeFactory(); - ExecutionContext<Object[]> ectx = - new ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null, null, + new ExecutionContext<>(null, null, null, null, null, ArrayRowHandler.INSTANCE, null, null); - ExpressionFactoryImpl<Object[]> expFactory = new ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT); + ExpressionFactoryImpl<Object[]> expFactory = new ExpressionFactoryImpl<>(ectx, SqlConformanceEnum.DEFAULT); RelFieldCollation colLeft = new RelFieldCollation(2, Direction.ASCENDING, NullDirection.FIRST); RelFieldCollation colRight = new RelFieldCollation(0, Direction.ASCENDING, NullDirection.FIRST); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index c94b2470b8..2fc650a84f 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -87,7 +87,6 @@ import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory; import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; @@ -453,7 +452,6 @@ public class TestBuilders { @Override public ExecutionContext<Object[]> build() { return new ExecutionContext<>( - BaseQueryContext.builder().build(), Objects.requireNonNull(executor, "executor"), queryId, Objects.requireNonNull(node, "node"), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 9cbe9ee1a7..043d1d5b7e 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.manager.IgniteComponent; @@ -224,6 +225,7 @@ public class TestNode implements LifecycleAware { private BaseQueryContext createContext() { return BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .cancel(new QueryCancel()) .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java index cab0e9e5de..7e662c00b1 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java @@ -269,13 +269,13 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { } return BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .frameworkConfig( newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(dfltSchema) .sqlToRelConverterConfig(relConvCfg) .build() ) - .logger(log) .parameters(params) .build(); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java index 7e4ef3c06e..7d588f119c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.function.Predicate; import java.util.function.UnaryOperator; import org.apache.calcite.plan.RelOptUtil; @@ -119,7 +120,7 @@ public class PlannerTest extends AbstractPlannerTest { PlanningContext ctx = PlanningContext.builder() .parentContext(BaseQueryContext.builder() - .logger(log) + .queryId(UUID.randomUUID()) .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(schema) .costFactory(new IgniteCostFactory(1, 100, 1, 1)) diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java index 698ddff2a5..52aa0fcaa1 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; @@ -59,6 +60,7 @@ class AbstractDdlSqlToCommandConverterTest extends BaseIgniteAbstractTest { return PlanningContext.builder() .parentContext(BaseQueryContext.builder() + .queryId(UUID.randomUUID()) .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(schema) .build())