This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 1995d3b  pending
1995d3b is described below

commit 1995d3b417cdd350abc4e8020bb61b847bef9228
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 1 15:27:02 2019 +0300

    pending
---
 .../query/calcite/exchange/Receiver.java           |   6 +-
 .../processors/query/calcite/exchange/Sender.java  |  10 +-
 .../metadata/IgniteMdSourceDistribution.java       |   6 +-
 .../processors/query/calcite/prepare/Query.java    |  20 ++
 .../processors/query/calcite/rel/CloneContext.java |  50 +++
 .../processors/query/calcite/rel/IgniteRel.java    |   2 +-
 .../query/calcite/rel/IgniteVisitor.java           |  45 ---
 .../calcite/rel/logical/IgniteLogicalExchange.java |   6 +-
 .../calcite/rel/logical/IgniteLogicalFilter.java   |  10 +-
 .../calcite/rel/logical/IgniteLogicalJoin.java     |  10 +-
 .../calcite/rel/logical/IgniteLogicalProject.java  |  10 +-
 .../rel/logical/IgniteLogicalTableScan.java        |  15 +-
 .../query/calcite/schema/IgniteTable.java          |  10 +-
 .../query/calcite/splitter/Fragment.java           |  22 +-
 .../calcite/splitter/PartitionsDistribution.java   |   2 +-
 .../splitter/PartitionsDistributionRegistry.java   |   6 +-
 .../query/calcite/splitter/QueryPlan.java          |  15 +
 .../query/calcite/splitter/Splitter.java           |  66 +---
 .../processors/query/calcite/util/Commons.java     |   9 +
 .../query/calcite/CalciteQueryProcessorTest.java   | 385 ++++++++++++++++++++-
 20 files changed, 556 insertions(+), 149 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index fe35b52..dda7840 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -22,8 +22,8 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 
 /**
@@ -51,8 +51,8 @@ public class Receiver extends SingleRel implements IgniteRel {
         return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
     }
 
-    @Override public <T> T accept(IgniteVisitor<T> visitor) {
-        return visitor.visitReceiver(this);
+    @Override public IgniteRel clone(CloneContext ctx) {
+        return new Receiver(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()));
     }
 
     public void init(SourceDistribution targetDistribution, RelMetadataQueryEx 
mq) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index f9bd762..7c1cd83 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -23,8 +23,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -49,12 +49,12 @@ public class Sender extends SingleRel implements IgniteRel {
         super(cluster, traits, input);
     }
 
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new Sender(getCluster(), traitSet, sole(inputs));
+    @Override public IgniteRel clone(CloneContext ctx) {
+        return new Sender(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()));
     }
 
-    @Override public <T> T accept(IgniteVisitor<T> visitor) {
-        return visitor.visitSender(this);
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new Sender(getCluster(), traitSet, sole(inputs));
     }
 
     public void init(SourceDistribution targetDistribution) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index d44a57b..6cb5bcf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.BiRel;
@@ -34,7 +35,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDi
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 import org.apache.ignite.internal.util.GridIntList;
-import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
@@ -72,7 +72,9 @@ public class IgniteMdSourceDistribution implements 
MetadataHandler<SourceDistrib
     public SourceDistribution getSourceDistribution(Receiver rel, 
RelMetadataQuery mq) {
         SourceDistribution res = new SourceDistribution();
 
-        res.remoteInputs = F.asList(rel);
+        ArrayList<Receiver> remoteInputs = new ArrayList<>();
+        remoteInputs.add(rel);
+        res.remoteInputs = remoteInputs;
 
         return res;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
index 92b974e..d18c82c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.internal.util.typedef.F;
@@ -50,4 +51,23 @@ public class Query {
         }
         return res;
     }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof Query))
+            return false;
+
+        Query query = (Query) o;
+
+        if (!sql.equals(query.sql))
+            return false;
+        return Arrays.equals(params, query.params);
+    }
+
+    @Override public int hashCode() {
+        int result = sql.hashCode();
+        result = 31 * result + Arrays.hashCode(params);
+        return result;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
new file mode 100644
index 0000000..5c7046e
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.IdentityHashMap;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ *
+ */
+public final class CloneContext {
+    private final RelOptCluster cluster;
+    private final IdentityHashMap<IgniteRel, IgniteRel> mapping = new 
IdentityHashMap<>();
+
+    public CloneContext(RelOptCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    public RelOptCluster getCluster() {
+        return cluster;
+    }
+
+    public <T extends IgniteRel> T clone(RelNode src) {
+        try {
+            return (T) mapping.computeIfAbsent((IgniteRel) src, this::clone0);
+        }
+        catch (ClassCastException e) {
+            throw new IllegalStateException("Unexpected node type: " + 
src.getClass());
+        }
+    }
+
+    private IgniteRel clone0(IgniteRel src) {
+        return src.clone(this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index f15bf81..d2a47da 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -32,5 +32,5 @@ public interface IgniteRel extends RelNode {
         }
     };
 
-    <T> T accept(IgniteVisitor<T> visitor);
+    IgniteRel clone(CloneContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
deleted file mode 100644
index 542696d..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
-
-/**
- *
- */
-public interface IgniteVisitor<T> {
-    T visitExchange(IgniteLogicalExchange exchange);
-
-    T visitFilter(IgniteLogicalFilter filter);
-
-    T visitJoin(IgniteLogicalJoin join);
-
-    T visitProject(IgniteLogicalProject project);
-
-    T visitTableScan(IgniteLogicalTableScan tableScan);
-
-    T visitReceiver(Receiver receiver);
-
-    T visitSender(Sender sender);
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index 32b8b09..dd2ad05 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -26,8 +26,8 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
@@ -56,8 +56,8 @@ public final class IgniteLogicalExchange extends SingleRel 
implements IgniteRel
         return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
     }
 
-    @Override public <T> T accept(IgniteVisitor<T> visitor) {
-        return visitor.visitExchange(this);
+    @Override public IgniteRel clone(CloneContext ctx) {
+        throw new UnsupportedOperationException();
     }
 
     @Override public RelWriter explainTerms(RelWriter pw) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 0da0d04..2bf7b6c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -27,8 +27,8 @@ import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteLogicalFilter extends Filter implements IgniteRel {
@@ -44,6 +44,10 @@ public final class IgniteLogicalFilter extends Filter 
implements IgniteRel {
     return variablesSet;
   }
 
+  @Override public IgniteRel clone(CloneContext ctx) {
+    return new IgniteLogicalFilter(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()), getCondition(), variablesSet);
+  }
+
   @Override public IgniteLogicalFilter copy(RelTraitSet traitSet, RelNode 
input,
       RexNode condition) {
     return new IgniteLogicalFilter(getCluster(), traitSet, input, condition, 
variablesSet);
@@ -61,8 +65,4 @@ public final class IgniteLogicalFilter extends Filter 
implements IgniteRel {
 
     return new IgniteLogicalFilter(filter.getCluster(), traits, input, 
filter.getCondition(), filter.getVariablesSet());
   }
-
-  @Override public <T> T accept(IgniteVisitor<T> visitor) {
-    return visitor.visitFilter(this);
-  }
 }
\ No newline at end of file
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
index 95699c7..df5bb2e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 public final class IgniteLogicalJoin extends Join implements IgniteRel {
   private final boolean semiJoinDone;
@@ -44,6 +44,10 @@ public final class IgniteLogicalJoin extends Join implements 
IgniteRel {
     this.semiJoinDone = semiJoinDone;
   }
 
+  @Override public IgniteRel clone(CloneContext ctx) {
+    return new IgniteLogicalJoin(ctx.getCluster(), getTraitSet(), 
ctx.clone(getLeft()), ctx.clone(getRight()), getCondition(), variablesSet, 
getJoinType(), semiJoinDone);
+  }
+
   @Override public IgniteLogicalJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
       RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
     return new IgniteLogicalJoin(getCluster(), traitSet, left, right, 
conditionExpr, variablesSet, joinType, semiJoinDone);
@@ -59,8 +63,4 @@ public final class IgniteLogicalJoin extends Join implements 
IgniteRel {
   @Override public boolean isSemiJoinDone() {
     return semiJoinDone;
   }
-
-  @Override public <T> T accept(IgniteVisitor<T> visitor) {
-    return visitor.visitJoin(this);
-  }
 }
\ No newline at end of file
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index e67d371..41a6f2a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteLogicalProject extends Project implements IgniteRel {
@@ -39,6 +39,10 @@ public final class IgniteLogicalProject extends Project 
implements IgniteRel {
     super(cluster, traitSet, input, projects, rowType);
   }
 
+  @Override public IgniteRel clone(CloneContext ctx) {
+    return new IgniteLogicalProject(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()), getProjects(), getRowType());
+  }
+
     @Override public IgniteLogicalProject copy(RelTraitSet traitSet, RelNode 
input,
       List<RexNode> projects, RelDataType rowType) {
     return new IgniteLogicalProject(getCluster(), traitSet, input, projects, 
rowType);
@@ -51,8 +55,4 @@ public final class IgniteLogicalProject extends Project 
implements IgniteRel {
 
     return new IgniteLogicalProject(project.getCluster(), traits, input, 
project.getProjects(), project.getRowType());
   }
-
-  @Override public <T> T accept(IgniteVisitor<T> visitor) {
-    return visitor.visitProject(this);
-  }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index fb4cf58..87ccb20 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -22,16 +22,21 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteLogicalTableScan extends TableScan implements 
IgniteRel {
   public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable table) {
     super(cluster, traitSet, table);
   }
 
+  @Override public IgniteRel clone(CloneContext ctx) {
+    return new IgniteLogicalTableScan(ctx.getCluster(), getTraitSet(), 
getTable());
+  }
+
   @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     assert inputs.isEmpty();
 
@@ -39,11 +44,7 @@ public final class IgniteLogicalTableScan extends TableScan 
implements IgniteRel
   }
 
   public SourceDistribution tableDistribution() {
-     return getTable().unwrap(IgniteTable.class)
-         .sourceDistribution(getCluster().getPlanner().getContext());
-  }
-
-  @Override public <T> T accept(IgniteVisitor<T> visitor) {
-    return visitor.visitTableScan(this);
+    boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
+    return 
getTable().unwrap(IgniteTable.class).sourceDistribution(getCluster().getPlanner().getContext(),
 local);
   }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index bf8b752..6d25e19 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -80,7 +80,7 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
         return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.noOpFunction()); // TODO
     }
 
-    public SourceDistribution sourceDistribution(Context context) {
+    public SourceDistribution sourceDistribution(Context context, boolean 
local) {
         int cacheId = CU.cacheId(cacheName);
 
         SourceDistribution res = new SourceDistribution();
@@ -89,9 +89,11 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
         localInputs.add(cacheId);
         res.localInputs = localInputs;
 
-        PartitionsDistributionRegistry registry = 
context.unwrap(PartitionsDistributionRegistry.class);
-        AffinityTopologyVersion topVer = 
context.unwrap(AffinityTopologyVersion.class);
-        res.partitionMapping = registry.get(cacheId, topVer);
+        if (!local) {
+            PartitionsDistributionRegistry registry = 
context.unwrap(PartitionsDistributionRegistry.class);
+            AffinityTopologyVersion topVer = 
context.unwrap(AffinityTopologyVersion.class);
+            res.partitionMapping = registry.distributed(cacheId, topVer);
+        }
 
         return res;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index f9ee3d1..ffc6b48 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -28,34 +28,38 @@ import org.apache.ignite.internal.util.typedef.F;
  *
  */
 public class Fragment {
-    public final RelNode root;
+    public final RelNode rel;
 
     public SourceDistribution distribution;
 
-    public Fragment(RelNode root) {
-        this.root = root;
+    public Fragment(RelNode rel) {
+        this.rel = rel;
     }
 
     public void init(Context ctx) {
         RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
 
-        distribution = mq.getSourceDistribution(root);
+        distribution = mq.getSourceDistribution(rel);
 
         PartitionsDistribution mapping = distribution.partitionMapping;
 
-        if (mapping == null)
-            distribution.partitionMapping = isRootFragment() ? 
registry(ctx).single() : registry(ctx).random(topologyVersion(ctx));
+        if (mapping == null) {
+            if (!isRoot())
+                distribution.partitionMapping = 
registry(ctx).random(topologyVersion(ctx));
+            else if (!F.isEmpty(distribution.remoteInputs))
+                distribution.partitionMapping = registry(ctx).single();
+        }
         else if (mapping.excessive)
             distribution.partitionMapping = mapping.deduplicate();
-        
+
         if (!F.isEmpty(distribution.remoteInputs)) {
             for (Receiver input : distribution.remoteInputs)
                 input.init(distribution, mq);
         }
     }
 
-    private boolean isRootFragment() {
-        return !(root instanceof Sender);
+    private boolean isRoot() {
+        return !(rel instanceof Sender);
     }
 
     private PartitionsDistributionRegistry registry(Context ctx) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
index 72ed1fa..fb2728a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -50,7 +50,7 @@ public class PartitionsDistribution {
             else {
                 int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
 
-                if (mergedParts.length > 0) {
+                if (mergedParts == ALL_PARTS || mergedParts.length > 0) {
                     if (nodes0 == null) {
                         int len = Math.min(nodes.length, other.nodes.length);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index 568ce65..345957e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -22,7 +22,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  *
  */
 public interface PartitionsDistributionRegistry {
-    PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer);
-    PartitionsDistribution random(AffinityTopologyVersion topVer);
-    PartitionsDistribution single();
+    PartitionsDistribution single(); // returns local node with single 
partition
+    PartitionsDistribution random(AffinityTopologyVersion topVer); // returns 
random distribution, partitions count depends on nodes count
+    PartitionsDistribution distributed(int cacheId, AffinityTopologyVersion 
topVer); // returns cache distribution
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index 84985e6..d23091d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -18,6 +18,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Context;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
 
 /**
  *
@@ -34,4 +35,18 @@ public class QueryPlan {
             fragment.init(ctx);
         }
     }
+
+    public ImmutableList<Fragment> fragments() {
+        return fragments;
+    }
+
+    public QueryPlan clone(CloneContext ctx) {
+        ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+
+        for (Fragment f : fragments) {
+            b.add(new Fragment(ctx.clone(f.rel)));
+        }
+
+        return new QueryPlan(b.build());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 5220d74..fb1c730 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -17,24 +17,19 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import com.google.common.collect.ImmutableList;
-import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 
 /**
  *
  */
-public class Splitter implements IgniteVisitor<IgniteRel> {
+public class Splitter extends RelShuttleImpl {
     private ImmutableList.Builder<Fragment> b;
 
     public QueryPlan go(IgniteRel root) {
@@ -43,53 +38,24 @@ public class Splitter implements IgniteVisitor<IgniteRel> {
         return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
     }
 
-    @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
-        RelOptCluster cluster = exchange.getCluster();
-        RelTraitSet traitSet = exchange.getTraitSet();
+    @Override public RelNode visit(RelNode rel) {
+        if (!(rel instanceof IgniteRel))
+            throw new AssertionError("Unexpected node: " + rel);
+        else if (rel instanceof Sender || rel instanceof Receiver)
+            throw new AssertionError("An attempt to split an already split 
task.");
+        else if (rel instanceof IgniteLogicalExchange) {
+            IgniteLogicalExchange exchange = (IgniteLogicalExchange) rel;
 
-        IgniteRel input = visitChildren(exchange.getInput());
+            RelOptCluster cluster = exchange.getCluster();
+            RelTraitSet traitSet = exchange.getTraitSet();
 
-        Sender sender = new Sender(cluster, traitSet, input);
+            Sender sender = new Sender(cluster, traitSet, 
visit(exchange.getInput()));
 
-        b.add(new Fragment(sender));
+            b.add(new Fragment(sender));
 
-        return new Receiver(cluster, traitSet, sender);
-    }
-
-    @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
-        return visitChildren(filter);
-    }
-
-    @Override public IgniteRel visitJoin(IgniteLogicalJoin join) {
-        return visitChildren(join);
-    }
-
-    @Override public IgniteRel visitProject(IgniteLogicalProject project) {
-        return visitChildren(project);
-    }
-
-    @Override public IgniteRel visitTableScan(IgniteLogicalTableScan 
tableScan) {
-        return tableScan;
-    }
-
-    @Override public IgniteRel visitReceiver(Receiver receiver) {
-        throw new AssertionError("An attempt to split an already split task.");
-    }
-
-    @Override public IgniteRel visitSender(Sender sender) {
-        throw new AssertionError("An attempt to split an already split task.");
-    }
-
-    private IgniteRel visitChildren(RelNode parent) {
-        List<RelNode> inputs = parent.getInputs();
-
-        for (int i = 0; i < inputs.size(); i++) {
-            IgniteRel input = (IgniteRel) inputs.get(i);
-            IgniteRel rel = input.accept(this);
-
-            if (rel != input)
-                parent.replaceInput(i, rel);
+            return new Receiver(cluster, traitSet, sender);
         }
-        return (IgniteRel) parent;
+
+        return super.visit(rel);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 739d411..fc4a6eb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -60,6 +60,15 @@ public final class Commons {
         return paramSrc.get();
     }
 
+    /**
+     * Looks up a parameter of the given type in the context, falling back to the supplier.
+     *
+     * @param ctx Context the parameter is unwrapped from.
+     * @param paramType Class of the requested parameter.
+     * @param paramSrc Source that is consulted only when the context yields {@code null}.
+     * @return Parameter unwrapped from the context if it is not {@code null}, the supplier's result otherwise.
+     */
+    public static <T> T contextParam(Context ctx, Class<T> paramType, Supplier<T> paramSrc) {
+        T fromCtx = ctx.unwrap(paramType);
+
+        return fromCtx != null ? fromCtx : paramSrc.get();
+    }
+
     /** */
     public static RowType rowType(GridQueryTypeDescriptor desc) {
         RowType.Builder b = RowType.builder();
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index d9ebd5b..8806868 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -62,6 +62,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     private static PartitionsDistribution projectDistribution;
     private static PartitionsDistribution randomDistribution;
     private static PartitionsDistribution singleDistribution;
+
     private static PartitionsDistributionRegistry registry;
 
     @BeforeClass
@@ -132,7 +133,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         singleDistribution.nodeParts = new int[][]{{1}};
 
         registry = new PartitionsDistributionRegistry() {
-            @Override public PartitionsDistribution get(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public PartitionsDistribution distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
                     return developerDistribution;
                 if (cacheId == CU.cacheId("Project"))
@@ -165,6 +166,322 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         assertNotNull(ctx);
 
         RelTraitDef[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+
+
+    }
+
+    /**
+     * Parses, validates and converts a join query into a relational tree, runs the HEP
+     * subquery-rewrite phase over it and checks that a tree is produced.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHepPlaner() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(
+            Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        // Only the convention trait is registered for this scenario.
+        RelTraitDef[] traitDefs = {ConventionTraitDef.INSTANCE};
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(query);
+
+            // Parse, then validate.
+            SqlNode validated = planner.validate(planner.parse(query.sql()));
+
+            // Convert to a relational operators graph.
+            RelRoot converted = planner.rel(validated);
+
+            // Transformation chain.
+            RelNode rewritten = planner.transform(
+                PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, converted.rel, converted.rel.getTraitSet());
+
+            relRoot = converted.withRel(rewritten).withKind(validated.getKind());
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /**
+     * Runs the Volcano logical phase over a join query with both the distribution and the
+     * convention traits registered and checks that a tree is produced.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testVolcanoPlanerDistributed() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(
+            Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {DistributionTraitDef.INSTANCE, ConventionTraitDef.INSTANCE};
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(query);
+
+            // Parse, then validate.
+            SqlNode validated = planner.validate(planner.parse(query.sql()));
+
+            // Convert to a relational operators graph.
+            RelRoot converted = planner.rel(validated);
+
+            // Transformation chain: request the logical convention with a single distribution.
+            RelTraitSet desired = converted.rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            RelNode optimized = planner.transform(
+                PlannerType.VOLCANO, PlannerPhase.LOGICAL, converted.rel, desired);
+
+            relRoot = converted.withRel(optimized).withKind(validated.getKind());
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /**
+     * Runs the Volcano logical phase over a join query with only the convention trait
+     * registered (no distribution trait definition) and checks that a tree is produced.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testVolcanoPlanerLocal() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            // Check the unwrapped query (not the planner again) before dereferencing it below.
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /**
+     * Plans a join query with only the convention trait registered (HEP subquery rewrite
+     * followed by the Volcano logical phase), splits the result with {@link Splitter} and
+     * initializes the produced plan.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterLocal() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            // Check the unwrapped query (not the planner again) before dereferencing it below.
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        // The root holder is always assigned above; the planned tree is what gets cast and split.
+        assertNotNull(relRoot.rel);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+    }
+
+    /**
+     * Plans a query joining {@code Developer} and {@code Project} on {@code d.id = p.id0}
+     * with the distribution trait registered, splits the result with {@link Splitter},
+     * initializes the plan and expects exactly two fragments.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterCollocated() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            // Check the unwrapped query (not the planner again) before dereferencing it below.
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        // The root holder is always assigned above; the planned tree is what gets cast and split.
+        assertNotNull(relRoot.rel);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        // assertEquals reports the actual fragment count when the expectation is violated.
+        assertEquals(2, plan.fragments().size());
+    }
+
+    @Test
+    public void testSplitterPartiallyCollocated() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
             DistributionTraitDef.INSTANCE,
             ConventionTraitDef.INSTANCE
         };
@@ -211,5 +528,71 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         plan.init(ctx);
 
         assertNotNull(plan);
+
+        assertTrue(plan.fragments().size() == 3);
+    }
+
+    /**
+     * Plans a query joining {@code Developer} and {@code Project} on
+     * {@code d.projectId = p.ver0} with the distribution trait registered, splits the result
+     * with {@link Splitter}, initializes the plan and expects exactly four fragments.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterNonCollocated() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.ver0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            // Check the unwrapped query (not the planner again) before dereferencing it below.
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        // The root holder is always assigned above; the planned tree is what gets cast and split.
+        assertNotNull(relRoot.rel);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        // assertEquals reports the actual fragment count when the expectation is violated.
+        assertEquals(4, plan.fragments().size());
     }
 }
\ No newline at end of file

Reply via email to