This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0080ae950ff IGNITE-20428 SQL Calcite: fix query freezes when partitions are set. (#10935)
0080ae950ff is described below

commit 0080ae950ff735ddfdec2477589ab9b659427ded
Author: Ivan Daschinskiy <ivanda...@apache.org>
AuthorDate: Mon Sep 18 14:38:31 2023 +0300

    IGNITE-20428 SQL Calcite: fix query freezes when partitions are set. (#10935)
---
 .../processors/query/calcite/RootQuery.java        |  7 ++-
 .../query/calcite/exec/ExecutionServiceImpl.java   | 38 ++++--------
 .../calcite/prepare/AbstractMultiStepPlan.java     | 72 ++++++----------------
 .../query/calcite/prepare/ExecutionPlan.java       | 49 ++++++++++++++-
 .../query/calcite/prepare/MappingQueryContext.java | 13 +++-
 .../query/calcite/prepare/MultiStepPlan.java       | 24 +-------
 .../processors/query/calcite/util/Commons.java     |  6 +-
 .../QueryWithPartitionsIntegrationTest.java        | 17 ++---
 .../query/calcite/planner/PlannerTest.java         | 11 ++--
 9 files changed, 113 insertions(+), 124 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index cd6290e9495..8a1f1c20555 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -46,8 +46,9 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
-import 
org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.running.TrackableQuery;
@@ -193,14 +194,14 @@ public class RootQuery<RowT> extends Query<RowT> 
implements TrackableQuery {
     /**
      * Starts execution phase for the query and setup remote fragments.
      */
-    public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> 
root) {
+    public void run(ExecutionContext<RowT> ctx, ExecutionPlan plan, 
FieldsMetadata metadata, Node<RowT> root) {
         synchronized (mux) {
             if (state == QueryState.CLOSED)
                 throw queryCanceledException();
 
             planningTime = U.currentTimeMillis() - startTs;
 
-            RootNode<RowT> rootNode = new RootNode<>(ctx, 
plan.fieldsMetadata().rowType(), this::tryClose);
+            RootNode<RowT> rootNode = new RootNode<>(ctx, metadata.rowType(), 
this::tryClose);
             rootNode.register(root);
 
             addFragment(new 
RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e7d1a594b9b..38667c7c25a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -76,15 +76,14 @@ import 
org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
 import org.apache.ignite.internal.processors.query.calcite.prepare.DdlPlan;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
@@ -561,22 +560,11 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     ) {
         qry.mapping();
 
-        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, 
topologyVersion(), qry.context().isLocal());
+        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, 
topologyVersion(), qry.context());
 
-        plan.init(mappingSvc, mapCtx);
+        ExecutionPlan execPlan = plan.init(mappingSvc, mapCtx);
 
-        List<Fragment> fragments = plan.fragments();
-
-        if (!F.isEmpty(qry.context().partitions())) {
-            fragments = Commons.transform(fragments, f -> {
-                try {
-                    return f.filterByPartitions(qry.context().partitions());
-                }
-                catch (ColocationMappingException e) {
-                    throw new FragmentMappingException("Failed to calculate 
physical distribution", f, f.root(), e);
-                }
-            });
-        }
+        List<Fragment> fragments = execPlan.fragments();
 
         // Local execution
         Fragment fragment = F.first(fragments);
@@ -584,13 +572,13 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         if (U.assertionsEnabled()) {
             assert fragment != null;
 
-            FragmentMapping mapping = plan.mapping(fragment);
+            FragmentMapping mapping = execPlan.mapping(fragment);
 
             assert mapping != null;
 
             List<UUID> nodes = mapping.nodeIds();
 
-            assert nodes != null && nodes.size() == 1 && 
F.first(nodes).equals(localNodeId())
+            assert nodes != null && (nodes.size() == 1 && 
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
                     : "nodes=" + nodes + ", localNode=" + localNodeId();
         }
 
@@ -603,9 +591,9 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         FragmentDescription fragmentDesc = new FragmentDescription(
             fragment.fragmentId(),
-            plan.mapping(fragment),
-            plan.target(fragment),
-            plan.remotes(fragment));
+            execPlan.mapping(fragment),
+            execPlan.target(fragment),
+            execPlan.remotes(fragment));
 
         ExecutionContext<Row> ectx = new ExecutionContext<>(
             qry.context(),
@@ -624,7 +612,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), 
mailboxRegistry(),
             exchangeService(), failureProcessor()).go(fragment.root());
 
-        qry.run(ectx, plan, node);
+        qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
 
         Map<UUID, Long> fragmentsPerNode = fragments.stream()
             .skip(1)
@@ -636,9 +624,9 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
             fragment = fragments.get(i);
             fragmentDesc = new FragmentDescription(
                 fragment.fragmentId(),
-                plan.mapping(fragment),
-                plan.target(fragment),
-                plan.remotes(fragment));
+                execPlan.mapping(fragment),
+                execPlan.target(fragment),
+                execPlan.remotes(fragment));
 
             Throwable ex = null;
             byte[] parametersMarshalled = null;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 0184ba91f86..f6a8be350ed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -17,18 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -44,9 +38,6 @@ public abstract class AbstractMultiStepPlan extends 
AbstractQueryPlan implements
     /** */
     protected final QueryTemplate queryTemplate;
 
-    /** */
-    protected ExecutionPlan executionPlan;
-
     /** */
     private final String textPlan;
 
@@ -66,11 +57,6 @@ public abstract class AbstractMultiStepPlan extends 
AbstractQueryPlan implements
         this.paramsMetadata = paramsMetadata;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<Fragment> fragments() {
-        return Objects.requireNonNull(executionPlan).fragments();
-    }
-
     /** {@inheritDoc} */
     @Override public FieldsMetadata fieldsMetadata() {
         return fieldsMetadata;
@@ -82,47 +68,25 @@ public abstract class AbstractMultiStepPlan extends 
AbstractQueryPlan implements
     }
 
     /** {@inheritDoc} */
-    @Override public FragmentMapping mapping(Fragment fragment) {
-        return fragment.mapping();
-    }
-
-    /** {@inheritDoc} */
-    @Override public ColocationGroup target(Fragment fragment) {
-        if (fragment.rootFragment())
-            return null;
-
-        IgniteSender sender = (IgniteSender)fragment.root();
-        return 
mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<Long, List<UUID>> remotes(Fragment fragment) {
-        List<IgniteReceiver> remotes = fragment.remotes();
+    @Override public ExecutionPlan init(MappingService mappingService, 
MappingQueryContext ctx) {
+        ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, ctx);
 
-        if (F.isEmpty(remotes))
-            return null;
+        if (!F.isEmpty(ctx.partitions()) && 
!F.isEmpty(executionPlan0.fragments())) {
+            List<Fragment> fragments = executionPlan0.fragments();
 
-        HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+            fragments = Commons.transform(fragments, f -> {
+                try {
+                    return f.filterByPartitions(ctx.partitions());
+                }
+                catch (ColocationMappingException e) {
+                    throw new FragmentMappingException("Failed to calculate 
physical distribution", f, f.root(), e);
+                }
+            });
 
-        for (IgniteReceiver remote : remotes)
-            res.put(remote.exchangeId(), 
mapping(remote.sourceFragmentId()).nodeIds());
-
-        return res;
-    }
+            return new ExecutionPlan(executionPlan0.topologyVersion(), 
fragments);
+        }
 
-    /** {@inheritDoc} */
-    @Override public void init(MappingService mappingService, 
MappingQueryContext ctx) {
-        executionPlan = queryTemplate.map(mappingService, ctx);
-    }
-
-    /** */
-    private FragmentMapping mapping(long fragmentId) {
-        return Objects.requireNonNull(executionPlan).fragments().stream()
-            .filter(f -> f.fragmentId() == fragmentId)
-            .findAny().orElseThrow(() -> new IllegalStateException("Cannot 
find fragment with given ID. [" +
-                "fragmentId=" + fragmentId + ", " +
-                "fragments=" + fragments() + "]"))
-            .mapping();
+        return executionPlan0;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
index 75fc70d741b..5a25502ea27 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
@@ -17,14 +17,23 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import com.google.common.collect.ImmutableList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
  */
-class ExecutionPlan {
+public class ExecutionPlan {
     /** */
     private final AffinityTopologyVersion ver;
 
@@ -46,4 +55,42 @@ class ExecutionPlan {
     public List<Fragment> fragments() {
         return fragments;
     }
+
+    /** */
+    public FragmentMapping mapping(Fragment fragment) {
+        return fragment.mapping();
+    }
+
+    /** */
+    public ColocationGroup target(Fragment fragment) {
+        if (fragment.rootFragment())
+            return null;
+
+        IgniteSender sender = (IgniteSender)fragment.root();
+        return 
mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
+    }
+
+    /** */
+    public Map<Long, List<UUID>> remotes(Fragment fragment) {
+        List<IgniteReceiver> remotes = fragment.remotes();
+
+        if (F.isEmpty(remotes))
+            return null;
+
+        HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+
+        for (IgniteReceiver remote : remotes)
+            res.put(remote.exchangeId(), 
mapping(remote.sourceFragmentId()).nodeIds());
+
+        return res;
+    }
+
+    /** */
+    private FragmentMapping mapping(long fragmentId) {
+        return fragments().stream()
+            .filter(f -> f.fragmentId() == fragmentId)
+            .findAny().orElseThrow(() -> new IllegalStateException("Cannot 
find fragment with given ID. [" +
+                "fragmentId=" + fragmentId + ", " + "fragments=" + fragments() 
+ "]"))
+            .mapping();
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 2bdd1a0c913..f8b6f43128f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -41,16 +41,20 @@ public class MappingQueryContext {
     /** */
     private final boolean isLocal;
 
+    /** */
+    private final int[] parts;
+
     /** */
     public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) 
{
-        this(locNodeId, topVer, false);
+        this(locNodeId, topVer, false, null);
     }
 
     /** */
-    public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, 
boolean isLocal) {
+    public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, 
boolean isLocal, int[] parts) {
         this.locNodeId = locNodeId;
         this.topVer = topVer;
         this.isLocal = isLocal;
+        this.parts = parts;
     }
 
     /** */
@@ -68,6 +72,11 @@ public class MappingQueryContext {
         return isLocal;
     }
 
+    /** */
+    public int[] partitions() {
+        return parts;
+    }
+
     /** Creates a cluster. */
     RelOptCluster cluster() {
         if (cluster == null) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index e738d9b92a0..692f713c939 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -17,22 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 
 /**
  * Regular query or DML
  */
 public interface MultiStepPlan extends QueryPlan {
-    /**
-     * @return Query fragments.
-     */
-    List<Fragment> fragments();
-
     /**
      * @return Fields metadata.
      */
@@ -43,24 +33,12 @@ public interface MultiStepPlan extends QueryPlan {
      */
     FieldsMetadata paramsMetadata();
 
-    /**
-     * @param fragment Fragment.
-     * @return Mapping for a given fragment.
-     */
-    FragmentMapping mapping(Fragment fragment);
-
-    /** */
-    ColocationGroup target(Fragment fragment);
-
-    /** */
-    Map<Long, List<UUID>> remotes(Fragment fragment);
-
     /**
      * Inits query fragments.
      *
      * @param ctx Planner context.
      */
-    void init(MappingService mappingService, MappingQueryContext ctx);
+    ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx);
 
     /**
      * @return Text representation of query plan
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index fcafc98b301..4476ebc3793 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -444,11 +444,11 @@ public final class Commons {
 
     /** */
     public static MappingQueryContext mapContext(UUID locNodeId, 
AffinityTopologyVersion topVer) {
-        return mapContext(locNodeId, topVer, false);
+        return new MappingQueryContext(locNodeId, topVer, false, null);
     }
 
     /** */
-    public static MappingQueryContext mapContext(UUID locNodeId, 
AffinityTopologyVersion topVer, boolean isLocal) {
-        return new MappingQueryContext(locNodeId, topVer, isLocal);
+    public static MappingQueryContext mapContext(UUID locNodeId, 
AffinityTopologyVersion topVer, BaseQueryContext ctx) {
+        return new MappingQueryContext(locNodeId, topVer, ctx.isLocal(), 
ctx.partitions());
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
index 16655187dc8..660ee91dc3e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
-import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Ints;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteCache;
@@ -53,12 +52,16 @@ public class QueryWithPartitionsIntegrationTest extends 
AbstractBasicIntegration
     public boolean local;
 
     /** */
-    @Parameterized.Parameters(name = "local = {0}")
+    @Parameterized.Parameter(1)
+    public int partSz;
+
+    /** */
+    @Parameterized.Parameters(name = "local = {0}, partSz = {1}")
     public static List<Object[]> parameters() {
-        return ImmutableList.of(
-                new Object[]{true},
-                new Object[]{false}
-        );
+        return Stream.of(true, false)
+            .flatMap(isLocal -> Stream.of(1, 2, 5, 10, 20)
+                .map(i -> new Object[]{isLocal, i}))
+            .collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
@@ -67,7 +70,7 @@ public class QueryWithPartitionsIntegrationTest extends 
AbstractBasicIntegration
 
         List<Integer> parts0 = IntStream.range(0, 
1024).boxed().collect(Collectors.toList());
         Collections.shuffle(parts0);
-        parts = Ints.toArray(parts0.subList(0, 20));
+        parts = Ints.toArray(parts0.subList(0, partSz));
 
         log.info("Running tests with parts=" + Arrays.toString(parts));
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index aa4cd3e95f2..597869d237c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
@@ -314,7 +315,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         IgniteRel phys = physicalPlan(ctx);
 
-        MultiStepPlan plan = splitPlan(phys);
+        ExecutionPlan plan = splitPlan(phys);
 
         List<Fragment> fragments = plan.fragments();
         assertEquals(2, fragments.size());
@@ -359,16 +360,14 @@ public class PlannerTest extends AbstractPlannerTest {
     }
 
     /** */
-    private MultiStepPlan splitPlan(IgniteRel phys) {
+    private ExecutionPlan splitPlan(IgniteRel phys) {
         assertNotNull(phys);
 
         MultiStepPlan plan = new MultiStepQueryPlan(null, null, new 
QueryTemplate(new Splitter().go(phys)), null, null);
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, 
Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
-
-        return plan;
+        return plan.init(this::intermediateMapping, 
Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
     }
 
     /**
@@ -378,7 +377,7 @@ public class PlannerTest extends AbstractPlannerTest {
         BaseQueryContext qctx,
         PlanningContext ctx,
         TestIoManager mgr,
-        MultiStepPlan plan,
+        ExecutionPlan plan,
         Fragment fragment,
         UUID qryId,
         UUID nodeId

Reply via email to