This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 9125a61  pending
9125a61 is described below

commit 9125a610a1e560ee89db2d655f7ec726261cf975
Author: Igor Seliverstov <[email protected]>
AuthorDate: Thu Nov 7 19:04:59 2019 +0300

    pending
---
 .../query/calcite/CalciteQueryProcessor.java       |   2 +
 .../query/calcite/cluster/RegistryImpl.java        | 173 ++++++++
 .../DistributionRegistry.java}                     |  10 +-
 .../FragmentLocation.java}                         |  12 +-
 .../calcite/metadata/IgniteMdDistribution.java     |   7 +-
 .../calcite/metadata/IgniteMdFragmentLocation.java | 161 +++++++
 .../metadata/IgniteMdSourceDistribution.java       | 128 ------
 .../query/calcite/metadata/IgniteMetadata.java     |  17 +-
 .../query/calcite/metadata/Location.java           | 165 ++++++++
 .../LocationMappingException.java}                 |  11 +-
 .../LocationRegistry.java}                         |  10 +-
 .../OptimisticPlanningException.java}              |  18 +-
 .../query/calcite/metadata/RelMetadataQueryEx.java |  47 +--
 .../processors/query/calcite/rel/CloneContext.java |  48 ---
 .../query/calcite/rel/IgniteExchange.java          |   4 -
 .../processors/query/calcite/rel/IgniteFilter.java |   4 -
 .../query/calcite/rel/IgniteHashJoin.java          |   4 -
 .../query/calcite/rel/IgniteProject.java           |   6 +-
 .../processors/query/calcite/rel/IgniteRel.java    |   2 -
 .../query/calcite/rel/IgniteTableScan.java         |  10 +-
 .../processors/query/calcite/rel/Receiver.java     |  23 +-
 .../processors/query/calcite/rel/Sender.java       |  48 ++-
 .../query/calcite/schema/IgniteTable.java          |  33 +-
 .../query/calcite/splitter/Fragment.java           |  51 ++-
 .../calcite/splitter/PartitionsDistribution.java   | 196 ---------
 .../query/calcite/splitter/QueryPlan.java          |  56 ++-
 .../query/calcite/splitter/Splitter.java           |  48 ++-
 ...utionFunction.java => DestinationFunction.java} |   2 +-
 ...actory.java => DestinationFunctionFactory.java} |   6 +-
 .../query/calcite/trait/DistributionTrait.java     |  72 ++--
 .../query/calcite/trait/DistributionTraitImpl.java |  71 ----
 ...nFunctionFactory.java => DistributionType.java} |  30 +-
 .../query/calcite/trait/IgniteDistributions.java   |  70 +++-
 .../processors/query/calcite/util/Commons.java     |  93 +++++
 .../Edge.java}                                     |  29 +-
 .../query/calcite/util/IgniteMethod.java           |   4 +-
 .../query/calcite/util/IgniteRelShuttle.java       |  83 ++++
 .../query/calcite/CalciteQueryProcessorTest.java   | 461 ++++++++++++++++++---
 38 files changed, 1483 insertions(+), 732 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index b036397..f188665 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
+import 
org.apache.ignite.internal.processors.query.calcite.cluster.RegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
@@ -148,6 +149,7 @@ public class CalciteQueryProcessor implements QueryEngine {
         return Contexts.chain(ctx, config.getContext(),
             Contexts.of(
                 new Query(query, params),
+                new RegistryImpl(kernalContext),
                 provided(ctx, SchemaPlus.class, schemaHolder::schema),
                 provided(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)));
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
new file mode 100644
index 0000000..4ad7d48
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class RegistryImpl implements DistributionRegistry, LocationRegistry {
+    private final GridKernalContext ctx;
+
+    public RegistryImpl(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override public DistributionTrait distribution(int cacheId, RowType 
rowType) {
+        if (ctx.cache().context().cacheContext(cacheId).isReplicated())
+            return IgniteDistributions.broadcast();
+
+        Object key = 
ctx.cache().context().affinity().affinity(cacheId).similarAffinityKey();
+        ToIntFunction<Object> partFun = 
ctx.cache().context().cacheContext(cacheId).affinity()::partition;
+
+        return IgniteDistributions.hash(rowType.distributionKeys(), new 
AffinityFactory(partFun, key));
+    }
+
+    @Override public Location single(AffinityTopologyVersion topVer) {
+        return new 
Location(Collections.singletonList(ctx.discovery().localNode()), null, (byte) 
0);
+    }
+
+    @Override public Location random(AffinityTopologyVersion topVer) {
+        return new Location(ctx.discovery().discoCache(topVer).serverNodes(), 
null, (byte) 0);
+    }
+
+    @Override public Location distributed(int cacheId, AffinityTopologyVersion 
topVer) {
+        GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+        return cctx.isReplicated() ? replicatedLocation(cctx, topVer) : 
partitionedLocation(cctx, topVer);
+    }
+
+    private Location partitionedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
+        byte flags = Location.HAS_PARTITIONED_CACHES;
+
+        List<List<ClusterNode>> assignments = 
cctx.affinity().assignments(topVer);
+
+        if (cctx.config().getWriteSynchronizationMode() == 
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
+            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+
+            for (List<ClusterNode> partNodes : assignments)
+                assignments0.add(F.isEmpty(partNodes) ? 
Collections.emptyList() : Collections.singletonList(F.first(partNodes)));
+
+            assignments = assignments0;
+        }
+        else if (!cctx.topology().rebalanceFinished(topVer)) {
+            flags |= Location.HAS_MOVING_PARTITIONS;
+
+            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+
+            for (int part = 0; part < assignments.size(); part++) {
+                List<ClusterNode> partNodes = assignments0.get(part), 
partNodes0 = new ArrayList<>(partNodes.size());
+
+                for (ClusterNode partNode : partNodes) {
+                    if (cctx.topology().partitionState(partNode.id(), part) == 
GridDhtPartitionState.OWNING)
+                        partNodes0.add(partNode);
+                }
+
+                assignments0.add(partNodes0);
+            }
+
+            assignments = assignments0;
+        }
+
+        return new Location(null, assignments, flags);
+    }
+
+    private Location replicatedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
+        byte flags = Location.HAS_REPLICATED_CACHES;
+
+        if (cctx.config().getNodeFilter() != null)
+            flags |= Location.PARTIALLY_REPLICATED;
+
+        List<ClusterNode> nodes = 
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
+
+        if (!cctx.topology().rebalanceFinished(topVer)) {
+            flags |= Location.PARTIALLY_REPLICATED;
+
+            List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
+
+            int parts = cctx.topology().partitions();
+
+            parent:
+            for (ClusterNode node : nodes) {
+                for (int part = 0; part < parts; part++) {
+                    if (cctx.topology().partitionState(node.id(), part) != 
GridDhtPartitionState.OWNING)
+                        continue parent;
+                }
+
+                nodes0.add(node);
+            }
+
+            nodes = nodes0;
+        }
+
+        return new Location(nodes, null, flags);
+    }
+
+    private static class AffinityFactory implements DestinationFunctionFactory 
{
+        private final ToIntFunction<Object> partFun;
+        private final Object key;
+
+        AffinityFactory(ToIntFunction<Object> partFun, Object key) {
+            this.partFun = partFun;
+            this.key = key;
+        }
+
+        @Override public DestinationFunction create(FragmentLocation 
targetLocation, ImmutableIntList keys) {
+            assert keys.size() == 1 && targetLocation.location != null;
+
+            return create(targetLocation.location, partFun, keys.getInt(0));
+        }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            return key.equals(((AffinityFactory) o).key);
+        }
+
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        private static DestinationFunction create(Location location, 
ToIntFunction<Object> partFun, int affField) {
+            return row -> location.nodes(partFun.applyAsInt(((Object[]) 
row)[affField]));
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
similarity index 67%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
index 76af490..32cf357 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
@@ -14,14 +14,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 
 /**
  *
  */
-public interface DistributionFunction {
-    List<ClusterNode> destination(Object row);
+public interface DistributionRegistry {
+    DistributionTrait distribution(int cacheId, RowType rowType);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
similarity index 68%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
index 618b3d8..7106e81 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
@@ -14,17 +14,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import java.util.List;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.util.GridIntList;
 
 /**
  *
  */
-public class SourceDistribution {
-    public PartitionsDistribution partitionMapping; // partitions mapping.
-    public List<Receiver> remoteInputs; // remote inputs to notify particular 
senders about final task distribution
-    public GridIntList localInputs; // involved caches, used for partitions 
reservation
+public class FragmentLocation {
+    public Location location;
+    public List<Receiver> remoteInputs;
+    public GridIntList localInputs;
+    public AffinityTopologyVersion topVer;
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index f644d0e..b2f91a9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -42,14 +42,15 @@ import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
 /**
  *
  */
 public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
     public static final RelMetadataProvider SOURCE =
-        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION_TRAIT.method(),
 new IgniteMdDistribution());
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            IgniteMethod.DISTRIBUTION_TRAIT.method(), new 
IgniteMdDistribution());
 
     @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata> 
getDef() {
         return IgniteMetadata.DistributionTraitMetadata.DEF;
@@ -108,7 +109,7 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
                 newKeys.add(mapped);
             }
 
-            return IgniteDistributions.hash(newKeys, trait.functionFactory());
+            return IgniteDistributions.hash(newKeys, 
trait.destinationFunctionFactory());
         }
 
         return trait;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
new file mode 100644
index 0000000..0c20e59
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class IgniteMdFragmentLocation implements 
MetadataHandler<FragmentLocationMetadata> {
+    public static final RelMetadataProvider SOURCE =
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            IgniteMethod.FRAGMENT_LOCATION.method(), new 
IgniteMdFragmentLocation());
+
+    @Override public MetadataDef<FragmentLocationMetadata> getDef() {
+        return FragmentLocationMetadata.DEF;
+    }
+
+    public FragmentLocation getLocation(RelNode rel, RelMetadataQuery mq) {
+        throw new AssertionError();
+    }
+
+    public FragmentLocation getLocation(RelSubset rel, RelMetadataQuery mq) {
+        throw new AssertionError();
+    }
+
+    public FragmentLocation getLocation(SingleRel rel, RelMetadataQuery mq) {
+        return location(rel.getInput(), mq);
+    }
+
+    public FragmentLocation getLocation(Sender rel, RelMetadataQuery mq) {
+        return rel.location(mq);
+    }
+
+    public FragmentLocation getLocation(BiRel rel, RelMetadataQuery mq) {
+        mq = RelMetadataQueryEx.wrap(mq);
+
+        FragmentLocation leftLoc = location(rel.getLeft(), mq);
+        FragmentLocation rightLoc = location(rel.getRight(), mq);
+
+        try {
+            return merge(leftLoc, rightLoc);
+        }
+        catch (LocationMappingException e) {
+            // a replicated cache is cheaper to redistribute
+            if (!leftLoc.location.hasPartitionedCaches())
+                throw planningException(rel, e, true);
+            else if (!rightLoc.location.hasPartitionedCaches())
+                throw planningException(rel, e, false);
+
+            // both sub-trees have partitioned sources, less cost is better
+            RelOptCluster cluster = rel.getCluster();
+
+            RelOptCost leftCost = 
rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
+            RelOptCost rightCost = 
rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
+
+            throw planningException(rel, e, leftCost.isLe(rightCost));
+        }
+    }
+
+    private OptimisticPlanningException planningException(BiRel rel, Exception 
cause, boolean splitLeft) {
+        String msg = "Failed to calculate physical distribution";
+
+        if (splitLeft)
+            return new OptimisticPlanningException(msg, new Edge(rel, 
rel.getLeft(), 0), cause);
+
+        return new OptimisticPlanningException(msg, new Edge(rel, 
rel.getRight(), 1), cause);
+    }
+
+    public FragmentLocation getLocation(Receiver rel, RelMetadataQuery mq) {
+        FragmentLocation res = new FragmentLocation();
+
+        res.remoteInputs = Collections.singletonList(rel);
+        res.topVer = 
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class);
+
+        return res;
+    }
+
+    public FragmentLocation getLocation(IgniteTableScan rel, RelMetadataQuery 
mq) {
+        return rel.location();
+    }
+
+    public static FragmentLocation location(RelNode rel, RelMetadataQuery mq) {
+        return RelMetadataQueryEx.wrap(mq).getFragmentLocation(rel);
+    }
+
+    private static FragmentLocation merge(FragmentLocation left, 
FragmentLocation right) throws LocationMappingException {
+        FragmentLocation res = new FragmentLocation();
+
+        res.location = merge(left.location, right.location);
+        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
+        res.localInputs = merge(left.localInputs, right.localInputs);
+        res.topVer = U.firstNotNull(left.topVer, right.topVer);
+
+        return res;
+    }
+
+    private static Location merge(Location left, Location right) throws 
LocationMappingException {
+        if (left == null)
+            return right;
+        if (right == null)
+            return left;
+
+        return left.mergeWith(right);
+    }
+
+    private static <T> List<T> merge(List<T> left, List<T> right) {
+        if (left == null)
+            return right;
+        if (right == null)
+            return left;
+
+        return Commons.union(left, right);
+    }
+
+    private static GridIntList merge(GridIntList left, GridIntList right) {
+        if (left == null)
+            return right;
+
+        if (right != null)
+            left.addAll(right);
+
+        return left;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
deleted file mode 100644
index a944db1..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.metadata;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.MetadataDef;
-import org.apache.calcite.rel.metadata.MetadataHandler;
-import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
-import org.apache.calcite.rel.metadata.RelMetadataProvider;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import org.apache.ignite.internal.util.GridIntList;
-
-/**
- *
- */
-public class IgniteMdSourceDistribution implements 
MetadataHandler<SourceDistributionMetadata> {
-    public static final RelMetadataProvider SOURCE =
-        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.SOURCE_DISTRIBUTION.method(),
 new IgniteMdSourceDistribution());
-
-    @Override public MetadataDef<SourceDistributionMetadata> getDef() {
-        return SourceDistributionMetadata.DEF;
-    }
-
-    public SourceDistribution getSourceDistribution(RelNode rel, 
RelMetadataQuery mq) {
-        throw new AssertionError();
-    }
-
-    public SourceDistribution getSourceDistribution(RelSubset rel, 
RelMetadataQuery mq) {
-        throw new AssertionError();
-    }
-
-    public SourceDistribution getSourceDistribution(SingleRel rel, 
RelMetadataQuery mq) {
-        return distribution(rel.getInput(), mq);
-    }
-
-    public SourceDistribution getSourceDistribution(Sender rel, 
RelMetadataQuery mq) {
-        return rel.sourceDistribution(mq);
-    }
-
-    public SourceDistribution getSourceDistribution(BiRel rel, 
RelMetadataQuery mq) {
-        mq = RelMetadataQueryEx.wrap(mq);
-
-        return merge(distribution(rel.getLeft(), mq), 
distribution(rel.getRight(), mq));
-    }
-
-    public SourceDistribution getSourceDistribution(Receiver rel, 
RelMetadataQuery mq) {
-        SourceDistribution res = new SourceDistribution();
-
-        ArrayList<Receiver> remoteInputs = new ArrayList<>();
-        remoteInputs.add(rel);
-        res.remoteInputs = remoteInputs;
-
-        return res;
-    }
-
-    public SourceDistribution getSourceDistribution(IgniteTableScan rel, 
RelMetadataQuery mq) {
-        return rel.tableDistribution();
-    }
-
-    public static SourceDistribution distribution(RelNode rel, 
RelMetadataQuery mq) {
-        return RelMetadataQueryEx.wrap(mq).getSourceDistribution(rel);
-    }
-
-    private static SourceDistribution merge(SourceDistribution left, 
SourceDistribution right) {
-        SourceDistribution res = new SourceDistribution();
-
-        res.partitionMapping = merge(left.partitionMapping, 
right.partitionMapping);
-        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
-        res.localInputs = merge(left.localInputs, right.localInputs);
-
-        return res;
-    }
-
-    private static PartitionsDistribution merge(PartitionsDistribution left, 
PartitionsDistribution right) {
-        if (left == null)
-            return right;
-        if (right == null)
-            return left;
-
-        return left.mergeWith(right);
-    }
-
-    private static <T> List<T> merge(List<T> left, List<T> right) {
-        if (left == null)
-            return right;
-
-        if (right != null)
-            left.addAll(right);
-
-        return left;
-    }
-
-    private static GridIntList merge(GridIntList left, GridIntList right) {
-        if (left == null)
-            return right;
-
-        if (right != null)
-            left.addAll(right);
-
-        return left;
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 090ef53..d12c19c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -25,7 +25,6 @@ import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
@@ -37,11 +36,12 @@ public class IgniteMetadata {
         ChainedRelMetadataProvider.of(
             ImmutableList.of(
                 IgniteMdDistribution.SOURCE,
-                IgniteMdSourceDistribution.SOURCE,
+                IgniteMdFragmentLocation.SOURCE,
                 DefaultRelMetadataProvider.INSTANCE));
 
     public interface DistributionTraitMetadata extends Metadata {
-        MetadataDef<DistributionTraitMetadata> DEF = 
MetadataDef.of(DistributionTraitMetadata.class, 
DistributionTraitMetadata.Handler.class, 
IgniteMethod.DISTRIBUTION_TRAIT.method());
+        MetadataDef<DistributionTraitMetadata> DEF = 
MetadataDef.of(DistributionTraitMetadata.class,
+            DistributionTraitMetadata.Handler.class, 
IgniteMethod.DISTRIBUTION_TRAIT.method());
 
         /** Determines how the rows are distributed. */
         DistributionTrait getDistributionTrait();
@@ -52,15 +52,16 @@ public class IgniteMetadata {
         }
     }
 
-    public interface SourceDistributionMetadata extends Metadata {
-        MetadataDef<SourceDistributionMetadata> DEF = 
MetadataDef.of(SourceDistributionMetadata.class, 
SourceDistributionMetadata.Handler.class, 
IgniteMethod.SOURCE_DISTRIBUTION.method());
+    public interface FragmentLocationMetadata extends Metadata {
+        MetadataDef<FragmentLocationMetadata> DEF = 
MetadataDef.of(FragmentLocationMetadata.class,
+            FragmentLocationMetadata.Handler.class, 
IgniteMethod.FRAGMENT_LOCATION.method());
 
         /** Determines how the rows are distributed. */
-        SourceDistribution getSourceDistribution();
+        FragmentLocation getLocation();
 
         /** Handler API. */
-        interface Handler extends MetadataHandler<SourceDistributionMetadata> {
-            SourceDistribution getSourceDistribution(RelNode r, 
RelMetadataQuery mq);
+        interface Handler extends MetadataHandler<FragmentLocationMetadata> {
+            FragmentLocation getLocation(RelNode r, RelMetadataQuery mq);
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
new file mode 100644
index 0000000..c83d16b
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class Location {
+    public static byte HAS_MOVING_PARTITIONS = 0x1;
+    public static byte HAS_REPLICATED_CACHES = 0x2;
+    public static byte HAS_PARTITIONED_CACHES = 0x4;
+    public static byte PARTIALLY_REPLICATED = 0x8;
+    public static byte DEDUPLICATED = 0x16;
+
+    private final List<ClusterNode> nodes;
+    private final List<List<ClusterNode>> assignments;
+    private final byte flags;
+
+    public Location(List<ClusterNode> nodes, List<List<ClusterNode>> 
assignments, byte flags) {
+        this.nodes = nodes;
+        this.assignments = assignments;
+        this.flags = flags;
+    }
+
+    public List<ClusterNode> nodes() {
+        return nodes;
+    }
+
+    public List<ClusterNode> nodes(int part) {
+        return assignments.get(part % assignments.size());
+    }
+
+    public Location mergeWith(Location other) throws LocationMappingException {
+        byte flags = (byte) (this.flags | other.flags);
+
+        if ((flags & PARTIALLY_REPLICATED) == 0)
+            return new Location(U.firstNotNull(nodes, other.nodes), 
mergeAssignments(other, null), flags);
+
+        List<ClusterNode> nodes;
+
+        if (this.nodes == null)
+            nodes = other.nodes;
+        else if (other.nodes == null)
+            nodes = this.nodes;
+        else
+            nodes = Commons.intersect(this.nodes, other.nodes);
+
+        if (nodes != null && nodes.isEmpty())
+            throw new LocationMappingException("Failed to map fragment to 
location.");
+
+        return new Location(nodes, mergeAssignments(other, nodes), flags);
+    }
+
+    private List<List<ClusterNode>> mergeAssignments(Location other, 
List<ClusterNode> nodes) throws LocationMappingException {
+        byte flags = (byte) (this.flags | other.flags); 
List<List<ClusterNode>> left = assignments, right = other.assignments;
+
+        if (left == null && right == null)
+            return null; // nothing to intersect;
+
+        if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS) 
== 0) {
+            List<List<ClusterNode>> assignments = U.firstNotNull(left, right);
+
+            if (nodes == null || (flags & PARTIALLY_REPLICATED) == 0)
+                return assignments;
+
+            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+            HashSet<ClusterNode> nodesSet = new HashSet<>(nodes);
+
+            for (List<ClusterNode> partNodes : assignments) {
+                List<ClusterNode> partNodes0 = new 
ArrayList<>(partNodes.size());
+
+                for (ClusterNode partNode : partNodes) {
+                    if (nodesSet.contains(partNode))
+                        partNodes0.add(partNode);
+                }
+
+                if (partNodes0.isEmpty())
+                    throw new LocationMappingException("Failed to map fragment 
to location.");
+
+                assignments0.add(partNodes0);
+            }
+
+            return assignments0;
+        }
+
+        List<List<ClusterNode>> assignments = new ArrayList<>(left.size());
+        HashSet<ClusterNode> nodesSet = nodes != null ? new HashSet<>(nodes) : 
null;
+
+        for (int i = 0; i < left.size(); i++) {
+            List<ClusterNode> leftNodes = left.get(i), partNodes = new 
ArrayList<>(leftNodes.size());
+            HashSet<ClusterNode> rightNodesSet = new HashSet<>(right.get(i));
+
+            for (ClusterNode partNode : leftNodes) {
+                if (rightNodesSet.contains(partNode) && (nodesSet == null || 
nodesSet.contains(partNode)))
+                    partNodes.add(partNode);
+            }
+
+            if (partNodes.isEmpty())
+                throw new LocationMappingException("Failed to map fragment to 
location.");
+
+            assignments.add(partNodes);
+        }
+
+        return assignments;
+    }
+
+    public Location deduplicate() throws LocationMappingException {
+        if (assignments == null || (flags & DEDUPLICATED) == DEDUPLICATED)
+            return this;
+
+        HashSet<ClusterNode> nodes0 = new HashSet<>();
+        List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+
+        for (List<ClusterNode> partNodes : assignments) {
+            ClusterNode node = F.first(partNodes);
+
+            if (node == null)
+                throw new LocationMappingException("Failed to map fragment to 
location.");
+
+            assignments0.add(Collections.singletonList(node));
+            nodes0.add(node);
+        }
+
+        return new Location(new ArrayList<>(nodes0), assignments0, 
(byte)(flags | DEDUPLICATED));
+    }
+
+    public boolean hasMovingPartitions() {
+        return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
+    }
+
+    public boolean hasReplicatedCaches() {
+        return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
+    }
+
+    public boolean hasPartitionedCaches() {
+        return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
+    }
+
+    public boolean partiallyReplicated() {
+        return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
similarity index 75%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
index 76af490..e1d884a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
@@ -14,14 +14,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 /**
  *
  */
-public interface DistributionFunction {
-    List<ClusterNode> destination(Object row);
+public class LocationMappingException extends Exception {
+    public LocationMappingException(String message) {
+        super(message);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
similarity index 63%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
index 345957e..d81e440 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
@@ -14,15 +14,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
  *
  */
-public interface PartitionsDistributionRegistry {
-    PartitionsDistribution single(); // returns local node with single 
partition
-    PartitionsDistribution random(AffinityTopologyVersion topVer); // returns 
random distribution, partitions count depends on nodes count
-    PartitionsDistribution distributed(int cacheId, AffinityTopologyVersion 
topVer); // returns cache distribution
+public interface LocationRegistry {
+    Location single(AffinityTopologyVersion topVer); // returns local node 
with single partition
+    Location random(AffinityTopologyVersion topVer); // returns random 
distribution, partitions count depends on nodes count
+    Location distributed(int cacheId, AffinityTopologyVersion topVer); // 
returns cache distribution
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
similarity index 61%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index 8bc129e..fb609ea 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -14,14 +14,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 
 /**
  *
  */
-public interface DistributionFunctionFactory {
-    DistributionFunction create(SourceDistribution targetDistr, 
ImmutableIntList keys);
+public class OptimisticPlanningException extends RuntimeException{
+    private final Edge edge;
+
+    public OptimisticPlanningException(String message, Edge edge, Throwable 
cause) {
+        super(message, cause);
+        this.edge = edge;
+    }
+
+    public Edge edge() {
+        return edge;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 8c10a51..fc6e4fa 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.metadata;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.jetbrains.annotations.NotNull;
 
@@ -31,47 +30,47 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
     private static final JaninoRelMetadataProvider PROVIDER = 
JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
 
     private IgniteMetadata.DistributionTraitMetadata.Handler 
distributionTraitHandler;
-    private IgniteMetadata.SourceDistributionMetadata.Handler 
sourceDistributionHandler;
+    private IgniteMetadata.FragmentLocationMetadata.Handler 
sourceDistributionHandler;
 
-    private RelMetadataQueryEx() {
-        super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
+    @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
+    public static RelMetadataQueryEx instance() {
+        return new RelMetadataQueryEx(PROTO);
+    }
 
-        distributionTraitHandler = 
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
-        sourceDistributionHandler = 
initialHandler(IgniteMetadata.SourceDistributionMetadata.Handler.class);
+    public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
+        if (mq.getClass() == RelMetadataQueryEx.class)
+            return (RelMetadataQueryEx) mq;
+
+        return new RelMetadataQueryEx(mq);
     }
 
-    protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, 
RelMetadataQueryEx prototype) {
-        super(metadataProvider, prototype);
+    private RelMetadataQueryEx(@NotNull RelMetadataQueryEx parent) {
+        super(PROVIDER, parent);
 
-        distributionTraitHandler = prototype.distributionTraitHandler;
-        sourceDistributionHandler = prototype.sourceDistributionHandler;
+        distributionTraitHandler = parent.distributionTraitHandler;
+        sourceDistributionHandler = parent.sourceDistributionHandler;
     }
 
-    protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, 
RelMetadataQuery parent) {
-        super(metadataProvider, parent);
+    private RelMetadataQueryEx(@NotNull RelMetadataQuery parent) {
+        super(PROVIDER, parent);
 
         distributionTraitHandler = PROTO.distributionTraitHandler;
         sourceDistributionHandler = PROTO.sourceDistributionHandler;
     }
 
-    @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
-    public static RelMetadataQueryEx instance() {
-        return new RelMetadataQueryEx(PROVIDER, PROTO);
-    }
-
-    public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
-        if (mq.getClass() == RelMetadataQueryEx.class)
-            return (RelMetadataQueryEx) mq;
+    private RelMetadataQueryEx() {
+        super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
 
-        return new RelMetadataQueryEx(PROVIDER, mq);
+        distributionTraitHandler = 
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
+        sourceDistributionHandler = 
initialHandler(IgniteMetadata.FragmentLocationMetadata.Handler.class);
     }
 
-    public SourceDistribution getSourceDistribution(RelNode rel) {
+    public FragmentLocation getFragmentLocation(RelNode rel) {
         for (;;) {
             try {
-                return sourceDistributionHandler.getSourceDistribution(rel, 
this);
+                return sourceDistributionHandler.getLocation(rel, this);
             } catch (JaninoRelMetadataProvider.NoHandler e) {
-                sourceDistributionHandler = revise(e.relClass, 
IgniteMetadata.SourceDistributionMetadata.DEF);
+                sourceDistributionHandler = revise(e.relClass, 
IgniteMetadata.FragmentLocationMetadata.DEF);
             }
         }
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
deleted file mode 100644
index 967606f..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.IdentityHashMap;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
-
-/**
- *
- */
-public final class CloneContext {
-    private final RelOptCluster cluster;
-    private final IdentityHashMap<IgniteRel, IgniteRel> mapping = new 
IdentityHashMap<>();
-
-    public CloneContext(RelOptCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    public RelOptCluster getCluster() {
-        return cluster;
-    }
-
-    public <T extends RelNode> T clone(T src) {
-        if (!(src instanceof IgniteRel))
-            throw new IllegalStateException("Unexpected node type: " + 
src.getClass());
-
-        return (T) mapping.computeIfAbsent((IgniteRel) src, this::clone0);
-    }
-
-    private IgniteRel clone0(IgniteRel src) {
-        return src.clone(this);
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 1bd637c..5fce676 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -54,10 +54,6 @@ public final class IgniteExchange extends SingleRel 
implements IgniteRel {
         return new IgniteExchange(getCluster(), traitSet, sole(inputs));
     }
 
-    @Override public IgniteRel clone(CloneContext ctx) {
-        return new IgniteExchange(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()));
-    }
-
     @Override public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
             .item("distribution", 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 1e23373..ed8b52f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -42,10 +42,6 @@ public final class IgniteFilter extends Filter implements 
IgniteRel {
     return variablesSet;
   }
 
-  @Override public IgniteRel clone(CloneContext ctx) {
-    return new IgniteFilter(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()), getCondition(), variablesSet);
-  }
-
   @Override public IgniteFilter copy(RelTraitSet traitSet, RelNode input,
       RexNode condition) {
     return new IgniteFilter(getCluster(), traitSet, input, condition, 
variablesSet);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
index 2818382..6450abd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
@@ -42,10 +42,6 @@ public final class IgniteHashJoin extends Join implements 
IgniteRel {
     this.semiJoinDone = semiJoinDone;
   }
 
-  @Override public IgniteRel clone(CloneContext ctx) {
-    return new IgniteHashJoin(ctx.getCluster(), getTraitSet(), 
ctx.clone(getLeft()), ctx.clone(getRight()), getCondition(), variablesSet, 
getJoinType(), semiJoinDone);
-  }
-
   @Override public IgniteHashJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
       RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
     return new IgniteHashJoin(getCluster(), traitSet, left, right, 
conditionExpr, variablesSet, joinType, semiJoinDone);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1c1e4ec..79408cd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -37,11 +37,7 @@ public final class IgniteProject extends Project implements 
IgniteRel {
     super(cluster, traitSet, input, projects, rowType);
   }
 
-  @Override public IgniteRel clone(CloneContext ctx) {
-    return new IgniteProject(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()), getProjects(), getRowType());
-  }
-
-    @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
+  @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
       List<RexNode> projects, RelDataType rowType) {
     return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
   }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index d2a47da..9a761ad 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -31,6 +31,4 @@ public interface IgniteRel extends RelNode {
             return true; // Enables trait definition conversion
         }
     };
-
-    IgniteRel clone(CloneContext ctx);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 5ceaaae..4f854c3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -22,8 +22,8 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteTableScan extends TableScan implements IgniteRel {
@@ -31,18 +31,14 @@ public final class IgniteTableScan extends TableScan 
implements IgniteRel {
     super(cluster, traitSet, table);
   }
 
-  @Override public IgniteRel clone(CloneContext ctx) {
-    return new IgniteTableScan(ctx.getCluster(), getTraitSet(), getTable());
-  }
-
   @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     assert inputs.isEmpty();
 
     return this;
   }
 
-  public SourceDistribution tableDistribution() {
+  public FragmentLocation location() {
     boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
-    return 
getTable().unwrap(IgniteTable.class).sourceDistribution(getCluster().getPlanner().getContext(),
 local);
+    return 
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext(),
 local);
   }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index a4d46f7..74c9d1b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -21,14 +21,15 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
  *
  */
-public class Receiver extends SingleRel implements IgniteRel {
-    private SourceDistribution sourceDistribution;
+public final class Receiver extends SingleRel implements IgniteRel {
+    private FragmentLocation sourceDistribution;
 
     /**
      * @param cluster Cluster this relational expression belongs to
@@ -49,17 +50,17 @@ public class Receiver extends SingleRel implements 
IgniteRel {
         return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
     }
 
-    @Override public IgniteRel clone(CloneContext ctx) {
-        return new Receiver(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()));
-    }
-
-    public void init(SourceDistribution targetDistribution, RelMetadataQueryEx 
mq) {
-        getInput().init(targetDistribution);
+    public void init(FragmentLocation targetDistribution, RelMetadataQueryEx 
mq) {
+        getInput().init(targetDistribution, 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
 
-        sourceDistribution = getInput().sourceDistribution(mq);
+        sourceDistribution = getInput().location(mq);
     }
 
-    public SourceDistribution sourceDistribution() {
+    public FragmentLocation sourceDistribution() {
         return sourceDistribution;
     }
+
+    public void reset() {
+        sourceDistribution = null;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 715eb3f..f53b953 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -22,19 +22,19 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
  *
  */
-public class Sender extends SingleRel implements IgniteRel {
-    private SourceDistribution sourceDistribution;
-    private SourceDistribution targetDistribution;
-    private DistributionFunction targetFunction;
+public final class Sender extends SingleRel implements IgniteRel {
+    private FragmentLocation location;
+    private FragmentLocation targetLocation;
+    private DistributionTrait targetDistribution;
+    private DestinationFunction destinationFunction;
 
     /**
      * Creates a <code>SingleRel</code>.
@@ -47,34 +47,36 @@ public class Sender extends SingleRel implements IgniteRel {
         super(cluster, traits, input);
     }
 
-    @Override public IgniteRel clone(CloneContext ctx) {
-        return new Sender(ctx.getCluster(), getTraitSet(), 
ctx.clone(getInput()));
-    }
-
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new Sender(getCluster(), traitSet, sole(inputs));
     }
 
-    public void init(SourceDistribution targetDistribution) {
+    public void init(FragmentLocation targetLocation, DistributionTrait 
targetDistribution) {
+        this.targetLocation = targetLocation;
         this.targetDistribution = targetDistribution;
     }
 
-    public DistributionFunction targetFunction() {
-        if (targetFunction == null) {
-            assert targetDistribution != null && 
targetDistribution.partitionMapping != null;
-
-            DistributionTrait distribution = 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    public DestinationFunction targetFunction() {
+        if (destinationFunction == null) {
+            assert targetLocation != null && targetLocation.location != null 
&& targetDistribution != null;
 
-            targetFunction = 
distribution.functionFactory().create(targetDistribution, distribution.keys());
+            destinationFunction = 
targetDistribution.destinationFunctionFactory().create(targetLocation, 
targetDistribution.keys());
         }
 
-        return targetFunction;
+        return destinationFunction;
     }
 
-    public SourceDistribution sourceDistribution(RelMetadataQuery mq) {
-        if (sourceDistribution == null)
-            sourceDistribution = 
RelMetadataQueryEx.wrap(mq).getSourceDistribution(getInput());
+    public FragmentLocation location(RelMetadataQuery mq) {
+        if (location == null)
+            location = 
RelMetadataQueryEx.wrap(mq).getFragmentLocation(getInput());
+
+        return location;
+    }
 
-        return sourceDistribution;
+    public void reset() {
+        location = null;
+        targetLocation = null;
+        targetDistribution = null;
+        destinationFunction = null;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 41ffb12..b5940b0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -27,13 +27,13 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -77,24 +77,35 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
     }
 
     public DistributionTrait distributionTrait(Context context) {
-        return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.noOpFunction()); // TODO
+        return 
distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
     }
 
-    public SourceDistribution sourceDistribution(Context context, boolean 
local) {
+    public FragmentLocation location(Context context, boolean local) {
         int cacheId = CU.cacheId(cacheName);
 
-        SourceDistribution res = new SourceDistribution();
+        FragmentLocation res = new FragmentLocation();
 
         GridIntList localInputs = new GridIntList();
         localInputs.add(cacheId);
         res.localInputs = localInputs;
 
-        if (!local) {
-            PartitionsDistributionRegistry registry = 
context.unwrap(PartitionsDistributionRegistry.class);
-            AffinityTopologyVersion topVer = 
context.unwrap(AffinityTopologyVersion.class);
-            res.partitionMapping = registry.distributed(cacheId, topVer);
-        }
+        if (!local)
+            res.location = locationRegistry(context).distributed(cacheId, 
topologyVersion(context));
+
+        res.topVer = topologyVersion(context);
 
         return res;
     }
+
+    private LocationRegistry locationRegistry(Context ctx) {
+        return ctx.unwrap(LocationRegistry.class);
+    }
+
+    public DistributionRegistry distributionRegistry(Context ctx) {
+        return ctx.unwrap(DistributionRegistry.class);
+    }
+
+    private AffinityTopologyVersion topologyVersion(Context ctx) {
+        return ctx.unwrap(AffinityTopologyVersion.class);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 2a98eaa..2fd35d7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -19,6 +19,10 @@ package 
org.apache.ignite.internal.processors.query.calcite.splitter;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
@@ -30,7 +34,7 @@ import org.apache.ignite.internal.util.typedef.F;
 public class Fragment {
     public final RelNode rel;
 
-    public SourceDistribution distribution;
+    public FragmentLocation fragmentLocation;
 
     public Fragment(RelNode rel) {
         this.rel = rel;
@@ -39,22 +43,28 @@ public class Fragment {
     public void init(Context ctx) {
         RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
 
-        distribution = mq.getSourceDistribution(rel);
+        fragmentLocation = mq.getFragmentLocation(rel);
 
-        PartitionsDistribution mapping = distribution.partitionMapping;
+        AffinityTopologyVersion topVer = topologyVersion(ctx);
 
-        if (mapping == null) {
+        if (fragmentLocation.location == null) {
             if (!isRoot())
-                distribution.partitionMapping = 
registry(ctx).random(topologyVersion(ctx));
-            else if (!F.isEmpty(distribution.remoteInputs))
-                distribution.partitionMapping = registry(ctx).single();
+                fragmentLocation.location = registry(ctx).random(topVer);
+            else if (!F.isEmpty(fragmentLocation.remoteInputs))
+                fragmentLocation.location = registry(ctx).single(topVer);
+        }
+        else {
+            try {
+                fragmentLocation.location = 
fragmentLocation.location.deduplicate();
+            }
+            catch (LocationMappingException e) {
+                throw new IgniteSQLException("Failed to map fragment to 
location, partition lost.", e);
+            }
         }
-        else if (mapping.excessive)
-            distribution.partitionMapping = mapping.deduplicate();
 
-        if (!F.isEmpty(distribution.remoteInputs)) {
-            for (Receiver input : distribution.remoteInputs)
-                input.init(distribution, mq);
+        if (!F.isEmpty(fragmentLocation.remoteInputs)) {
+            for (Receiver input : fragmentLocation.remoteInputs)
+                input.init(fragmentLocation, mq);
         }
     }
 
@@ -62,11 +72,24 @@ public class Fragment {
         return !(rel instanceof Sender);
     }
 
-    private PartitionsDistributionRegistry registry(Context ctx) {
-        return ctx.unwrap(PartitionsDistributionRegistry.class);
+    private LocationRegistry registry(Context ctx) {
+        return ctx.unwrap(LocationRegistry.class);
     }
 
     private AffinityTopologyVersion topologyVersion(Context ctx) {
         return ctx.unwrap(AffinityTopologyVersion.class);
     }
+
+    public void reset() {
+        if (rel instanceof Sender)
+            ((Sender) rel).reset();
+
+        if (fragmentLocation != null && 
!F.isEmpty(fragmentLocation.remoteInputs)) {
+            for (Receiver receiver : fragmentLocation.remoteInputs) {
+                receiver.reset();
+            }
+        }
+
+        fragmentLocation = null;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
deleted file mode 100644
index fb2728a..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.splitter;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- */
-public class PartitionsDistribution {
-    public static final int[] ALL_PARTS = new int[0];
-    private static final int[] EMPTY = new int[0];
-
-    public boolean excessive;
-    public int parts;
-    public int[] nodes;
-    public int[][] nodeParts;
-
-    public PartitionsDistribution mergeWith(PartitionsDistribution other) {
-        if (parts != 0 && other.parts != 0 && parts != other.parts)
-            throw new IllegalStateException("Non-collocated query fragment.");
-
-        int[] nodes0 = null;
-        int[][] nodeParts0 = null;
-
-        int i = 0, j = 0, k = 0;
-
-        while (i < nodes.length && j < other.nodes.length) {
-            if (nodes[i] < other.nodes[j])
-                i++;
-            else if (other.nodes[j] < nodes[i])
-                j++;
-            else {
-                int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
-
-                if (mergedParts == ALL_PARTS || mergedParts.length > 0) {
-                    if (nodes0 == null) {
-                        int len = Math.min(nodes.length, other.nodes.length);
-
-                        nodes0 = new int[len];
-                        nodeParts0 = new int[len][];
-                    }
-
-                    nodes0[k] = nodes[i];
-                    nodeParts0[k] = mergedParts;
-
-                    k++;
-                }
-
-                i++;
-                j++;
-            }
-        }
-
-        PartitionsDistribution res = new PartitionsDistribution();
-
-        res.excessive = excessive && other.excessive;
-        res.parts = Math.max(parts, other.parts);
-        res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
-        res.nodeParts = nodeParts0.length == k ? nodeParts0 : 
Arrays.copyOf(nodeParts0, k);
-
-        check(res);
-
-        return res;
-    }
-
-    private void check(PartitionsDistribution res) {
-        if (res.parts == 0)
-            return; // Only receivers and/or replicated caches in task subtree.
-
-        BitSet check = new BitSet(res.parts);
-
-        int checkedParts = 0;
-
-        for (int[] nodePart : res.nodeParts) {
-            for (int p : nodePart) {
-                if (!check.get(p)) {
-                    check.set(p);
-                    checkedParts++;
-                }
-            }
-        }
-
-        if (checkedParts < res.parts)
-            throw new IllegalStateException("Failed to collocate used 
caches.");
-    }
-
-    private int[] merge(int[] left, int[] right) {
-        if (left == ALL_PARTS)
-            return right;
-        if (right == ALL_PARTS)
-            return left;
-
-        int[] nodeParts = null;
-
-        int i = 0, j = 0, k = 0;
-
-        while (i < left.length && j < right.length) {
-            if (left[i] < right[j])
-                i++;
-            else if (right[j] < left[i])
-                j++;
-            else {
-                if (nodeParts == null)
-                    nodeParts = new int[Math.min(left.length, right.length)];
-
-                nodeParts[k++] = left[i];
-
-                i++;
-                j++;
-            }
-        }
-
-        if (k == 0)
-            return EMPTY;
-
-        return nodeParts.length == k ? nodeParts : Arrays.copyOf(nodeParts, k);
-    }
-
-    public PartitionsDistribution deduplicate() {
-        if (!excessive)
-            return this;
-
-        Map<Integer, Integer> map = new HashMap<>();
-
-        int idx = 0; int[] idxs = new int[nodeParts.length];
-        while (map.size() < parts) {
-            int[] nodePart = nodeParts[idx];
-
-            int j = idxs[idx];
-
-            while (j < nodePart.length) {
-                if (map.putIfAbsent(nodePart[j], nodes[idx]) == null || j + 1 
== nodePart.length) {
-                    idxs[idx] = j + 1;
-
-                    break;
-                }
-
-                j++;
-            }
-
-            idx = (idx + 1) % nodes.length;
-        }
-
-        int[] nodes0 = new int[nodes.length]; int[][] nodeParts0 = new 
int[nodes.length][];
-
-        int k = 0;
-
-        for (int i = 0; i < nodes.length; i++) {
-            int j = 0;
-
-            int[] nodePart0 = null;
-
-            for (int p : nodeParts[i]) {
-                if (map.get(p) == nodes[i]) {
-                    if (nodePart0 == null)
-                        nodePart0 = new int[nodeParts[i].length];
-
-                    nodePart0[j++] = p;
-                }
-            }
-
-            if (nodePart0 != null) {
-                nodes0[k] = nodes[i];
-                nodeParts0[k] = nodePart0.length == j ? nodePart0 : 
Arrays.copyOf(nodePart0, j);
-
-                k++;
-            }
-        }
-
-        PartitionsDistribution res = new PartitionsDistribution();
-
-        res.parts = parts;
-        res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
-        res.nodeParts = nodeParts0.length == k ? nodeParts0 : 
Arrays.copyOf(nodeParts0, k);
-
-        return res;
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index d23091d..b944a9b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -16,37 +16,61 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.apache.calcite.plan.Context;
-import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 
 /**
  *
  */
 public class QueryPlan {
-    private final ImmutableList<Fragment> fragments;
+    private final List<Fragment> fragments;
 
-    public QueryPlan(ImmutableList<Fragment> fragments) {
+    public QueryPlan(List<Fragment> fragments) {
         this.fragments = fragments;
     }
 
     public void init(Context ctx) {
-        for (Fragment fragment : fragments) {
-            fragment.init(ctx);
-        }
-    }
+        int i = 0;
 
-    public ImmutableList<Fragment> fragments() {
-        return fragments;
-    }
+        while (true) {
+            try {
+                for (Fragment fragment : fragments)
+                    fragment.init(ctx);
+
+                break;
+            }
+            catch (OptimisticPlanningException e) {
+                if (++i > 3)
+                    throw new IgniteSQLException("Failed to map query.", e);
+
+                for (Fragment fragment0 : fragments)
+                    fragment0.reset();
 
-    public QueryPlan clone(CloneContext ctx) {
-        ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+                Edge edge = e.edge();
 
-        for (Fragment f : fragments) {
-            b.add(new Fragment(ctx.clone(f.rel)));
+                RelNode parent = edge.parent();
+                RelNode child = edge.child();
+
+                RelOptCluster cluster = child.getCluster();
+                RelTraitSet traitSet = child.getTraitSet();
+
+                Sender sender = new Sender(cluster, traitSet, child);
+                parent.replaceInput(edge.childIdx(), new Receiver(cluster, 
traitSet, sender));
+
+                fragments.add(new Fragment(sender));
+            }
         }
+    }
 
-        return new QueryPlan(b.build());
+    public List<Fragment> fragments() {
+        return fragments;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 31b8819..5f751d3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -16,46 +16,52 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
 
 /**
  *
  */
-public class Splitter extends RelShuttleImpl {
-    private ImmutableList.Builder<Fragment> b;
+public class Splitter extends IgniteRelShuttle {
+    private List<Fragment> fragments;
 
     public QueryPlan go(IgniteRel root) {
-        b = ImmutableList.builder();
+        fragments = new ArrayList<>();
 
-        return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
+        fragments.add(new Fragment(root.accept(this)));
+
+        Collections.reverse(fragments);
+
+        return new QueryPlan(fragments);
     }
 
-    @Override public RelNode visit(RelNode rel) {
-        if (!(rel instanceof IgniteRel))
-            throw new AssertionError("Unexpected node: " + rel);
-        else if (rel instanceof Sender || rel instanceof Receiver)
-            throw new AssertionError("An attempt to split an already split 
task.");
-        else if (rel instanceof IgniteExchange) {
-            IgniteExchange exchange = (IgniteExchange) rel;
+    @Override public RelNode visit(IgniteExchange rel) {
+        RelOptCluster cluster = rel.getCluster();
+
+        Sender sender = new Sender(cluster, rel.getInput().getTraitSet(), 
visit(rel.getInput()));
 
-            RelOptCluster cluster = exchange.getCluster();
-            RelTraitSet traitSet = exchange.getTraitSet();
+        fragments.add(new Fragment(sender));
 
-            Sender sender = new Sender(cluster, traitSet, 
visit(exchange.getInput()));
+        return new Receiver(cluster, rel.getTraitSet(), sender);
+    }
 
-            b.add(new Fragment(sender));
+    @Override public RelNode visit(Receiver rel) {
+        throw new AssertionError("An attempt to split an already split task.");
+    }
 
-            return new Receiver(cluster, traitSet, sender);
-        }
+    @Override public RelNode visit(Sender rel) {
+        throw new AssertionError("An attempt to split an already split task.");
+    }
 
-        return super.visit(rel);
+    @Override protected RelNode visitOther(RelNode rel) {
+        throw new AssertionError("Unexpected node: " + rel);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
similarity index 95%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
index 76af490..7577767 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
@@ -22,6 +22,6 @@ import org.apache.ignite.cluster.ClusterNode;
 /**
  *
  */
-public interface DistributionFunction {
+public interface DestinationFunction {
     List<ClusterNode> destination(Object row);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
similarity index 80%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index 8bc129e..d4dc4fc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -17,11 +17,11 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 
 /**
  *
  */
-public interface DistributionFunctionFactory {
-    DistributionFunction create(SourceDistribution targetDistr, 
ImmutableIntList keys);
+public interface DestinationFunctionFactory {
+    DestinationFunction create(FragmentLocation targetLocation, 
ImmutableIntList keys);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 2510c2c..e733461 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.Objects;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.util.ImmutableIntList;
@@ -24,37 +25,57 @@ import org.apache.calcite.util.ImmutableIntList;
 /**
  *
  */
-public interface DistributionTrait extends RelTrait {
-    enum DistributionType {
-        HASH("hash"),
-        RANDOM("random"),
-        BROADCAST("broadcast"),
-        SINGLE("single"),
-        ANY("any");
-
-        /** */
-        private final String description;
-
-        /** */
-        DistributionType(String description) {
-            this.description = description;
-        }
+public final class DistributionTrait implements RelTrait {
+    private final DistributionType type;
+    private final ImmutableIntList keys;
+    private final DestinationFunctionFactory functionFactory;
+
+    public DistributionTrait(DistributionType type, ImmutableIntList keys, 
DestinationFunctionFactory functionFactory) {
+        this.type = type;
+        this.keys = keys;
+        this.functionFactory = functionFactory;
+    }
+
+    public DistributionType type() {
+        return type;
+    }
+
+    public DestinationFunctionFactory destinationFunctionFactory() {
+        return functionFactory;
+    }
+
+    public ImmutableIntList keys() {
+        return keys;
+    }
 
-        /** */
-        @Override public String toString() {
-            return description;
+    @Override public void register(RelOptPlanner planner) {}
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o instanceof DistributionTrait) {
+            DistributionTrait that = (DistributionTrait) o;
+
+            return type == that.type() && keys.equals(that.keys());
         }
+
+        return false;
     }
 
-    DistributionType type();
+    @Override public int hashCode() {
+        return Objects.hash(type, keys);
+    }
 
-    DistributionFunctionFactory functionFactory();
+    @Override public String toString() {
+        return type + (type == DistributionType.HASH ? keys.toString()  : "");
+    }
 
-    @Override default RelTraitDef getTraitDef() {
+    @Override public RelTraitDef getTraitDef() {
         return DistributionTraitDef.INSTANCE;
     }
 
-    @Override default boolean satisfies(RelTrait trait) {
+    @Override public boolean satisfies(RelTrait trait) {
         if (trait == this)
             return true;
 
@@ -68,13 +89,10 @@ public interface DistributionTrait extends RelTrait {
 
         if (type() == other.type())
             return type() != DistributionType.HASH
-                || (Objects.equals(keys(), other.keys()) && 
Objects.equals(functionFactory(), other.functionFactory()));
+                || (Objects.equals(keys(), other.keys())
+                    && Objects.equals(destinationFunctionFactory(), 
other.destinationFunctionFactory()));
 
         return other.type() == DistributionType.RANDOM && type() == 
DistributionType.HASH;
     }
 
-    /**
-     * @return Hash distribution columns ordinals or empty list otherwise.
-     */
-    ImmutableIntList keys();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
deleted file mode 100644
index 099c96b..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.util.Objects;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.util.ImmutableIntList;
-
-/**
- *
- */
-public class DistributionTraitImpl implements DistributionTrait {
-    private final DistributionType type;
-    private final ImmutableIntList keys;
-    private final DistributionFunctionFactory functionFactory;
-
-    public DistributionTraitImpl(DistributionType type, ImmutableIntList keys, 
DistributionFunctionFactory functionFactory) {
-        this.type = type;
-        this.keys = keys;
-        this.functionFactory = functionFactory;
-    }
-
-    @Override public DistributionType type() {
-        return type;
-    }
-
-    @Override public DistributionFunctionFactory functionFactory() {
-        return functionFactory;
-    }
-
-    @Override public ImmutableIntList keys() {
-        return keys;
-    }
-
-    @Override public void register(RelOptPlanner planner) {}
-
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o instanceof DistributionTrait) {
-            DistributionTrait that = (DistributionTrait) o;
-
-            return type == that.type() && keys.equals(that.keys());
-        }
-
-        return false;
-    }
-
-    @Override public int hashCode() {
-        return Objects.hash(type, keys);
-    }
-
-    @Override public String toString() {
-        return type + (type == DistributionType.HASH ? keys.toString()  : "");
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
similarity index 65%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
index 8bc129e..60e37ed 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
@@ -16,12 +16,32 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-
 /**
  *
  */
-public interface DistributionFunctionFactory {
-    DistributionFunction create(SourceDistribution targetDistr, 
ImmutableIntList keys);
+public enum DistributionType {
+    HASH("hash"),
+    RANDOM("random"),
+    BROADCAST("broadcast"),
+    SINGLE("single"),
+    ANY("any");
+
+    /**
+     *
+     */
+    private final String description;
+
+    /**
+     *
+     */
+    DistributionType(String description) {
+        this.description = description;
+    }
+
+    /**
+     *
+     */
+    @Override public String toString() {
+        return description;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 59f69fe..8f83227 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -16,21 +16,45 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.ToIntFunction;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
 
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
 /**
  *
  */
 public class IgniteDistributions {
-    private static final DistributionFunctionFactory NO_OP_FACTORY = (t,k) -> 
null;
+    private static final DestinationFunctionFactory NO_OP_FACTORY = (t,k) -> 
null;
+    private static final DestinationFunctionFactory HASH_FACTORY = (t,k) -> {
+        int[] fields = k.toIntArray();
 
-    private static final DistributionTrait BROADCAST = new 
DistributionTraitImpl(DistributionTrait.DistributionType.BROADCAST, 
ImmutableIntList.of(), allTargetsFunction());
-    private static final DistributionTrait SINGLE = new 
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, 
ImmutableIntList.of(), singleTargetFunction());
-    private static final DistributionTrait RANDOM = new 
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, 
ImmutableIntList.of(), randomTargetFunction());
-    private static final DistributionTrait ANY    = new 
DistributionTraitImpl(DistributionTrait.DistributionType.ANY, 
ImmutableIntList.of(), noOpFunction());
+        ToIntFunction<Object> hashFun = r -> {
+            Object[] row = (Object[]) r;
+
+            if (row == null)
+                return 0;
+
+            int hash = 1;
+
+            for (int i : fields)
+                hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
+
+            return hash;
+        };
+
+        return r -> t.location.nodes(hashFun.applyAsInt(r));
+    };
+
+
+    private static final DistributionTrait BROADCAST = new 
DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), 
allTargetsFunction());
+    private static final DistributionTrait SINGLE = new 
DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), 
singleTargetFunction());
+    private static final DistributionTrait RANDOM = new 
DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), 
randomTargetFunction());
+    private static final DistributionTrait ANY    = new 
DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), noOpFunction());
 
     public static DistributionTrait any() {
         return ANY;
@@ -48,23 +72,39 @@ public class IgniteDistributions {
         return BROADCAST;
     }
 
-    public static DistributionTrait hash(List<Integer> keys, 
DistributionFunctionFactory factory) {
-        return new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys), 
factory);
+    public static DistributionTrait hash(List<Integer> keys, 
DestinationFunctionFactory factory) {
+        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), 
factory);
     }
 
-    public static DistributionFunctionFactory noOpFunction() {
+    public static DestinationFunctionFactory noOpFunction() {
         return NO_OP_FACTORY;
     }
 
-    public static DistributionFunctionFactory singleTargetFunction() {
-        return noOpFunction(); // TODO
+    public static DestinationFunctionFactory singleTargetFunction() {
+        return (t, k) -> {
+            List<ClusterNode> nodes = t.location.nodes();
+
+            return r -> nodes;
+        };
+    }
+
+    public static DestinationFunctionFactory allTargetsFunction() {
+        return (t, k) -> {
+            List<ClusterNode> nodes = t.location.nodes();
+
+            return r -> nodes;
+        };
     }
 
-    public static DistributionFunctionFactory allTargetsFunction() {
-        return noOpFunction(); // TODO
+    public static DestinationFunctionFactory randomTargetFunction() {
+        return (t, k) -> {
+            List<ClusterNode> nodes = t.location.nodes();
+
+            return r -> 
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
+        };
     }
 
-    public static DistributionFunctionFactory randomTargetFunction() {
-        return noOpFunction(); // TODO
+    public static DestinationFunctionFactory hashFunction() {
+        return HASH_FACTORY;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index fc4a6eb..ea3ecc3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -17,8 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.util;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -39,12 +43,16 @@ import 
org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public final class Commons {
+    private static final int[] EMPTY = new int[0];
+
     private Commons(){}
 
     public static Context convert(QueryContext ctx) {
@@ -129,4 +137,89 @@ public final class Commons {
             return transform;
         };
     }
+
+    public static int[] intersect(int[] left, int[] right) {
+        if (F.isEmpty(left) || F.isEmpty(right))
+            return EMPTY;
+
+        int[] res = null;
+
+        int i = 0, j = 0, k = 0, size = Math.min(left.length, right.length);
+
+        while (i < left.length && j < right.length) {
+            if (left[i] < right[j])
+                i++;
+            else if (right[j] < left[i])
+                j++;
+            else {
+                if (res == null)
+                    res = new int[size];
+
+                res[k++] = left[i];
+
+                i++;
+                j++;
+            }
+        }
+
+        if (k == 0)
+            return EMPTY;
+
+        return res.length == k ? res : Arrays.copyOf(res, k);
+    }
+
+    public static <T> List<T> intersect(List<T> left, List<T> right) {
+        if (F.isEmpty(left) || F.isEmpty(right))
+            return Collections.emptyList();
+
+        HashSet<T> set = new HashSet<>(right);
+
+        return 
left.stream().filter(set::contains).collect(Collectors.toList());
+    }
+
+    public static <T> List<T> union(List<T> left, List<T> right) {
+        Set<T> set = U.newHashSet(left.size() + right.size());
+
+        set.addAll(left);
+        set.addAll(right);
+
+        return new ArrayList<>(set);
+    }
+
+    public static int[] union(int[] left, int[] right) {
+        if (F.isEmpty(left) && F.isEmpty(right))
+            return EMPTY;
+
+        int min = Math.min(left.length, right.length);
+        int max = left.length + right.length;
+        int expected = Math.max(min, (int) (max * 1.5));
+
+        int[] res = new int[U.ceilPow2(expected)];
+
+        int i = 0, j = 0, k = 0;
+
+        while (i < left.length && j < right.length) {
+            res = ensureSize(res, k + 1);
+
+            if (left[i] < right[j])
+                res[k++] = left[i++];
+            else if (right[j] < left[i])
+                res[k++] = right[j++];
+            else {
+                res[k++] = left[i];
+
+                i++;
+                j++;
+            }
+        }
+
+        if (k == 0)
+            return EMPTY;
+
+        return res.length == k ? res : Arrays.copyOf(res, k);
+    }
+
+    private static int[] ensureSize(int[] array, int size) {
+        return size < array.length ? array : Arrays.copyOf(array, 
U.ceilPow2(size));
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
similarity index 55%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
index 8bc129e..58f7146 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
@@ -14,14 +14,33 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.util;
 
-import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import org.apache.calcite.rel.RelNode;
 
 /**
  *
  */
-public interface DistributionFunctionFactory {
-    DistributionFunction create(SourceDistribution targetDistr, 
ImmutableIntList keys);
+public class Edge {
+    private final RelNode parent;
+    private final RelNode child;
+    private final int childIdx;
+
+    public Edge(RelNode parent, RelNode child, int childIdx) {
+        this.parent = parent;
+        this.child = child;
+        this.childIdx = childIdx;
+    }
+
+    public RelNode parent() {
+        return parent;
+    }
+
+    public RelNode child() {
+        return child;
+    }
+
+    public int childIdx() {
+        return childIdx;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index 63a5c2b..94109e9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -20,14 +20,14 @@ package 
org.apache.ignite.internal.processors.query.calcite.util;
 import java.lang.reflect.Method;
 import org.apache.calcite.linq4j.tree.Types;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
 
 /**
  *
  */
 public enum IgniteMethod {
     DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, 
"getDistributionTrait"),
-    SOURCE_DISTRIBUTION(SourceDistributionMetadata.class, 
"getSourceDistribution");
+    FRAGMENT_LOCATION(FragmentLocationMetadata.class, "getLocation");
 
     private final Method method;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
new file mode 100644
index 0000000..a236774
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+
+/**
+ *
+ */
+public class IgniteRelShuttle extends RelShuttleImpl {
+    public RelNode visit(IgniteExchange rel) {
+        return visitChild(rel, 0, rel.getInput());
+    }
+
+    public RelNode visit(IgniteFilter rel) {
+        return visitChild(rel, 0, rel.getInput());
+    }
+
+    public RelNode visit(IgniteProject rel) {
+        return visitChild(rel, 0, rel.getInput());
+    }
+
+    public RelNode visit(Receiver rel) {
+        return visitChild(rel, 0, rel.getInput());
+    }
+
+    public RelNode visit(Sender rel) {
+        return visitChild(rel, 0, rel.getInput());
+    }
+
+    public RelNode visit(IgniteTableScan rel) {
+        return rel;
+    }
+
+    public RelNode visit(IgniteHashJoin rel) {
+        return visitChildren(rel);
+    }
+
+    @Override public RelNode visit(RelNode rel) {
+        if (rel instanceof IgniteExchange)
+            return visit((IgniteExchange)rel);
+        if (rel instanceof IgniteFilter)
+            return visit((IgniteFilter)rel);
+        if (rel instanceof IgniteProject)
+            return visit((IgniteProject)rel);
+        if (rel instanceof Receiver)
+            return visit((Receiver)rel);
+        if (rel instanceof Sender)
+            return visit((Sender)rel);
+        if (rel instanceof IgniteTableScan)
+            return visit((IgniteTableScan)rel);
+        if (rel instanceof IgniteHashJoin)
+            return visit((IgniteHashJoin)rel);
+
+        return visitOther(rel);
+    }
+
+    protected RelNode visitOther(RelNode rel) {
+        return super.visit(rel);
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 8806868..3138c6c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -28,7 +32,11 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -37,16 +45,17 @@ import 
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -58,12 +67,8 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     private static CalciteQueryProcessor proc;
     private static SchemaPlus schema;
 
-    private static PartitionsDistribution developerDistribution;
-    private static PartitionsDistribution projectDistribution;
-    private static PartitionsDistribution randomDistribution;
-    private static PartitionsDistribution singleDistribution;
-
-    private static PartitionsDistributionRegistry registry;
+    private static TestRegistry registry;
+    private static List<ClusterNode> nodes;
 
     @BeforeClass
     public static void setupClass() {
@@ -109,47 +114,13 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
         schema.add("PUBLIC", publicSchema);
 
-        developerDistribution = new PartitionsDistribution();
-
-        developerDistribution.parts = 5;
-        developerDistribution.nodes = new int[]{0,1,2};
-        developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
-
-        projectDistribution = new PartitionsDistribution();
-
-        projectDistribution.excessive = true;
-        projectDistribution.parts = 5;
-        projectDistribution.nodes = new int[]{0,1,2};
-        projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
-
-        randomDistribution = new PartitionsDistribution();
-        randomDistribution.parts = 3;
-        randomDistribution.nodes = new int[]{0,1,2};
-        randomDistribution.nodeParts = new int[][]{{1},{2},{3}};
-
-        singleDistribution = new PartitionsDistribution();
-        singleDistribution.parts = 1;
-        singleDistribution.nodes = new int[]{0};
-        singleDistribution.nodeParts = new int[][]{{1}};
+        nodes = new ArrayList<>(4);
 
-        registry = new PartitionsDistributionRegistry() {
-            @Override public PartitionsDistribution distributed(int cacheId, 
AffinityTopologyVersion topVer) {
-                if (cacheId == CU.cacheId("Developer"))
-                    return developerDistribution;
-                if (cacheId == CU.cacheId("Project"))
-                    return projectDistribution;
-
-                throw new AssertionError("Unexpected cache id:" + cacheId);
-            }
-
-            @Override public PartitionsDistribution 
random(AffinityTopologyVersion topVer) {
-                return randomDistribution;
-            }
+        for (int i = 0; i < 4; i++) {
+            nodes.add(new GridTestNode(UUID.randomUUID()));
+        }
 
-            @Override public PartitionsDistribution single() {
-                return singleDistribution;
-            }
-        };
+        registry = new TestRegistry();
     }
 
     @Test
@@ -405,7 +376,72 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
-    public void testSplitterCollocated() throws Exception {
+    public void testSplitterCollocatedPartitionedPartitioned() throws 
Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertTrue(plan.fragments().size() == 2);
+    }
+
+    @Test
+    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
+    public void testSplitterCollocatedReplicatedReplicated() throws Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -413,6 +449,111 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                return IgniteDistributions.broadcast();
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(select(nodes, 0,1,2), null, 
Location.HAS_REPLICATED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1,2), null, 
Location.HAS_REPLICATED_CACHES);
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertTrue(plan.fragments().size() == 2);
+    }
+
+    @Test
+    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
+    public void testSplitterCollocatedReplicatedAndPartitioned() throws 
Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                if (cacheId == CU.cacheId("Project"))
+                    return IgniteDistributions.broadcast();
+
+                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(null, Arrays.asList(
+                        select(nodes, 0,1),
+                        select(nodes, 1,2),
+                        select(nodes, 2,0),
+                        select(nodes, 0,1),
+                        select(nodes, 1,2)
+                    ), Location.HAS_PARTITIONED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+
         Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
@@ -595,4 +736,228 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
         assertTrue(plan.fragments().size() == 4);
     }
+
+    @Test
+    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
+    public void testSplitterPartiallyReplicated1() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                if (cacheId == CU.cacheId("Project"))
+                    return IgniteDistributions.broadcast();
+
+                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(null, Arrays.asList(
+                        select(nodes, 0,1),
+                        select(nodes, 1,2),
+                        select(nodes, 2,0),
+                        select(nodes, 0,1),
+                        select(nodes, 1,2)
+                    ), Location.HAS_PARTITIONED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertTrue(plan.fragments().size() == 2);
+    }
+
+    @Test
+    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
+    public void testSplitterPartiallyReplicated2() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                if (cacheId == CU.cacheId("Project"))
+                    return IgniteDistributions.broadcast();
+
+                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(null, Arrays.asList(
+                        select(nodes, 0,1),
+                        select(nodes, 2),
+                        select(nodes, 2,0),
+                        select(nodes, 0,1),
+                        select(nodes, 1,2)
+                    ), Location.HAS_PARTITIONED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.LOGICAL_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertTrue(plan.fragments().size() == 3);
+    }
+
+    private static <T> List<T> select(List<T> src, int... idxs) {
+        ArrayList<T> res = new ArrayList<>(idxs.length);
+
+        for (int idx : idxs) {
+            res.add(src.get(idx));
+        }
+
+        return res;
+    }
+
+    private static class TestRegistry implements LocationRegistry, 
DistributionRegistry {
+        @Override public Location random(AffinityTopologyVersion topVer) {
+            return new Location(select(nodes, 0,1,2,3), null, (byte) 0);
+        }
+
+        @Override public Location single(AffinityTopologyVersion topVer) {
+            return new Location(select(nodes, 0), null, (byte) 0);
+        }
+
+        @Override public DistributionTrait distribution(int cacheId, RowType 
rowType) {
+            return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+        }
+
+        @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            if (cacheId == CU.cacheId("Developer"))
+                return new Location(null, Arrays.asList(
+                    select(nodes, 0,1),
+                    select(nodes, 1,2),
+                    select(nodes, 2,0),
+                    select(nodes, 0,1),
+                    select(nodes, 1,2)
+                ), Location.HAS_PARTITIONED_CACHES);
+            if (cacheId == CU.cacheId("Project"))
+                return new Location(null, Arrays.asList(
+                    select(nodes, 0,1),
+                    select(nodes, 1,2),
+                    select(nodes, 2,0),
+                    select(nodes, 0,1),
+                    select(nodes, 1,2)
+                ), Location.HAS_PARTITIONED_CACHES);
+
+            throw new AssertionError("Unexpected cache id:" + cacheId);
+        }
+    }
 }
\ No newline at end of file

Reply via email to