This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new e797a6e  pending
e797a6e is described below

commit e797a6eba85738b9c8b9d8b2fe500e590a776a2f
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Mon Oct 28 21:06:03 2019 +0300

    pending
---
 .../query/calcite/CalciteQueryProcessor.java       |   5 +-
 .../query/calcite/exchange/Receiver.java           |   7 +-
 .../processors/query/calcite/exchange/Sender.java  |   7 +-
 .../calcite/metadata/IgniteMdDistribution.java     |  28 +--
 .../metadata/IgniteMdSourceDistribution.java       |  94 ++++------
 .../query/calcite/metadata/IgniteMetadata.java     |  20 +--
 .../query/calcite/metadata/RelMetadataQueryEx.java |  88 +++++++++
 .../processors/query/calcite/rel/IgniteRel.java    |   4 +-
 .../query/calcite/rel/IgniteVisitor.java           |  24 ++-
 .../calcite/rel/logical/IgniteLogicalExchange.java |   5 +
 .../calcite/rel/logical/IgniteLogicalFilter.java   |   5 +
 .../calcite/rel/logical/IgniteLogicalJoin.java     |   5 +
 .../calcite/rel/logical/IgniteLogicalProject.java  |   5 +
 .../rel/logical/IgniteLogicalTableScan.java        |   5 +
 .../query/calcite/schema/IgniteTable.java          |  13 +-
 .../calcite/splitter/PartitionsDistribution.java   | 196 +++++++++++++++++++++
 .../PartitionsDistributionRegistry.java}           |   8 +-
 .../query/calcite/splitter/SourceDistribution.java |   4 +-
 .../query/calcite/splitter/SplitTask.java          |   9 +
 .../query/calcite/splitter/TaskSplitter.java       |  84 ++++++++-
 .../query/calcite/util/IgniteMethod.java           |   8 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  37 +++-
 22 files changed, 543 insertions(+), 118 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index a024027..d1ad130 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -145,12 +145,11 @@ public class CalciteQueryProcessor implements QueryEngine 
{
      * @return Query execution context.
      */
     Context context(@NotNull Context ctx, String query, Object[] params) { // 
Package private visibility for tests.
-        return Contexts.chain(ctx,
+        return Contexts.chain(ctx, config.getContext(),
             Contexts.of(
                 new Query(query, params),
                 contextParameter(ctx, SchemaPlus.class, schemaHolder::schema),
-                contextParameter(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)),
-            config.getContext());
+                contextParameter(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)));
     }
 
     private QueryExecution prepare(Context ctx) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index 8d2ee1c..993f55b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 /**
  *
@@ -32,7 +33,7 @@ public class Receiver extends SingleRel implements IgniteRel {
      * @param traits Trait set.
      * @param sender Corresponding sender.
      */
-    protected Receiver(RelOptCluster cluster, RelTraitSet traits, Sender 
sender) {
+    public Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) {
         super(cluster, traits, sender);
     }
 
@@ -45,4 +46,8 @@ public class Receiver extends SingleRel implements IgniteRel {
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
     }
+
+    @Override public <T> T accept(IgniteVisitor<T> visitor) {
+        return visitor.visitReceiver(this);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index 5049250..63cf8f7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 /**
  *
@@ -33,7 +34,11 @@ public class Sender extends SingleRel implements IgniteRel {
      * @param traits Trait set.
      * @param input Input relational expression
      */
-    protected Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) 
{
+    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
         super(cluster, traits, input);
     }
+
+    @Override public <T> T accept(IgniteVisitor<T> visitor) {
+        return visitor.visitSender(this);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index ded1a8d..298e905 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -47,36 +47,36 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
 /**
  *
  */
-public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.TraitDistribution> {
+public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
     public static final RelMetadataProvider SOURCE =
-        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION.method(),
 new IgniteMdDistribution());
+        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION_TRAIT.method(),
 new IgniteMdDistribution());
 
-    @Override public MetadataDef<IgniteMetadata.TraitDistribution> getDef() {
-        return IgniteMetadata.TraitDistribution.DEF;
+    @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata> 
getDef() {
+        return IgniteMetadata.DistributionTraitMetadata.DEF;
     }
 
-    public DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
+    public DistributionTrait getDistributionTrait(RelNode rel, 
RelMetadataQuery mq) {
         return DistributionTraitDef.INSTANCE.getDefault();
     }
 
-    public DistributionTrait distribution(Filter filter, RelMetadataQuery mq) {
+    public DistributionTrait getDistributionTrait(Filter filter, 
RelMetadataQuery mq) {
         return filter(mq, filter.getInput(), filter.getCondition());
     }
 
-    public DistributionTrait distribution(Project project, RelMetadataQuery 
mq) {
+    public DistributionTrait getDistributionTrait(Project project, 
RelMetadataQuery mq) {
         return project(mq, project.getInput(), project.getProjects());
     }
 
-    public DistributionTrait distribution(Join join, RelMetadataQuery mq) {
+    public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery 
mq) {
         return join(mq, join.getLeft(), join.getRight(), join.getCondition());
     }
 
-    public DistributionTrait distribution(RelSubset rel, RelMetadataQuery mq) {
+    public DistributionTrait getDistributionTrait(RelSubset rel, 
RelMetadataQuery mq) {
         return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
     public static DistributionTrait project(RelMetadataQuery mq, RelNode 
input, List<RexNode> projects) {
-        DistributionTrait trait = distribution_(input, mq);
+        DistributionTrait trait = distribution(input, mq);
 
         if (trait.type() == HASH) {
             ImmutableIntList keys = trait.keys();
@@ -115,14 +115,14 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Trai
     }
 
     public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, 
RexNode condition) {
-        return distribution_(input, mq);
+        return distribution(input, mq);
     }
 
     public static DistributionTrait join(RelMetadataQuery mq, RelNode left, 
RelNode right, RexNode condition) {
-        return distribution_(left, mq);
+        return distribution(left, mq);
     }
 
-    public static DistributionTrait distribution_(RelNode rel, 
RelMetadataQuery mq) {
-        return rel.metadata(IgniteMetadata.TraitDistribution.class, 
mq).distribution();
+    public static DistributionTrait distribution(RelNode rel, RelMetadataQuery 
mq) {
+        return RelMetadataQueryEx.wrap(mq).getDistributionTrait(rel);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index 1c80b96..255e06c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -16,12 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
@@ -31,78 +26,80 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class IgniteMdSourceDistribution implements 
MetadataHandler<TaskDistribution> {
+public class IgniteMdSourceDistribution implements 
MetadataHandler<SourceDistributionMetadata> {
     public static final RelMetadataProvider SOURCE =
-        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.TASK_DISTRIBUTION.method(),
 new IgniteMdSourceDistribution());
+        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.SOURCE_DISTRIBUTION.method(),
 new IgniteMdSourceDistribution());
 
-    @Override public MetadataDef<TaskDistribution> getDef() {
-        return TaskDistribution.DEF;
+    @Override public MetadataDef<SourceDistributionMetadata> getDef() {
+        return SourceDistributionMetadata.DEF;
     }
 
-    public SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) {
+    public SourceDistribution getSourceDistribution(RelNode rel, 
RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
-    public SourceDistribution distribution(RelSubset rel, RelMetadataQuery mq) 
{
+    public SourceDistribution getSourceDistribution(RelSubset rel, 
RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
-    public SourceDistribution distribution(SingleRel rel, RelMetadataQuery mq) 
{
-        return distribution_(rel.getInput(), mq);
+    public SourceDistribution getSourceDistribution(SingleRel rel, 
RelMetadataQuery mq) {
+        return distribution(rel.getInput(), mq);
     }
 
-    public SourceDistribution distribution(BiRel rel, RelMetadataQuery mq) {
-        return merge(distribution_(rel.getLeft(), mq), 
distribution_(rel.getRight(), mq));
+    public SourceDistribution getSourceDistribution(BiRel rel, 
RelMetadataQuery mq) {
+        mq = RelMetadataQueryEx.wrap(mq);
+
+        return merge(distribution(rel.getLeft(), mq), 
distribution(rel.getRight(), mq));
     }
 
-    public SourceDistribution distribution(Receiver rel, RelMetadataQuery mq) {
+    public SourceDistribution getSourceDistribution(Receiver rel, 
RelMetadataQuery mq) {
         SourceDistribution res = new SourceDistribution();
 
-        res.remoteInputs.add(rel);
+        res.remoteInputs = F.asList(rel);
 
         return res;
     }
 
-    public SourceDistribution distribution(IgniteLogicalTableScan rel, 
RelMetadataQuery mq) {
+    public SourceDistribution getSourceDistribution(IgniteLogicalTableScan 
rel, RelMetadataQuery mq) {
         return rel.tableDistribution();
     }
 
-    public static SourceDistribution distribution_(RelNode rel, 
RelMetadataQuery mq) {
-        return rel.metadata(TaskDistribution.class, mq).distribution();
+    public static SourceDistribution distribution(RelNode rel, 
RelMetadataQuery mq) {
+        return RelMetadataQueryEx.wrap(mq).getSourceDistribution(rel);
     }
 
     private static SourceDistribution merge(SourceDistribution left, 
SourceDistribution right) {
         SourceDistribution res = new SourceDistribution();
 
-        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
         res.partitionMapping = merge(left.partitionMapping, 
right.partitionMapping);
+        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
         res.localInputs = merge(left.localInputs, right.localInputs);
 
         return res;
     }
 
-    private static <T> List<T> merge(List<T> left, List<T> right) {
+    private static PartitionsDistribution merge(PartitionsDistribution left, 
PartitionsDistribution right) {
         if (left == null)
             return right;
+        if (right == null)
+            return left;
 
-        if (right != null)
-            left.addAll(right);
-
-        return left;
+        return left.mergeWith(right);
     }
 
-    private static GridIntList merge(GridIntList left, GridIntList right) {
+    private static <T> List<T> merge(List<T> left, List<T> right) {
         if (left == null)
             return right;
 
@@ -112,44 +109,13 @@ public class IgniteMdSourceDistribution implements 
MetadataHandler<TaskDistribut
         return left;
     }
 
-    private static Map<ClusterNode, int[]> merge(Map<ClusterNode, int[]> left, 
Map<ClusterNode, int[]> right) {
+    private static GridIntList merge(GridIntList left, GridIntList right) {
         if (left == null)
             return right;
 
-        if (right == null)
-            return left;
-
-        Map<ClusterNode, int[]> res = new HashMap<>(Math.min(left.size(), 
right.size()));
-
-        Set<ClusterNode> keys = new HashSet<>(left.keySet());
-
-        keys.retainAll(right.keySet());
-
-        for (ClusterNode node : keys) {
-            int[] leftParts  = left.get(node);
-            int[] rightParts = right.get(node);
-
-            int[] nodeParts = new int[Math.min(leftParts.length, 
rightParts.length)];
-
-            int i = 0, j = 0, k = 0;
-
-            while (i < leftParts.length && j < rightParts.length) {
-                if (leftParts[i] < rightParts[j])
-                    i++;
-                else if (rightParts[j] < leftParts[i])
-                    j++;
-                else {
-                    nodeParts[k++] = leftParts[i];
-
-                    i++;
-                    j++;
-                }
-            }
-
-            if (k > 0)
-                res.put(node, k < nodeParts.length ? Arrays.copyOf(nodeParts, 
k) : nodeParts);
-        }
+        if (right != null)
+            left.addAll(right);
 
-        return res;
+        return left;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 3b5445e..090ef53 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -40,27 +40,27 @@ public class IgniteMetadata {
                 IgniteMdSourceDistribution.SOURCE,
                 DefaultRelMetadataProvider.INSTANCE));
 
-    public interface TraitDistribution extends Metadata {
-        MetadataDef<TraitDistribution> DEF = 
MetadataDef.of(TraitDistribution.class, TraitDistribution.Handler.class, 
IgniteMethod.DISTRIBUTION.method());
+    public interface DistributionTraitMetadata extends Metadata {
+        MetadataDef<DistributionTraitMetadata> DEF = 
MetadataDef.of(DistributionTraitMetadata.class, 
DistributionTraitMetadata.Handler.class, 
IgniteMethod.DISTRIBUTION_TRAIT.method());
 
         /** Determines how the rows are distributed. */
-        DistributionTrait distribution();
+        DistributionTrait getDistributionTrait();
 
         /** Handler API. */
-        interface Handler extends MetadataHandler<TraitDistribution> {
-            DistributionTrait distribution(RelNode r, RelMetadataQuery mq);
+        interface Handler extends MetadataHandler<DistributionTraitMetadata> {
+            DistributionTrait getDistributionTrait(RelNode r, RelMetadataQuery 
mq);
         }
     }
 
-    public interface TaskDistribution extends Metadata {
-        MetadataDef<TaskDistribution> DEF = 
MetadataDef.of(TaskDistribution.class, TaskDistribution.Handler.class, 
IgniteMethod.TASK_DISTRIBUTION.method());
+    public interface SourceDistributionMetadata extends Metadata {
+        MetadataDef<SourceDistributionMetadata> DEF = 
MetadataDef.of(SourceDistributionMetadata.class, 
SourceDistributionMetadata.Handler.class, 
IgniteMethod.SOURCE_DISTRIBUTION.method());
 
         /** Determines how the rows are distributed. */
-        SourceDistribution distribution();
+        SourceDistribution getSourceDistribution();
 
         /** Handler API. */
-        interface Handler extends MetadataHandler<TaskDistribution> {
-            SourceDistribution distribution(RelNode r, RelMetadataQuery mq);
+        interface Handler extends MetadataHandler<SourceDistributionMetadata> {
+            SourceDistribution getSourceDistribution(RelNode r, 
RelMetadataQuery mq);
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
new file mode 100644
index 0000000..8c10a51
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class RelMetadataQueryEx extends RelMetadataQuery {
+    private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
+    private static final JaninoRelMetadataProvider PROVIDER = 
JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
+
+    private IgniteMetadata.DistributionTraitMetadata.Handler 
distributionTraitHandler;
+    private IgniteMetadata.SourceDistributionMetadata.Handler 
sourceDistributionHandler;
+
+    private RelMetadataQueryEx() {
+        super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
+
+        distributionTraitHandler = 
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
+        sourceDistributionHandler = 
initialHandler(IgniteMetadata.SourceDistributionMetadata.Handler.class);
+    }
+
+    protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, 
RelMetadataQueryEx prototype) {
+        super(metadataProvider, prototype);
+
+        distributionTraitHandler = prototype.distributionTraitHandler;
+        sourceDistributionHandler = prototype.sourceDistributionHandler;
+    }
+
+    protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, 
RelMetadataQuery parent) {
+        super(metadataProvider, parent);
+
+        distributionTraitHandler = PROTO.distributionTraitHandler;
+        sourceDistributionHandler = PROTO.sourceDistributionHandler;
+    }
+
+    @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
+    public static RelMetadataQueryEx instance() {
+        return new RelMetadataQueryEx(PROVIDER, PROTO);
+    }
+
+    public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
+        if (mq.getClass() == RelMetadataQueryEx.class)
+            return (RelMetadataQueryEx) mq;
+
+        return new RelMetadataQueryEx(PROVIDER, mq);
+    }
+
+    public SourceDistribution getSourceDistribution(RelNode rel) {
+        for (;;) {
+            try {
+                return sourceDistributionHandler.getSourceDistribution(rel, 
this);
+            } catch (JaninoRelMetadataProvider.NoHandler e) {
+                sourceDistributionHandler = revise(e.relClass, 
IgniteMetadata.SourceDistributionMetadata.DEF);
+            }
+        }
+    }
+
+    public DistributionTrait getDistributionTrait(RelNode rel) {
+        for (;;) {
+            try {
+                return distributionTraitHandler.getDistributionTrait(rel, 
this);
+            } catch (JaninoRelMetadataProvider.NoHandler e) {
+                distributionTraitHandler = revise(e.relClass, 
IgniteMetadata.DistributionTraitMetadata.DEF);
+            }
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index 995ad70..f15bf81 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -32,7 +32,5 @@ public interface IgniteRel extends RelNode {
         }
     };
 
-    default void visit(IgniteVisitor visitor) {
-        visitor.visit(this);
-    }
+    <T> T accept(IgniteVisitor<T> visitor);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
index a89c034..542696d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
@@ -17,9 +17,29 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+
 /**
  *
  */
-public interface IgniteVisitor {
-    public void visit(IgniteRel rel);
+public interface IgniteVisitor<T> {
+    T visitExchange(IgniteLogicalExchange exchange);
+
+    T visitFilter(IgniteLogicalFilter filter);
+
+    T visitJoin(IgniteLogicalJoin join);
+
+    T visitProject(IgniteLogicalProject project);
+
+    T visitTableScan(IgniteLogicalTableScan tableScan);
+
+    T visitReceiver(Receiver receiver);
+
+    T visitSender(Sender sender);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index ed9bee6..3a577e9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
@@ -62,4 +63,8 @@ public final class IgniteLogicalExchange extends SingleRel 
implements IgniteRel
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
     }
+
+    @Override public <T> T accept(IgniteVisitor<T> visitor) {
+        return visitor.visitExchange(this);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 0bc0fbf..34bbe3d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 public final class IgniteLogicalFilter extends Filter implements IgniteRel {
   private final Set<CorrelationId> variablesSet;
@@ -59,4 +60,8 @@ public final class IgniteLogicalFilter extends Filter 
implements IgniteRel {
 
     return new IgniteLogicalFilter(filter.getCluster(), traits, input, 
filter.getCondition(), filter.getVariablesSet());
   }
+
+  @Override public <T> T accept(IgniteVisitor<T> visitor) {
+    return visitor.visitFilter(this);
+  }
 }
\ No newline at end of file
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
index 215b676..95699c7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 public final class IgniteLogicalJoin extends Join implements IgniteRel {
   private final boolean semiJoinDone;
@@ -58,4 +59,8 @@ public final class IgniteLogicalJoin extends Join implements 
IgniteRel {
   @Override public boolean isSemiJoinDone() {
     return semiJoinDone;
   }
+
+  @Override public <T> T accept(IgniteVisitor<T> visitor) {
+    return visitor.visitJoin(this);
+  }
 }
\ No newline at end of file
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index 89a992b..56c0f50 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 
 public final class IgniteLogicalProject extends Project implements IgniteRel {
   public IgniteLogicalProject(
@@ -49,4 +50,8 @@ public final class IgniteLogicalProject extends Project 
implements IgniteRel {
 
     return new IgniteLogicalProject(project.getCluster(), traits, input, 
project.getProjects(), project.getRowType());
   }
+
+  @Override public <T> T accept(IgniteVisitor<T> visitor) {
+    return visitor.visitProject(this);
+  }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index 8698573..ca2ae90 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 
@@ -40,4 +41,8 @@ public final class IgniteLogicalTableScan extends TableScan 
implements IgniteRel
   public SourceDistribution tableDistribution() {
      return 
getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext());
   }
+
+  @Override public <T> T accept(IgniteVisitor<T> visitor) {
+    return visitor.visitTableScan(this);
+  }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 3cbda12..bccf570 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -27,9 +27,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.query.calcite.exchange.DistributionRegistry;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -78,13 +78,16 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
     public SourceDistribution tableDistribution(Context context) {
         SourceDistribution res = new SourceDistribution();
 
-        res.localInputs = new GridIntList();
-        res.localInputs.add(CU.cacheId(cacheName));
+        GridIntList localInputs = new GridIntList();
 
-        DistributionRegistry registry = 
context.unwrap(DistributionRegistry.class);
+        localInputs.add(CU.cacheId(cacheName));
+
+        res.localInputs = localInputs;
+
+        PartitionsDistributionRegistry registry = 
context.unwrap(PartitionsDistributionRegistry.class);
         AffinityTopologyVersion topVer = 
context.unwrap(AffinityTopologyVersion.class);
 
-        res.partitionMapping = 
registry.partitionMapping(CU.cacheId(cacheName), topVer);
+        res.partitionMapping = 
registry.partitionsDistribution(CU.cacheId(cacheName), topVer);
 
         return res;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
new file mode 100644
index 0000000..2a7707c
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class PartitionsDistribution {
+    public static final int[] ALL_PARTS = new int[0];
+    private static final int[] EMPTY = new int[0];
+
+    public boolean excessive;
+    public int parts;
+    public int[] nodes;
+    public int[][] nodeParts;
+
+    public PartitionsDistribution mergeWith(PartitionsDistribution other) {
+        if (parts != 0 && other.parts != 0 && parts != other.parts)
+            throw new IllegalStateException("Non-collocated query fragment.");
+
+        int[] nodes0 = null;
+        int[][] nodeParts0 = null;
+
+        int i = 0, j = 0, k = 0;
+
+        while (i < nodes0.length && j < other.nodes.length) {
+            if (nodes[i] < other.nodes[j])
+                i++;
+            else if (other.nodes[j] < nodes[i])
+                j++;
+            else {
+                int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
+
+                if (mergedParts.length > 0) {
+                    if (nodes0 == null) {
+                        int len = Math.min(nodes.length, other.nodes.length);
+
+                        nodes0 = new int[len];
+                        nodeParts0 = new int[len][];
+                    }
+
+                    nodes0[k] = nodes[i];
+                    nodeParts0[k] = mergedParts;
+
+                    k++;
+                }
+
+                i++;
+                j++;
+            }
+        }
+
+        PartitionsDistribution res = new PartitionsDistribution();
+
+        res.excessive = excessive && other.excessive;
+        res.parts = Math.max(parts, other.parts);
+        res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
+        res.nodeParts = nodeParts0.length == k ? nodeParts0 : 
Arrays.copyOf(nodeParts0, k);
+
+        check(res);
+
+        return res;
+    }
+
+    private void check(PartitionsDistribution res) {
+        if (res.parts == 0)
+            return; // Only receivers and/or replicated caches in task subtree.
+
+        BitSet check = new BitSet(res.parts);
+
+        int checkedParts = 0;
+
+        for (int[] nodePart : res.nodeParts) {
+            for (int p : nodePart) {
+                if (!check.get(p)) {
+                    check.set(p);
+                    checkedParts++;
+                }
+            }
+        }
+
+        if (checkedParts < res.parts)
+            throw new IllegalStateException("Failed to collocate used 
caches.");
+    }
+
+    private int[] merge(int[] left, int[] right) {
+        if (left == ALL_PARTS)
+            return right;
+        if (right == ALL_PARTS)
+            return left;
+
+        int[] nodeParts = null;
+
+        int i = 0, j = 0, k = 0;
+
+        while (i < left.length && j < right.length) {
+            if (left[i] < right[j])
+                i++;
+            else if (right[j] < left[i])
+                j++;
+            else {
+                if (nodeParts == null)
+                    nodeParts = new int[Math.min(left.length, right.length)];
+
+                nodeParts[k++] = left[i];
+
+                i++;
+                j++;
+            }
+        }
+
+        if (k == 0)
+            return EMPTY;
+
+        return nodeParts.length == k ? nodeParts : Arrays.copyOf(nodeParts, k);
+    }
+
+    public PartitionsDistribution deduplicate() {
+        if (!excessive)
+            return this;
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        int idx = 0; int[] idxs = new int[nodeParts.length];
+        while (map.size() < parts) {
+            int[] nodePart = nodeParts[idx];
+
+            int j = idxs[idx];
+
+            while (j < nodePart.length) {
+                if (map.putIfAbsent(nodePart[j], nodes[idx]) == null || j + 1 
== nodePart.length) {
+                    idxs[idx] = j + 1;
+
+                    break;
+                }
+
+                j++;
+            }
+
+            idx = (idx + 1) % nodes.length;
+        }
+
+        int[] nodes0 = new int[nodes.length]; int[][] nodeParts0 = new 
int[nodes.length][];
+
+        int k = 0;
+
+        for (int i = 0; i < nodes.length; i++) {
+            int j = 0;
+
+            int[] nodePart0 = null;
+
+            for (int p : nodeParts[i]) {
+                if (map.get(p) == nodes[i]) {
+                    if (nodePart0 == null)
+                        nodePart0 = new int[nodeParts[i].length];
+
+                    nodePart0[j++] = p;
+                }
+            }
+
+            if (nodePart0 != null) {
+                nodes0[k] = nodes[i];
+                nodeParts0[k] = nodePart0.length == j ? nodePart0 : 
Arrays.copyOf(nodePart0, j);
+
+                k++;
+            }
+        }
+
+        PartitionsDistribution res = new PartitionsDistribution();
+
+        res.parts = parts;
+        res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
+        res.nodeParts = nodeParts0.length == k ? nodeParts0 : 
Arrays.copyOf(nodeParts0, k);
+
+        return res;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
similarity index 74%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index a2b0942..f234ae7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -14,15 +14,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.exchange;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
  *
  */
-public interface DistributionRegistry {
-    Map<ClusterNode, int[]> partitionMapping(int cacheId, 
AffinityTopologyVersion topVer);
+public interface PartitionsDistributionRegistry {
+    PartitionsDistribution partitionsDistribution(int cacheId, 
AffinityTopologyVersion topVer);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
index eded34e..02a5514 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
@@ -17,8 +17,6 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
 import org.apache.ignite.internal.util.GridIntList;
 
@@ -26,7 +24,7 @@ import org.apache.ignite.internal.util.GridIntList;
  *
  */
 public class SourceDistribution {
-    public Map<ClusterNode, int[]> partitionMapping; // partition filter for 
unstable topology
+    public PartitionsDistribution partitionMapping; // partitions mapping.
     public List<Receiver> remoteInputs; // remote inputs to notify particular 
senders about final task distribution
     public GridIntList localInputs; // involved caches, used for partitions 
reservation
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
index f405939..1fbe086 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
@@ -28,5 +28,14 @@ public class SplitTask {
     public SplitTask(RelNode root, SourceDistribution distribution) {
         this.distribution = distribution;
         this.root = root;
+
+        init();
+    }
+
+    private void init() {
+        PartitionsDistribution mapping = distribution.partitionMapping;
+
+        if (mapping != null && mapping.excessive)
+            distribution.partitionMapping = mapping.deduplicate();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
index 4ca6716..22527b3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
@@ -16,11 +16,91 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import org.apache.calcite.rel.RelShuttleImpl;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 
 /**
  *
  */
-public class TaskSplitter extends RelShuttleImpl {
+public class TaskSplitter implements IgniteVisitor<IgniteRel> {
+    /** */
+    private List<IgniteRel> roots;
 
+    public List<SplitTask> go(IgniteRel root) {
+        roots = new ArrayList<>();
+
+        roots.add(root.accept(this));
+
+        ArrayList<SplitTask> splitTasks = new ArrayList<>(roots.size());
+
+        RelMetadataQuery mq = RelMetadataQueryEx.instance();
+
+        for (IgniteRel igniteRel : roots) {
+            splitTasks.add(new SplitTask(igniteRel, 
IgniteMdSourceDistribution.distribution(igniteRel, mq)));
+        }
+
+        return splitTasks;
+    }
+
+    @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
+        RelOptCluster cluster = exchange.getCluster();
+        RelNode input = exchange.getInput();
+
+        Sender sender = new Sender(cluster, input.getTraitSet(), 
visitChildren(input));
+
+        roots.add(sender);
+
+        return new Receiver(cluster, exchange.getTraitSet(), sender);
+    }
+
+    @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
+        return visitChildren(filter);
+    }
+
+    @Override public IgniteRel visitJoin(IgniteLogicalJoin join) {
+        return visitChildren(join);
+    }
+
+    @Override public IgniteRel visitProject(IgniteLogicalProject project) {
+        return visitChildren(project);
+    }
+
+    @Override public IgniteRel visitTableScan(IgniteLogicalTableScan 
tableScan) {
+        return tableScan;
+    }
+
+    @Override public IgniteRel visitReceiver(Receiver receiver) {
+        throw new AssertionError("An attempt to split an already split task.");
+    }
+
+    @Override public IgniteRel visitSender(Sender sender) {
+        throw new AssertionError("An attempt to split an already split task.");
+    }
+
+    private IgniteRel visitChildren(RelNode parent) {
+        List<RelNode> inputs = parent.getInputs();
+
+        for (int i = 0; i < inputs.size(); i++) {
+            IgniteRel input = (IgniteRel) inputs.get(i);
+            IgniteRel rel = input.accept(this);
+
+            if (rel != input)
+                parent.replaceInput(i, rel);
+        }
+        return (IgniteRel) parent;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index b6a6703..63a5c2b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -19,15 +19,15 @@ package 
org.apache.ignite.internal.processors.query.calcite.util;
 
 import java.lang.reflect.Method;
 import org.apache.calcite.linq4j.tree.Types;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TraitDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
 
 /**
  *
  */
 public enum IgniteMethod {
-    DISTRIBUTION(TraitDistribution.class, "distribution"),
-    TASK_DISTRIBUTION(TaskDistribution.class, "distribution");
+    DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, 
"getDistributionTrait"),
+    SOURCE_DISTRIBUTION(SourceDistributionMetadata.class, 
"getSourceDistribution");
 
     private final Method method;
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 028bb9c..884b9ca 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 
+import java.util.List;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -37,8 +38,13 @@ import 
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
+import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -54,6 +60,9 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     private static CalciteQueryProcessor proc;
     private static SchemaPlus schema;
 
+    private static PartitionsDistribution developerDistribution;
+    private static PartitionsDistribution projectDistribution;
+
     @BeforeClass
     public static void setupClass() {
         proc = new CalciteQueryProcessor();
@@ -71,6 +80,12 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 .field("cityId", Integer.class)
                 .build()));
 
+        developerDistribution = new PartitionsDistribution();
+
+        developerDistribution.parts = 5;
+        developerDistribution.nodes = new int[]{0,1,2};
+        developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
+
         publicSchema.addTable("Project", new IgniteTable("Project", "Project",
             RowType.builder()
                 .keyField("id", Integer.class, true)
@@ -78,6 +93,13 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 .field("ver", Integer.class)
                 .build()));
 
+        projectDistribution = new PartitionsDistribution();
+
+        projectDistribution.excessive = true;
+        projectDistribution.parts = 5;
+        projectDistribution.nodes = new int[]{0,1,2};
+        projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
+
         publicSchema.addTable("Country", new IgniteTable("Country", "Country",
             RowType.builder()
                 .keyField("id", Integer.class, true)
@@ -106,7 +128,16 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PartitionsDistributionRegistry registry = (id, top) -> {
+            if (id == CU.cacheId("Developer"))
+                return developerDistribution;
+            if (id == CU.cacheId("Project"))
+                return projectDistribution;
+
+            throw new AssertionError("Unexpected cache id:" + id);
+        };
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
 
@@ -149,5 +180,9 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         }
 
         assertNotNull(relRoot);
+
+        List<SplitTask> fragments = new TaskSplitter().go((IgniteRel) 
relRoot.rel);
+
+        assertNotNull(fragments);
     }
 }
\ No newline at end of file

Reply via email to