This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ebd9e842a IGNITE-20763 Sql. Clean up query execution (#2770)
8ebd9e842a is described below

commit 8ebd9e842a2aaec5e1ae7e3b5e1cb36a040dc565
Author: korlov42 <kor...@gridgain.com>
AuthorDate: Thu Nov 2 13:41:09 2023 +0200

    IGNITE-20763 Sql. Clean up query execution (#2770)
---
 .../org/apache/ignite/internal/logger/Loggers.java |   2 +-
 .../apache/ignite/internal/logger/VoidLogger.java  |   4 +-
 .../internal/benchmark/SqlOneNodeBenchmark.java    |  25 ++--
 .../internal/sql/engine/SqlQueryProcessor.java     |   7 +-
 .../internal/sql/engine/exec/ExecutionContext.java |  21 +---
 .../sql/engine/exec/ExecutionServiceImpl.java      | 139 +++++++++++----------
 .../sql/engine/exec/exp/ExpressionFactoryImpl.java |  37 +++---
 .../sql/engine/exec/rel/AsyncRootNode.java         |  14 ++-
 .../internal/sql/engine/prepare/IgnitePlanner.java |   2 +-
 .../sql/engine/prepare/PlanningContext.java        |   2 +-
 .../internal/sql/engine/rex/IgniteRexBuilder.java  |   4 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |   5 +
 .../sql/engine/type/IgniteTypeFactory.java         |  34 ++---
 .../sql/engine/util/AbstractQueryContext.java      |  40 ------
 .../internal/sql/engine/util/BaseQueryContext.java | 103 ++++-----------
 .../ignite/internal/sql/engine/util/Commons.java   |   7 ++
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   8 +-
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   4 -
 .../engine/exec/exp/ExpressionFactoryImplTest.java |   2 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   4 -
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   8 +-
 .../sql/engine/framework/TestBuilders.java         |   2 -
 .../internal/sql/engine/framework/TestNode.java    |   2 +
 .../sql/engine/planner/AbstractPlannerTest.java    |   2 +-
 .../internal/sql/engine/planner/PlannerTest.java   |   3 +-
 .../ddl/AbstractDdlSqlToCommandConverterTest.java  |   2 +
 26 files changed, 201 insertions(+), 282 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java b/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java
index 33e64d839d..5cbe8aec7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/logger/Loggers.java
@@ -82,6 +82,6 @@ public final class Loggers {
      * @return Void logger.
      */
     public static IgniteLogger voidLogger() {
-        return new VoidLogger();
+        return VoidLogger.INSTANCE;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java
index bb7b901b78..233473d73c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/logger/VoidLogger.java
@@ -24,10 +24,12 @@ import org.jetbrains.annotations.Nullable;
  * Logger which does not output anything.
  */
 class VoidLogger extends IgniteLogger {
+    static final VoidLogger INSTANCE = new VoidLogger();
+
     /**
      * Creates null logger.
      */
-    VoidLogger() {
+    private VoidLogger() {
         super(System.getLogger(VoidLogger.class.getName()));
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
index 711ef16845..0762242cbd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
@@ -45,12 +45,12 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
  * Benchmark that runs sql queries via embedded client on single node cluster.
  */
 @State(Scope.Benchmark)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@BenchmarkMode(Mode.AverageTime)
-@Warmup(iterations = 3, time = 5)
-@Measurement(iterations = 5, time = 5)
-@Threads(1)
 @Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
 @SuppressWarnings({"WeakerAccess", "unused"})
 public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
     private static final int TABLE_SIZE = 30_000;
@@ -122,8 +122,6 @@ public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
 
     /** Benchmark that measures performance of `SELECT *` query over entire table. */
     @Benchmark
-    @Warmup(iterations = 3, time = 5)
-    @Measurement(iterations = 5, time = 5)
     public void selectAll(Blackhole bh) {
         try (var rs = session.execute(null, "SELECT * FROM usertable")) {
             while (rs.hasNext()) {
@@ -132,6 +130,19 @@ public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
         }
     }
 
+    /**
+     * Benchmark to measure overhead of query initialisation.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void selectAllFromSystemRange(Blackhole bh) {
+        try (var rs = session.execute(null, "SELECT * FROM TABLE(system_range(0, 1))")) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
     /**
      * Benchmark's entry point.
      */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 59b17b5c7d..d0f43740f5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -45,8 +46,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.SchemaManager;
@@ -113,8 +112,6 @@ import org.jetbrains.annotations.TestOnly;
  *  TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class SqlQueryProcessor implements QueryProcessor {
-    private static final IgniteLogger LOG = Loggers.forClass(SqlQueryProcessor.class);
-
     /** Size of the cache for query plans. */
     private static final int PLAN_CACHE_SIZE = 1024;
 
@@ -453,7 +450,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                     .thenCompose(schema -> {
                         BaseQueryContext ctx = BaseQueryContext.builder()
                                 
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
-                                .logger(LOG)
+                                .queryId(UUID.randomUUID())
                                 .cancel(queryCancel)
                                 .parameters(params)
                                 .build();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 91c5bdfc83..6f883d516d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.lang.reflect.Type;
@@ -41,8 +42,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import org.apache.ignite.internal.sql.engine.util.AbstractQueryContext;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -50,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Runtime context allowing access to the tables in a database.
  */
-public class ExecutionContext<RowT> extends AbstractQueryContext implements 
DataContext {
+public class ExecutionContext<RowT> implements DataContext {
     private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionContext.class);
 
     private static final TimeZone TIME_ZONE = TimeZone.getDefault(); // TODO 
DistributedSqlConfiguration#timeZone
@@ -60,8 +59,6 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
      */
     private static final Locale LOCALE = Locale.ENGLISH;
 
-    private final BaseQueryContext qctx;
-
     private final QueryTaskExecutor executor;
 
     private final UUID qryId;
@@ -94,7 +91,6 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
      * Constructor.
      *
      * @param executor Task executor.
-     * @param qctx Base query context.
      * @param qryId Query ID.
      * @param description Partitions information.
      * @param handler Row handler.
@@ -102,7 +98,6 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public ExecutionContext(
-            BaseQueryContext qctx,
             QueryTaskExecutor executor,
             UUID qryId,
             ClusterNode localNode,
@@ -112,10 +107,7 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
             Map<String, Object> params,
             TxAttributes txAttributes
     ) {
-        super(qctx);
-
         this.executor = executor;
-        this.qctx = qctx;
         this.qryId = qryId;
         this.description = description;
         this.handler = handler;
@@ -126,8 +118,7 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
 
         expressionFactory = new ExpressionFactoryImpl<>(
                 this,
-                this.qctx.typeFactory(),
-                this.qctx.config().getParserConfig().conformance()
+                FRAMEWORK_CONFIG.getParserConfig().conformance()
         );
 
         long ts = System.currentTimeMillis();
@@ -215,19 +206,19 @@ public class ExecutionContext<RowT> extends 
AbstractQueryContext implements Data
     /** {@inheritDoc} */
     @Override
     public SchemaPlus getRootSchema() {
-        throw new AssertionError("getRootSchema");
+        throw new AssertionError("should not be called");
     }
 
     /** {@inheritDoc} */
     @Override
     public IgniteTypeFactory getTypeFactory() {
-        return qctx.typeFactory();
+        return IgniteTypeFactory.INSTANCE;
     }
 
     /** {@inheritDoc} */
     @Override
     public QueryProvider getQueryProvider() {
-        return null; // TODO
+        throw new AssertionError("should not be called");
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 1eafd66386..8f8103c6d4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -35,9 +35,9 @@ import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -228,7 +228,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             BaseQueryContext ctx,
             MultiStepPlan plan
     ) {
-        DistributedQueryManager queryManager = new 
DistributedQueryManager(true, ctx);
+        DistributedQueryManager queryManager = new 
DistributedQueryManager(localNode.name(), true, ctx);
 
         DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), 
queryManager);
 
@@ -248,7 +248,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                                 
.defaultSchema(sqlSchemaManager.schema(schemaVersion))
                                 .build()
                 )
-                .logger(LOG)
                 .build();
     }
 
@@ -435,22 +434,22 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     }
 
     private void submitFragment(String nodeName, QueryStartRequest msg) {
-        DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+        DistributedQueryManager queryManager = 
getOrCreateQueryManager(nodeName, msg);
 
         queryManager.submitFragment(nodeName, msg.schemaVersion(), msg.root(), 
msg.fragmentDescription(), msg.txAttributes());
     }
 
     private void handleError(Throwable ex, String nodeName, QueryStartRequest 
msg) {
-        DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+        DistributedQueryManager queryManager = 
getOrCreateQueryManager(nodeName, msg);
 
         queryManager.handleError(ex, nodeName, 
msg.fragmentDescription().fragmentId());
     }
 
-    private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest 
msg) {
+    private DistributedQueryManager getOrCreateQueryManager(String 
coordinatorNodeName, QueryStartRequest msg) {
         return queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
             BaseQueryContext ctx = createQueryContext(key, 
msg.schemaVersion(), msg.parameters());
 
-            return new DistributedQueryManager(ctx);
+            return new DistributedQueryManager(coordinatorNodeName, ctx);
         });
     }
 
@@ -460,6 +459,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private class DistributedQueryManager {
         private final boolean coordinator;
 
+        private final String coordinatorNodeName;
+
         private final BaseQueryContext ctx;
 
         private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
@@ -468,29 +469,38 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         private final Map<RemoteFragmentKey, CompletableFuture<Void>> 
remoteFragmentInitCompletion = new ConcurrentHashMap<>();
 
-        private final Queue<AbstractNode<RowT>> localFragments = new 
LinkedBlockingQueue<>();
+        private final Queue<AbstractNode<RowT>> localFragments = new 
ConcurrentLinkedQueue<>();
 
-        private final CompletableFuture<AsyncRootNode<RowT, List<Object>>> 
root;
+        private final @Nullable CompletableFuture<AsyncRootNode<RowT, 
List<Object>>> root;
 
         private volatile Long rootFragmentId = null;
 
-        private DistributedQueryManager(boolean coordinator, BaseQueryContext 
ctx) {
+        private DistributedQueryManager(
+                String coordinatorNodeName,
+                boolean coordinator,
+                BaseQueryContext ctx
+        ) {
             this.ctx = ctx;
             this.coordinator = coordinator;
+            this.coordinatorNodeName = coordinatorNodeName;
 
-            var root = new CompletableFuture<AsyncRootNode<RowT, 
List<Object>>>();
+            if (coordinator) {
+                var root = new CompletableFuture<AsyncRootNode<RowT, 
List<Object>>>();
 
-            root.exceptionally(t -> {
-                this.close(true);
+                root.exceptionally(t -> {
+                    this.close(true);
 
-                return null;
-            });
+                    return null;
+                });
 
-            this.root = root;
+                this.root = root;
+            } else {
+                this.root = null;
+            }
         }
 
-        private DistributedQueryManager(BaseQueryContext ctx) {
-            this(false, ctx);
+        private DistributedQueryManager(String coordinatorNodeName, 
BaseQueryContext ctx) {
+            this(coordinatorNodeName, false, ctx);
         }
 
         private List<AbstractNode<?>> localFragments() {
@@ -604,7 +614,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         private ExecutionContext<RowT> createContext(String initiatorNodeName, 
FragmentDescription desc, TxAttributes txAttributes) {
             return new ExecutionContext<>(
-                    ctx,
                     taskExecutor,
                     ctx.queryId(),
                     localNode,
@@ -623,25 +632,25 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 FragmentDescription desc,
                 TxAttributes txAttributes
         ) {
-            CompletableFuture<?> start = new CompletableFuture<>();
             // Because fragment execution runs on specific thread selected by 
taskExecutor,
             // we should complete dependency resolution on the same thread
             // that is going to be used for fragment execution.
             ExecutionContext<RowT> context = createContext(initiatorNode, 
desc, txAttributes);
             Executor exec = (r) -> context.execute(r::run, err -> 
handleError(err, initiatorNode, desc.fragmentId()));
 
-            start.thenCompose(none -> {
+            try {
                 IgniteRel treeRoot = 
relationalTreeFromJsonString(schemaVersion, fragmentString, ctx);
 
-                return 
dependencyResolver.resolveDependencies(List.of(treeRoot), schemaVersion)
-                        .thenComposeAsync(deps -> executeFragment(treeRoot, 
deps, context), exec);
-            }).exceptionally(ex -> {
-                handleError(ex, initiatorNode, desc.fragmentId());
-
-                return null;
-            });
+                dependencyResolver.resolveDependencies(List.of(treeRoot), 
schemaVersion)
+                        .thenComposeAsync(deps -> executeFragment(treeRoot, 
deps, context), exec)
+                        .exceptionally(ex -> {
+                            handleError(ex, initiatorNode, desc.fragmentId());
 
-            start.complete(null);
+                            return null;
+                        });
+            } catch (Exception ex) {
+                handleError(ex, initiatorNode, desc.fragmentId());
+            }
         }
 
         private void handleError(Throwable ex, String initiatorNode, long 
fragmentId) {
@@ -664,6 +673,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         }
 
         private AsyncCursor<List<Object>> execute(InternalTransaction tx, 
MultiStepPlan multiStepPlan) {
+            assert root != null;
+
             
mappingService.map(multiStepPlan).whenCompleteAsync((mappedFragments, 
mappingErr) -> {
                 if (mappingErr != null) {
                     if (!root.completeExceptionally(mappingErr)) {
@@ -863,35 +874,37 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 return cancelFut;
             }
 
-            CompletableFuture<Void> start = closeExecNode(cancel);
+            CompletableFuture<Void> start = new CompletableFuture<>();
 
-            start
-                    .thenCompose(tmp -> {
-                        CompletableFuture<Void> cancelResult = coordinator
-                                ? awaitFragmentInitialisationAndClose()
-                                : closeLocalFragments();
+            CompletableFuture<Void> stage;
 
-                        var finalStepFut = cancelResult.whenComplete((r, e) -> 
{
-                            if (e != null) {
-                                Throwable ex = ExceptionUtils.unwrapCause(e);
+            if (coordinator) {
+                stage = start.thenCompose(ignored -> closeRootNode(cancel))
+                        .thenCompose(ignored -> 
awaitFragmentInitialisationAndClose());
+            } else {
+                stage = start.thenCompose(ignored -> 
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
+                                .queryId(ctx.queryId())
+                                .build()))
+                        .thenCompose(ignored -> closeLocalFragments());
+            }
 
-                                LOG.warn("Fragment closing processed with 
errors: [queryId={}]", ex, ctx.queryId());
-                            }
+            stage.whenComplete((r, e) -> {
+                if (e != null) {
+                    Throwable ex = ExceptionUtils.unwrapCause(e);
 
-                            queryManagerMap.remove(ctx.queryId());
+                    LOG.warn("Fragment closing processed with errors: 
[queryId={}]", ex, ctx.queryId());
+                }
 
-                            try {
-                                ctx.cancel().cancel();
-                            } catch (Exception th) {
-                                LOG.debug("Exception raised while cancel", th);
-                            }
+                queryManagerMap.remove(ctx.queryId());
 
-                            cancelFut.complete(null);
-                        });
+                try {
+                    ctx.cancel().cancel();
+                } catch (Exception th) {
+                    LOG.debug("Exception raised while cancel", th);
+                }
 
-                        return cancelResult.thenCombine(finalStepFut, (none1, 
none2) -> null);
-                    })
-                    .thenRun(() -> localFragments.forEach(f -> 
f.context().cancel()));
+                cancelFut.complete(null);
+            }).thenRun(() -> localFragments.forEach(f -> 
f.context().cancel()));
 
             start.completeAsync(() -> null, taskExecutor);
 
@@ -899,14 +912,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         }
 
         private CompletableFuture<Void> closeLocalFragments() {
-            QueryCancelledException ex = new QueryCancelledException();
-
             List<CompletableFuture<?>> localFragmentCompletions = new 
ArrayList<>();
             for (AbstractNode<?> node : localFragments) {
                 assert !node.context().isCancelled() : "node context is 
cancelled, but node still processed";
 
                 localFragmentCompletions.add(
-                        node.context().submit(() -> node.onError(ex), 
node::onError)
+                        node.context().submit(node::close, node::onError)
                 );
             }
 
@@ -960,22 +971,24 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
          * @param cancel Forces execution to terminate with {@link 
QueryCancelledException}.
          * @return Completable future that should run asynchronously.
          */
-        private CompletableFuture<Void> closeExecNode(boolean cancel) {
-            CompletableFuture<Void> start = new CompletableFuture<>();
+        private CompletableFuture<Void> closeRootNode(boolean cancel) {
+            assert root != null;
 
-            if (!root.completeExceptionally(new QueryCancelledException()) && 
!root.isCompletedExceptionally()) {
-                AsyncRootNode<RowT, List<Object>> node = root.getNow(null);
+            if (!root.isDone()) {
+                root.completeExceptionally(new QueryCancelledException());
+            }
 
-                if (!cancel) {
-                    CompletableFuture<Void> closeFut = node.closeAsync();
+            if (!root.isCompletedExceptionally()) {
+                AsyncRootNode<RowT, List<Object>> node = root.getNow(null);
 
-                    return start.thenCompose(v -> closeFut);
+                if (cancel) {
+                    node.onError(new QueryCancelledException());
                 }
 
-                node.onError(new QueryCancelledException());
+                return node.closeAsync();
             }
 
-            return start;
+            return Commons.completedFuture();
         }
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index 7b4b57acc9..8a0560ea1b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -98,30 +98,25 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
             .<String, Scalar>build()
             .asMap();
 
-    private final IgniteTypeFactory typeFactory;
+    private static final IgniteTypeFactory TYPE_FACTORY = 
IgniteTypeFactory.INSTANCE;
+    private static final RexBuilder REX_BUILDER = IgniteRexBuilder.INSTANCE;
+    private static final RelDataType NULL_TYPE = 
TYPE_FACTORY.createSqlType(SqlTypeName.NULL);
+    private static final RelDataType EMPTY_TYPE = new 
RelDataTypeFactory.Builder(TYPE_FACTORY).build();
 
     private final SqlConformance conformance;
 
-    private final RexBuilder rexBuilder;
-
-    private final RelDataType emptyType;
-
-    private final RelDataType nullType;
-
     private final ExecutionContext<RowT> ctx;
 
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public ExpressionFactoryImpl(ExecutionContext<RowT> ctx, IgniteTypeFactory 
typeFactory, SqlConformance conformance) {
+    public ExpressionFactoryImpl(
+            ExecutionContext<RowT> ctx,
+            SqlConformance conformance
+    ) {
         this.ctx = ctx;
-        this.typeFactory = typeFactory;
         this.conformance = conformance;
-
-        rexBuilder = new IgniteRexBuilder(this.typeFactory);
-        emptyType = new RelDataTypeFactory.Builder(this.typeFactory).build();
-        nullType = typeFactory.createSqlType(SqlTypeName.NULL);
     }
 
     /** {@inheritDoc} */
@@ -254,7 +249,7 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
     /** {@inheritDoc} */
     @Override
     public Supplier<RowT> rowSource(List<RexNode> values) {
-        List<RelDataType> typeList = Commons.transform(values, v -> v != null 
? v.getType() : nullType);
+        List<RelDataType> typeList = Commons.transform(values, v -> v != null 
? v.getType() : NULL_TYPE);
         RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(typeList);
 
         return new ValuesImpl(scalar(values, null), 
ctx.rowHandler().factory(rowSchema));
@@ -292,7 +287,7 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
 
         List<Class<?>> types = new ArrayList<>(columns);
         for (RelDataType type : RelOptUtil.getFieldTypeList(rowType)) {
-            types.add(Primitives.wrap((Class<?>) 
typeFactory.getJavaClass(type)));
+            types.add(Primitives.wrap((Class<?>) 
TYPE_FACTORY.getJavaClass(type)));
         }
 
         List<RowT> rows = new ArrayList<>(values.size() / columns);
@@ -517,10 +512,10 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
 
     private Scalar compile(List<RexNode> nodes, RelDataType type, boolean 
biInParams) {
         if (type == null) {
-            type = emptyType;
+            type = EMPTY_TYPE;
         }
 
-        RexProgramBuilder programBuilder = new RexProgramBuilder(type, 
rexBuilder);
+        RexProgramBuilder programBuilder = new RexProgramBuilder(type, 
REX_BUILDER);
 
         for (RexNode node : nodes) {
             assert node != null : "unexpected nullable node";
@@ -556,7 +551,7 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
 
         Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, hnd).build(nodes);
 
-        List<Expression> projects = 
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
+        List<Expression> projects = 
RexToLixTranslator.translateProjects(program, TYPE_FACTORY, conformance,
                 builder, null, null, ctx, inputGetter, correlates);
 
         for (int i = 0; i < projects.size(); i++) {
@@ -655,8 +650,8 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
             this.scalar = scalar;
             hnd = ctx.rowHandler();
 
-            RelDataType booleanType = 
typeFactory.createSqlType(SqlTypeName.BOOLEAN);
-            RelDataType nullableType = 
typeFactory.createTypeWithNullability(booleanType, true);
+            RelDataType booleanType = 
TYPE_FACTORY.createSqlType(SqlTypeName.BOOLEAN);
+            RelDataType nullableType = 
TYPE_FACTORY.createTypeWithNullability(booleanType, true);
             RowSchema schema = 
TypeUtils.rowSchemaFromRelTypes(List.of(nullableType));
 
             out = hnd.factory(schema).create();
@@ -992,7 +987,7 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
         public Expression field(BlockBuilder list, int index, Type 
desiredType) {
             Expression fldExpression = fillExpressions(list, index);
 
-            Type fieldType = 
typeFactory.getJavaClass(rowType.getFieldList().get(index).getType());
+            Type fieldType = 
TYPE_FACTORY.getJavaClass(rowType.getFieldList().get(index).getType());
 
             if (desiredType == null) {
                 desiredType = fieldType;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index b1350a8227..acd4a13864 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -151,14 +151,16 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
                 if (!closed) {
                     Throwable th = ex.get();
 
-                    if (th == null) {
-                        th = new QueryCancelledException();
-                    }
+                    if (!pendingRequests.isEmpty()) {
+                        if (th == null) {
+                            th = new QueryCancelledException();
+                        }
 
-                    Throwable th0 = th;
+                        Throwable th0 = th;
 
-                    pendingRequests.forEach(req -> 
req.fut.completeExceptionally(th0));
-                    pendingRequests.clear();
+                        pendingRequests.forEach(req -> 
req.fut.completeExceptionally(th0));
+                        pendingRequests.clear();
+                    }
 
                     source.context().execute(() -> {
                         try {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
index 61f9a5db3e..2870c97cca 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
@@ -156,7 +156,7 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         rexExecutor = frameworkCfg.getExecutor();
         traitDefs = frameworkCfg.getTraitDefs();
 
-        rexBuilder = new IgniteRexBuilder(typeFactory);
+        rexBuilder = IgniteRexBuilder.INSTANCE;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java
index 050921eff4..c74d75191e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanningContext.java
@@ -129,7 +129,7 @@ public final class PlanningContext implements Context {
 
     /** Get type factory. */
     public IgniteTypeFactory typeFactory() {
-        return unwrap(BaseQueryContext.class).typeFactory();
+        return IgniteTypeFactory.INSTANCE;
     }
 
     /** Get new catalog reader. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java
index 8885fd025d..c9914c787d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rex/IgniteRexBuilder.java
@@ -22,19 +22,21 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.sql.engine.type.IgniteCustomType;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link RexBuilder} that provides support for {@link IgniteCustomType custom data types}.
  */
 public class IgniteRexBuilder extends RexBuilder {
+    public static final IgniteRexBuilder INSTANCE = new 
IgniteRexBuilder(IgniteTypeFactory.INSTANCE);
 
     /**
      * Creates a RexBuilder.
      *
      * @param typeFactory Type factory
      */
-    public IgniteRexBuilder(RelDataTypeFactory typeFactory) {
+    private IgniteRexBuilder(RelDataTypeFactory typeFactory) {
         super(typeFactory);
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 19f9d5e707..5c6ac58a02 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.schema.DefaultValueGenerator;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.lang.ErrorGroups.Common;
@@ -96,6 +97,10 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
     @Override
     public CompletableFuture<Void> schemaReadyFuture(int version) {
         // SqlSchemaManager creates SQL schema lazily on-demand, thus waiting for Catalog version is enough.
+        if (catalogManager.latestCatalogVersion() >= version) {
+            return Commons.completedFuture();
+        }
+
         return catalogManager.catalogReadyFuture(version);
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java
index a0a2b90192..fac70f4244 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/type/IgniteTypeFactory.java
@@ -69,30 +69,30 @@ public class IgniteTypeFactory extends JavaTypeFactoryImpl {
     private static final SqlIntervalQualifier INTERVAL_QUALIFIER_DAY_TIME = 
new SqlIntervalQualifier(TimeUnit.DAY,
             TimeUnit.SECOND, SqlParserPos.ZERO);
 
+    public static final IgniteTypeFactory INSTANCE = new 
IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+    /** Contains java types internally mapped into appropriate rel types. */
+    private final Map<Class<?>, Supplier<RelDataType>> implementedJavaTypes = 
new IdentityHashMap<>();
+
     /** Default charset. */
     private final Charset charset;
 
     /** A registry that contains custom data types. **/
     private final CustomDataTypes customDataTypes;
 
-    /** Contains java types internally mapped into appropriate rel types. */
-    private static final Map<Class<?>, Supplier<RelDataType>> 
implementedJavaTypes = new IdentityHashMap<>();
-
     {
-        {
-            implementedJavaTypes.put(LocalDate.class, () ->
-                    createTypeWithNullability(createSqlType(SqlTypeName.DATE), 
true));
-            implementedJavaTypes.put(LocalTime.class, () ->
-                    createTypeWithNullability(createSqlType(SqlTypeName.TIME), 
true));
-            implementedJavaTypes.put(LocalDateTime.class, () ->
-                    
createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP), true));
-            implementedJavaTypes.put(Instant.class, () ->
-                    
createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
 true));
-            implementedJavaTypes.put(Duration.class, () ->
-                    
createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_DAY_TIME), 
true));
-            implementedJavaTypes.put(Period.class, () ->
-                    
createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_YEAR_MONTH), 
true));
-        }
+        implementedJavaTypes.put(LocalDate.class, () ->
+                createTypeWithNullability(createSqlType(SqlTypeName.DATE), 
true));
+        implementedJavaTypes.put(LocalTime.class, () ->
+                createTypeWithNullability(createSqlType(SqlTypeName.TIME), 
true));
+        implementedJavaTypes.put(LocalDateTime.class, () ->
+                
createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP), true));
+        implementedJavaTypes.put(Instant.class, () ->
+                
createTypeWithNullability(createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
 true));
+        implementedJavaTypes.put(Duration.class, () ->
+                
createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_DAY_TIME), 
true));
+        implementedJavaTypes.put(Period.class, () ->
+                
createTypeWithNullability(createSqlIntervalType(INTERVAL_QUALIFIER_YEAR_MONTH), 
true));
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java
deleted file mode 100644
index 34e986bbf3..0000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractQueryContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import org.apache.calcite.plan.Context;
-
-/**
- * Abstract query context.
- */
-public class AbstractQueryContext implements Context {
-    private final Context parentCtx;
-
-    public AbstractQueryContext(Context parentCtx) {
-        this.parentCtx = parentCtx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <C> C unwrap(Class<C> cls) {
-        if (cls == getClass()) {
-            return cls.cast(this);
-        }
-
-        return parentCtx.unwrap(cls);
-    }
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index bf5eb82144..e27f2a0c86 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.util;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static 
org.apache.ignite.internal.sql.engine.util.Commons.DISTRIBUTED_TRAITS_SET;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 
 import com.google.common.collect.Multimap;
@@ -30,6 +31,7 @@ import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
@@ -42,13 +44,10 @@ import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.UnboundMetadata;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
@@ -56,21 +55,18 @@ import 
org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.util.ArrayUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Base query context.
  */
-public final class BaseQueryContext extends AbstractQueryContext {
-    public static final CalciteConnectionConfig CALCITE_CONNECTION_CONFIG;
+public final class BaseQueryContext implements Context {
+    private static final CalciteConnectionConfig CALCITE_CONNECTION_CONFIG;
 
     public static final RelOptCluster CLUSTER;
 
-    private static final IgniteTypeFactory TYPE_FACTORY;
-
     private static final IgniteCostFactory COST_FACTORY = new 
IgniteCostFactory();
 
-    private static final BaseQueryContext EMPTY_CONTEXT;
-
     static {
         Properties props = new Properties();
 
@@ -83,15 +79,10 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         CALCITE_CONNECTION_CONFIG = new CalciteConnectionConfigImpl(props);
 
-        RelDataTypeSystem typeSys = 
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, 
FRAMEWORK_CONFIG.getTypeSystem());
-
-        TYPE_FACTORY = createTypeFactory(typeSys);
-
-        RexBuilder defaultRexBuilder = createRexBuilder(TYPE_FACTORY);
+        RexBuilder defaultRexBuilder = IgniteRexBuilder.INSTANCE;
 
-        EMPTY_CONTEXT = builder().build();
-
-        VolcanoPlanner planner = new VolcanoPlanner(COST_FACTORY, 
EMPTY_CONTEXT) {
+        BaseQueryContext emptyContext = builder().queryId(new UUID(0L, 
0L)).build();
+        VolcanoPlanner planner = new VolcanoPlanner(COST_FACTORY, 
emptyContext) {
             @Override
             public void registerSchema(RelOptSchema schema) {
                 // This method in VolcanoPlanner stores schema in hash map. It can be invoked during relational
@@ -101,7 +92,7 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         };
 
         // Dummy planner must contain all trait definitions to create singleton cluster with all default traits.
-        for (RelTraitDef<?> def : EMPTY_CONTEXT.config().getTraitDefs()) {
+        for (RelTraitDef<?> def : DISTRIBUTED_TRAITS_SET) {
             planner.addRelTraitDef(def);
         }
 
@@ -139,13 +130,9 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         CLUSTER = cluster;
     }
 
-    private final FrameworkConfig cfg;
-
-    private final IgniteLogger log;
-
-    private final IgniteTypeFactory typeFactory;
+    private final Context parentCtx;
 
-    private final RexBuilder rexBuilder;
+    private final FrameworkConfig cfg;
 
     private final QueryCancel cancel;
 
@@ -165,33 +152,30 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             FrameworkConfig cfg,
             QueryCancel cancel,
             Object[] parameters,
-            IgniteLogger log,
             QueryPrefetchCallback prefetchCallback
     ) {
-        super(Contexts.chain(cfg.getContext()));
+        this.parentCtx = Contexts.chain(cfg.getContext());
 
         // link frameworkConfig#context() to this.
         this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build();
 
         this.queryId = queryId;
-        this.log = log;
         this.cancel = cancel;
         this.parameters = parameters;
         this.prefetchCallback = prefetchCallback;
-
-        typeFactory = TYPE_FACTORY;
-
-        assert TYPE_FACTORY.getTypeSystem() == cfg.getTypeSystem();
-
-        rexBuilder = createRexBuilder(typeFactory);
     }
 
     public static Builder builder() {
         return new Builder();
     }
 
-    public static BaseQueryContext empty() {
-        return EMPTY_CONTEXT;
+    /** {@inheritDoc} */
+    @Override public <C> @Nullable C unwrap(Class<C> cls) {
+        if (cls == getClass()) {
+            return cls.cast(this);
+        }
+
+        return parentCtx.unwrap(cls);
     }
 
     public UUID queryId() {
@@ -206,22 +190,10 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return cfg;
     }
 
-    public IgniteLogger logger() {
-        return log;
-    }
-
     public SchemaPlus schema() {
         return cfg.getDefaultSchema();
     }
 
-    public IgniteTypeFactory typeFactory() {
-        return typeFactory;
-    }
-
-    public RexBuilder rexBuilder() {
-        return rexBuilder;
-    }
-
     public int schemaVersion() {
         return 
Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version();
     }
@@ -248,27 +220,13 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return catalogReader = new CalciteCatalogReader(
                 CalciteSchema.from(rootSchema),
                 CalciteSchema.from(dfltSchema).path(null),
-                typeFactory(), CALCITE_CONNECTION_CONFIG);
+                IgniteTypeFactory.INSTANCE, CALCITE_CONNECTION_CONFIG);
     }
 
     public QueryCancel cancel() {
         return cancel;
     }
 
-    /**
-     * Creates a builder object filled with current context attributes.
-     *
-     * @return Prefilled builder.
-     */
-    public Builder toBuilder() {
-        return builder()
-                .queryId(queryId)
-                .frameworkConfig(cfg)
-                .logger(log)
-                .cancel(cancel)
-                .parameters(parameters);
-    }
-
     /**
      * Query context builder.
      */
@@ -277,16 +235,14 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         private static final FrameworkConfig EMPTY_CONFIG =
                 Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                         .defaultSchema(createRootSchema(false))
-                        .traitDefs(Commons.DISTRIBUTED_TRAITS_SET)
+                        .traitDefs(DISTRIBUTED_TRAITS_SET)
                         .build();
 
         private FrameworkConfig frameworkCfg = EMPTY_CONFIG;
 
         private QueryCancel cancel = new QueryCancel();
 
-        private IgniteLogger log = Loggers.voidLogger();
-
-        private UUID queryId = UUID.randomUUID();
+        private UUID queryId;
 
         private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;
 
@@ -302,11 +258,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             return this;
         }
 
-        public Builder logger(IgniteLogger log) {
-            this.log = Objects.requireNonNull(log);
-            return this;
-        }
-
         public Builder queryId(UUID queryId) {
             this.queryId = Objects.requireNonNull(queryId);
             return this;
@@ -323,15 +274,7 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         }
 
         public BaseQueryContext build() {
-            return new BaseQueryContext(queryId, frameworkCfg, cancel, 
parameters, log, prefetchCallback);
+            return new BaseQueryContext(Objects.requireNonNull(queryId, 
"queryId"), frameworkCfg, cancel, parameters, prefetchCallback);
         }
     }
-
-    private static IgniteTypeFactory createTypeFactory(RelDataTypeSystem 
typeSystem) {
-        return new IgniteTypeFactory(typeSystem);
-    }
-
-    private static IgniteRexBuilder createRexBuilder(IgniteTypeFactory 
typeFactory) {
-        return new IgniteRexBuilder(typeFactory);
-    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 85bab1af85..950968be67 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -46,6 +46,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.DataContexts;
@@ -116,6 +117,8 @@ import org.jetbrains.annotations.Nullable;
  * Utility methods.
  */
 public final class Commons {
+    private static final CompletableFuture<Void> COMPLETED_FUTURE = 
CompletableFuture.completedFuture(null);
+
     public static final String IMPLICIT_PK_COL_NAME = "__p_key";
 
     public static final int IN_BUFFER_SIZE = 512;
@@ -176,6 +179,10 @@ public final class Commons {
     private Commons() {
     }
 
+    public static CompletableFuture<Void> completedFuture() {
+        return COMPLETED_FUTURE;
+    }
+
     private static SqlTypeCoercionRule standardCompatibleCoercionRules() {
         return 
SqlTypeCoercionRule.instance(IgniteCustomAssigmentsRules.instance().getTypeMapping());
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index b2fbfd294d..c8a84e9d13 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -408,7 +408,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         await(batchFut.exceptionally(ex -> {
             assertInstanceOf(CompletionException.class, ex);
-            assertInstanceOf(RemoteFragmentExecutionException.class, 
ex.getCause());
+            assertInstanceOf(QueryCancelledException.class, ex.getCause());
             assertNull(ex.getCause().getCause());
 
             return null;
@@ -604,6 +604,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         ExecutionService execService = executionServices.get(0);
 
         Function<QueryPrefetchCallback, BaseQueryContext> createCtx = 
(callback) -> BaseQueryContext.builder()
+                .queryId(UUID.randomUUID())
                 .cancel(new QueryCancel())
                 .prefetchCallback(callback)
                 .frameworkConfig(
@@ -611,7 +612,6 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                                 .defaultSchema(wrap(schema))
                                 .build()
                 )
-                .logger(log)
                 .build();
 
         QueryPrefetchCallback prefetchListener = new QueryPrefetchCallback() {
@@ -663,6 +663,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         IgniteInternalException expectedException = new 
IgniteInternalException(Common.INTERNAL_ERR, "Expected exception");
 
         BaseQueryContext ctx = BaseQueryContext.builder()
+                .queryId(UUID.randomUUID())
                 .cancel(new QueryCancel())
                 .prefetchCallback(prefetchFut::completeExceptionally)
                 .frameworkConfig(
@@ -670,7 +671,6 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                                 .defaultSchema(wrap(schema))
                                 .build()
                 )
-                .logger(log)
                 .build();
 
         testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, 
original) -> {
@@ -784,13 +784,13 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
     private BaseQueryContext createContext() {
         return BaseQueryContext.builder()
+                .queryId(UUID.randomUUID())
                 .cancel(new QueryCancel())
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                                 .defaultSchema(wrap(schema))
                                 .build()
                 )
-                .logger(log)
                 .build();
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 2266e87547..fcc1734e2d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -36,7 +36,6 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -114,9 +113,6 @@ public class RuntimeSortedIndexTest extends 
IgniteAbstractTest {
     private RuntimeSortedIndex<Object[]> generate(RelDataType rowType, final 
List<Integer> idxCols, int notUnique) {
         RuntimeSortedIndex<Object[]> idx = new RuntimeSortedIndex<>(
                 new ExecutionContext<>(
-                        BaseQueryContext.builder()
-                                .logger(log)
-                                .build(),
                         null,
                         UUID.randomUUID(),
                         new ClusterNodeImpl("1", "fake-test-node", 
NetworkAddress.from("127.0.0.1:1111")),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
index 46d1c20084..b9716646cb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
@@ -93,7 +93,7 @@ public class ExpressionFactoryImplTest extends 
BaseIgniteAbstractTest {
                 .executor(Mockito.mock(QueryTaskExecutor.class))
                 .build();
 
-        expFactory = new ExpressionFactoryImpl<>(ctx, typeFactory, 
SqlConformanceEnum.DEFAULT);
+        expFactory = new ExpressionFactoryImpl<>(ctx, 
SqlConformanceEnum.DEFAULT);
     }
 
     @Test
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index fb3a07f4b2..c24276cfcf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -49,7 +49,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.TxAttributes;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
@@ -112,9 +111,6 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
         FragmentDescription fragmentDesc = new FragmentDescription(0, true, 
Long2ObjectMaps.emptyMap(), null, null);
 
         return new ExecutionContext<>(
-                BaseQueryContext.builder()
-                        .logger(log)
-                        .build(),
                 taskExecutor,
                 UUID.randomUUID(),
                 new ClusterNodeImpl("1", "fake-test-node", 
NetworkAddress.from("127.0.0.1:1111")),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 76ed7d7a99..fe45def647 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -45,8 +45,6 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -485,13 +483,11 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest<Object[]> {
         RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
         ScanNode<Object[]> rightNode = new ScanNode<>(ctx, 
Arrays.asList(right));
 
-        IgniteTypeFactory typeFactory = Commons.typeFactory();
-
         ExecutionContext<Object[]> ectx =
-                new 
ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null, 
null,
+                new ExecutionContext<>(null, null, null,
                         null, null, ArrayRowHandler.INSTANCE, null, null);
 
-        ExpressionFactoryImpl<Object[]> expFactory = new 
ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT);
+        ExpressionFactoryImpl<Object[]> expFactory = new 
ExpressionFactoryImpl<>(ectx, SqlConformanceEnum.DEFAULT);
 
         RelFieldCollation colLeft = new RelFieldCollation(2, 
Direction.ASCENDING, NullDirection.FIRST);
         RelFieldCollation colRight = new RelFieldCollation(0, 
Direction.ASCENDING, NullDirection.FIRST);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index c94b2470b8..2fc650a84f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -87,7 +87,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
@@ -453,7 +452,6 @@ public class TestBuilders {
         @Override
         public ExecutionContext<Object[]> build() {
             return new ExecutionContext<>(
-                    BaseQueryContext.builder().build(),
                     Objects.requireNonNull(executor, "executor"),
                     queryId,
                     Objects.requireNonNull(node, "node"),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 9cbe9ee1a7..043d1d5b7e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -224,6 +225,7 @@ public class TestNode implements LifecycleAware {
 
     private BaseQueryContext createContext() {
         return BaseQueryContext.builder()
+                .queryId(UUID.randomUUID())
                 .cancel(new QueryCancel())
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index cab0e9e5de..7e662c00b1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -269,13 +269,13 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
         }
 
         return BaseQueryContext.builder()
+                .queryId(UUID.randomUUID())
                 .frameworkConfig(
                         newConfigBuilder(FRAMEWORK_CONFIG)
                                 .defaultSchema(dfltSchema)
                                 .sqlToRelConverterConfig(relConvCfg)
                                 .build()
                 )
-                .logger(log)
                 .parameters(params)
                 .build();
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index 7e4ef3c06e..7d588f119c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import org.apache.calcite.plan.RelOptUtil;
@@ -119,7 +120,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         PlanningContext ctx = PlanningContext.builder()
                 .parentContext(BaseQueryContext.builder()
-                        .logger(log)
+                        .queryId(UUID.randomUUID())
                         .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
                                 .defaultSchema(schema)
                                 .costFactory(new IgniteCostFactory(1, 100, 1, 
1))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java
index 698ddff2a5..52aa0fcaa1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/ddl/AbstractDdlSqlToCommandConverterTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -59,6 +60,7 @@ class AbstractDdlSqlToCommandConverterTest extends 
BaseIgniteAbstractTest {
 
         return PlanningContext.builder()
                 .parentContext(BaseQueryContext.builder()
+                        .queryId(UUID.randomUUID())
                         .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
                                 .defaultSchema(schema)
                                 .build())

Reply via email to