This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new e18d8a1  pending
e18d8a1 is described below

commit e18d8a17a211b61b15213ddedcc8ac637b4f5888
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Wed Oct 30 19:46:03 2019 +0300

    pending
---
 modules/calcite/pom.xml                            |  7 ++
 .../query/calcite/CalciteQueryProcessor.java       |  6 +-
 .../query/calcite/exchange/Receiver.java           | 14 ++++
 .../processors/query/calcite/exchange/Sender.java  | 38 ++++++++++
 .../calcite/metadata/IgniteMdDistribution.java     |  2 +-
 .../metadata/IgniteMdSourceDistribution.java       |  5 ++
 .../calcite/rel/logical/IgniteLogicalExchange.java | 15 ++--
 .../calcite/rel/logical/IgniteLogicalFilter.java   |  7 +-
 .../calcite/rel/logical/IgniteLogicalProject.java  |  7 +-
 .../rel/logical/IgniteLogicalTableScan.java        |  3 +-
 .../processors/query/calcite/rule/IgniteRules.java |  8 +--
 .../query/calcite/rule/logical/IgniteJoinRule.java |  7 +-
 .../query/calcite/schema/IgniteTable.java          | 18 +++--
 .../query/calcite/splitter/Fragment.java           | 68 ++++++++++++++++++
 .../calcite/splitter/PartitionsDistribution.java   |  2 +-
 .../splitter/PartitionsDistributionRegistry.java   |  4 +-
 .../splitter/{SplitTask.java => QueryPlan.java}    | 24 +++----
 .../splitter/{TaskSplitter.java => Splitter.java}  | 37 ++++------
 .../DistributionFunction.java}                     |  9 +--
 .../DistributionFunctionFactory.java}              |  9 +--
 .../query/calcite/trait/DistributionTrait.java     |  8 ++-
 .../query/calcite/trait/DistributionTraitDef.java  |  2 +-
 .../query/calcite/trait/DistributionTraitImpl.java |  8 ++-
 .../query/calcite/trait/IgniteDistributions.java   | 35 ++++++++--
 .../processors/query/calcite/util/Commons.java     | 21 +-----
 .../query/calcite/CalciteQueryProcessorTest.java   | 81 ++++++++++++++--------
 26 files changed, 304 insertions(+), 141 deletions(-)

diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index 1fa26fa..d654edb 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -79,6 +79,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-beans</artifactId>
             <version>${spring.version}</version>
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index d1ad130..b036397 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,7 +48,7 @@ import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.query.calcite.util.Commons.contextParameter;
+import static 
org.apache.ignite.internal.processors.query.calcite.util.Commons.provided;
 
 /**
  *
@@ -148,8 +148,8 @@ public class CalciteQueryProcessor implements QueryEngine {
         return Contexts.chain(ctx, config.getContext(),
             Contexts.of(
                 new Query(query, params),
-                contextParameter(ctx, SchemaPlus.class, schemaHolder::schema),
-                contextParameter(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)));
+                provided(ctx, SchemaPlus.class, schemaHolder::schema),
+                provided(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)));
     }
 
     private QueryExecution prepare(Context ctx) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index 993f55b..fe35b52 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -21,13 +21,17 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 
 /**
  *
  */
 public class Receiver extends SingleRel implements IgniteRel {
+    private SourceDistribution sourceDistribution;
+
     /**
      * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
@@ -50,4 +54,14 @@ public class Receiver extends SingleRel implements IgniteRel 
{
     @Override public <T> T accept(IgniteVisitor<T> visitor) {
         return visitor.visitReceiver(this);
     }
+
+    public void init(SourceDistribution targetDistribution, RelMetadataQueryEx 
mq) {
+        getInput().init(targetDistribution);
+
+        sourceDistribution = getInput().sourceDistribution(mq);
+    }
+
+    public SourceDistribution sourceDistribution() {
+        return sourceDistribution;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index 63cf8f7..f9bd762 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -16,17 +16,28 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exchange;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
  *
  */
 public class Sender extends SingleRel implements IgniteRel {
+    private SourceDistribution sourceDistribution;
+    private SourceDistribution targetDistribution;
+    private DistributionFunction targetFunction;
+
     /**
      * Creates a <code>SingleRel</code>.
      *
@@ -38,7 +49,34 @@ public class Sender extends SingleRel implements IgniteRel {
         super(cluster, traits, input);
     }
 
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new Sender(getCluster(), traitSet, sole(inputs));
+    }
+
     @Override public <T> T accept(IgniteVisitor<T> visitor) {
         return visitor.visitSender(this);
     }
+
+    public void init(SourceDistribution targetDistribution) {
+        this.targetDistribution = targetDistribution;
+    }
+
+    public DistributionFunction targetFunction() {
+        if (targetFunction == null) {
+            assert targetDistribution != null && 
targetDistribution.partitionMapping != null;
+
+            DistributionTrait distribution = 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+
+            targetFunction = 
distribution.functionFactory().create(targetDistribution, distribution.keys());
+        }
+
+        return targetFunction;
+    }
+
+    public SourceDistribution sourceDistribution(RelMetadataQuery mq) {
+        if (sourceDistribution == null)
+            sourceDistribution = 
RelMetadataQueryEx.wrap(mq).getSourceDistribution(getInput());
+
+        return sourceDistribution;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 298e905..f644d0e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -108,7 +108,7 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
                 newKeys.add(mapped);
             }
 
-            return IgniteDistributions.hash(newKeys);
+            return IgniteDistributions.hash(newKeys, trait.functionFactory());
         }
 
         return trait;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index 255e06c..d44a57b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -27,6 +27,7 @@ import 
org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
@@ -58,6 +59,10 @@ public class IgniteMdSourceDistribution implements 
MetadataHandler<SourceDistrib
         return distribution(rel.getInput(), mq);
     }
 
+    public SourceDistribution getSourceDistribution(Sender rel, 
RelMetadataQuery mq) {
+        return rel.sourceDistribution(mq);
+    }
+
     public SourceDistribution getSourceDistribution(BiRel rel, 
RelMetadataQuery mq) {
         mq = RelMetadataQueryEx.wrap(mq);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index 3a577e9..32b8b09 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -22,12 +22,12 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
@@ -52,14 +52,6 @@ public final class IgniteLogicalExchange extends SingleRel 
implements IgniteRel
             Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
     }
 
-    public DistributionTrait sourceDistribution() {
-        return 
getInput().getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
-    }
-
-    public DistributionTrait targetDistribution() {
-        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
-    }
-
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
     }
@@ -67,4 +59,9 @@ public final class IgniteLogicalExchange extends SingleRel 
implements IgniteRel
     @Override public <T> T accept(IgniteVisitor<T> visitor) {
         return visitor.visitExchange(this);
     }
+
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+            .item("distribution", 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 34bbe3d..0da0d04 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -24,11 +24,12 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteLogicalFilter extends Filter implements IgniteRel {
   private final Set<CorrelationId> variablesSet;
@@ -53,10 +54,10 @@ public final class IgniteLogicalFilter extends Filter 
implements IgniteRel {
         .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
   }
 
-  public static IgniteLogicalFilter create(LogicalFilter filter, RelNode 
input) {
+  public static IgniteLogicalFilter create(Filter filter, RelNode input) {
     RelTraitSet traits = filter.getTraitSet()
         .replace(IgniteRel.LOGICAL_CONVENTION)
-        
.replace(IgniteMdDistribution.filter(filter.getCluster().getMetadataQuery(), 
input, filter.getCondition()));
+        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.filter(RelMetadataQueryEx.instance(), input, 
filter.getCondition()));
 
     return new IgniteLogicalFilter(filter.getCluster(), traits, input, 
filter.getCondition(), filter.getVariablesSet());
   }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index 56c0f50..e67d371 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -21,12 +21,13 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 public final class IgniteLogicalProject extends Project implements IgniteRel {
   public IgniteLogicalProject(
@@ -43,10 +44,10 @@ public final class IgniteLogicalProject extends Project 
implements IgniteRel {
     return new IgniteLogicalProject(getCluster(), traitSet, input, projects, 
rowType);
   }
 
-  public static IgniteLogicalProject create(LogicalProject project, RelNode 
input) {
+  public static IgniteLogicalProject create(Project project, RelNode input) {
     RelTraitSet traits = project.getTraitSet()
         .replace(IgniteRel.LOGICAL_CONVENTION)
-        
.replace(IgniteMdDistribution.project(project.getCluster().getMetadataQuery(), 
input, project.getProjects()));
+        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.project(RelMetadataQueryEx.instance(), input, 
project.getProjects()));
 
     return new IgniteLogicalProject(project.getCluster(), traits, input, 
project.getProjects(), project.getRowType());
   }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index ca2ae90..fb4cf58 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -39,7 +39,8 @@ public final class IgniteLogicalTableScan extends TableScan 
implements IgniteRel
   }
 
   public SourceDistribution tableDistribution() {
-     return 
getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext());
+     return getTable().unwrap(IgniteTable.class)
+         .sourceDistribution(getCluster().getPlanner().getContext());
   }
 
   @Override public <T> T accept(IgniteVisitor<T> visitor) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index 7873c6c..2d155f5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -158,17 +158,15 @@ public class IgniteRules {
         SubQueryRemoveRule.PROJECT,
         SubQueryRemoveRule.JOIN);
 
-    public static final List<RelOptRule> IGNITE_BASE_RULES = ImmutableList.of(
+    public static final List<RelOptRule> IGNITE_LOGICAL_RULES = 
ImmutableList.of(
         IgniteFilterRule.INSTANCE,
         IgniteProjectRule.INSTANCE,
         IgniteJoinRule.INSTANCE);
 
     public static List<RelOptRule> logicalRules(Context ctx) {
         return ImmutableList.<RelOptRule>builder()
-//            .addAll(BASE_RULES)
-//            .addAll(ABSTRACT_RULES)
-            .addAll(ABSTRACT_RELATIONAL_RULES)
-            .addAll(IGNITE_BASE_RULES)
+            .addAll(IGNITE_LOGICAL_RULES)
+            .add(AbstractConverter.ExpandConversionRule.INSTANCE)
             .build();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
index a5abdea..3913cb4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
@@ -45,11 +46,11 @@ public class IgniteJoinRule extends RelOptRule {
 
         RelTraitSet leftTraits = join.getLeft().getTraitSet()
             .replace(IgniteRel.LOGICAL_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys));
+            
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, 
IgniteDistributions.noOpFunction()));
 
         RelTraitSet rightTraits = join.getRight().getTraitSet()
             .replace(IgniteRel.LOGICAL_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys));
+            
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, 
IgniteDistributions.noOpFunction()));
 
         RelNode left = convert(join.getLeft(), leftTraits);
         RelNode right = convert(join.getRight(), rightTraits);
@@ -58,7 +59,7 @@ public class IgniteJoinRule extends RelOptRule {
 
         RelTraitSet traitSet = join.getTraitSet()
             .replace(IgniteRel.LOGICAL_CONVENTION)
-            .replace(IgniteMdDistribution.join(mq, left, right, 
join.getCondition()));
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.join(mq, left, right, join.getCondition()));
 
         call.transformTo(new IgniteLogicalJoin(join.getCluster(), traitSet, 
left, right,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone()));
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index bccf570..bf8b752 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.GridIntList;
@@ -71,23 +72,26 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
     @Override public RelNode toRel(RelOptTable.ToRelContext context, 
RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
         RelTraitSet traitSet = 
cluster.traitSet().replace(IgniteRel.LOGICAL_CONVENTION)
-                .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteDistributions.hash(rowType.distributionKeys()));
+                .replaceIf(DistributionTraitDef.INSTANCE, () -> 
distributionTrait(cluster.getPlanner().getContext()));
         return new IgniteLogicalTableScan(cluster, traitSet, relOptTable);
     }
 
-    public SourceDistribution tableDistribution(Context context) {
-        SourceDistribution res = new SourceDistribution();
+    public DistributionTrait distributionTrait(Context context) {
+        return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.noOpFunction()); // TODO
+    }
 
-        GridIntList localInputs = new GridIntList();
+    public SourceDistribution sourceDistribution(Context context) {
+        int cacheId = CU.cacheId(cacheName);
 
-        localInputs.add(CU.cacheId(cacheName));
+        SourceDistribution res = new SourceDistribution();
 
+        GridIntList localInputs = new GridIntList();
+        localInputs.add(cacheId);
         res.localInputs = localInputs;
 
         PartitionsDistributionRegistry registry = 
context.unwrap(PartitionsDistributionRegistry.class);
         AffinityTopologyVersion topVer = 
context.unwrap(AffinityTopologyVersion.class);
-
-        res.partitionMapping = 
registry.partitionsDistribution(CU.cacheId(cacheName), topVer);
+        res.partitionMapping = registry.get(cacheId, topVer);
 
         return res;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
new file mode 100644
index 0000000..f9ee3d1
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class Fragment {
+    public final RelNode root;
+
+    public SourceDistribution distribution;
+
+    public Fragment(RelNode root) {
+        this.root = root;
+    }
+
+    public void init(Context ctx) {
+        RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+
+        distribution = mq.getSourceDistribution(root);
+
+        PartitionsDistribution mapping = distribution.partitionMapping;
+
+        if (mapping == null)
+            distribution.partitionMapping = isRootFragment() ? 
registry(ctx).single() : registry(ctx).random(topologyVersion(ctx));
+        else if (mapping.excessive)
+            distribution.partitionMapping = mapping.deduplicate();
+        
+        if (!F.isEmpty(distribution.remoteInputs)) {
+            for (Receiver input : distribution.remoteInputs)
+                input.init(distribution, mq);
+        }
+    }
+
+    private boolean isRootFragment() {
+        return !(root instanceof Sender);
+    }
+
+    private PartitionsDistributionRegistry registry(Context ctx) {
+        return ctx.unwrap(PartitionsDistributionRegistry.class);
+    }
+
+    private AffinityTopologyVersion topologyVersion(Context ctx) {
+        return ctx.unwrap(AffinityTopologyVersion.class);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
index 2a7707c..72ed1fa 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -42,7 +42,7 @@ public class PartitionsDistribution {
 
         int i = 0, j = 0, k = 0;
 
-        while (i < nodes0.length && j < other.nodes.length) {
+        while (i < nodes.length && j < other.nodes.length) {
             if (nodes[i] < other.nodes[j])
                 i++;
             else if (other.nodes[j] < nodes[i])
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index f234ae7..568ce65 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -22,5 +22,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  *
  */
 public interface PartitionsDistributionRegistry {
-    PartitionsDistribution partitionsDistribution(int cacheId, 
AffinityTopologyVersion topVer);
+    PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer);
+    PartitionsDistribution random(AffinityTopologyVersion topVer);
+    PartitionsDistribution single();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
similarity index 60%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index 1fbe086..84985e6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -16,26 +16,22 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import org.apache.calcite.rel.RelNode;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.Context;
 
 /**
  *
  */
-public class SplitTask {
-    public final SourceDistribution distribution;
-    public final RelNode root;
+public class QueryPlan {
+    private final ImmutableList<Fragment> fragments;
 
-    public SplitTask(RelNode root, SourceDistribution distribution) {
-        this.distribution = distribution;
-        this.root = root;
-
-        init();
+    public QueryPlan(ImmutableList<Fragment> fragments) {
+        this.fragments = fragments;
     }
 
-    private void init() {
-        PartitionsDistribution mapping = distribution.partitionMapping;
-
-        if (mapping != null && mapping.excessive)
-            distribution.partitionMapping = mapping.deduplicate();
+    public void init(Context ctx) {
+        for (Fragment fragment : fragments) {
+            fragment.init(ctx);
+        }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
similarity index 73%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 22527b3..5220d74 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -16,15 +16,13 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
@@ -36,35 +34,26 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
 /**
  *
  */
-public class TaskSplitter implements IgniteVisitor<IgniteRel> {
-    /** */
-    private List<IgniteRel> roots;
+public class Splitter implements IgniteVisitor<IgniteRel> {
+    private ImmutableList.Builder<Fragment> b;
 
-    public List<SplitTask> go(IgniteRel root) {
-        roots = new ArrayList<>();
+    public QueryPlan go(IgniteRel root) {
+        b = ImmutableList.builder();
 
-        roots.add(root.accept(this));
-
-        ArrayList<SplitTask> splitTasks = new ArrayList<>(roots.size());
-
-        RelMetadataQuery mq = RelMetadataQueryEx.instance();
-
-        for (IgniteRel igniteRel : roots) {
-            splitTasks.add(new SplitTask(igniteRel, 
IgniteMdSourceDistribution.distribution(igniteRel, mq)));
-        }
-
-        return splitTasks;
+        return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
     }
 
     @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
         RelOptCluster cluster = exchange.getCluster();
-        RelNode input = exchange.getInput();
+        RelTraitSet traitSet = exchange.getTraitSet();
+
+        IgniteRel input = visitChildren(exchange.getInput());
 
-        Sender sender = new Sender(cluster, input.getTraitSet(), 
visitChildren(input));
+        Sender sender = new Sender(cluster, traitSet, input);
 
-        roots.add(sender);
+        b.add(new Fragment(sender));
 
-        return new Receiver(cluster, exchange.getTraitSet(), sender);
+        return new Receiver(cluster, traitSet, sender);
     }
 
     @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
similarity index 70%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index f234ae7..76af490 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -14,13 +14,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
 
 /**
  *
  */
-public interface PartitionsDistributionRegistry {
-    PartitionsDistribution partitionsDistribution(int cacheId, 
AffinityTopologyVersion topVer);
+public interface DistributionFunction {
+    List<ClusterNode> destination(Object row);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
similarity index 67%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
index f234ae7..8bc129e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
@@ -14,13 +14,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 
 /**
  *
  */
-public interface PartitionsDistributionRegistry {
-    PartitionsDistribution partitionsDistribution(int cacheId, 
AffinityTopologyVersion topVer);
+public interface DistributionFunctionFactory {
+    DistributionFunction create(SourceDistribution targetDistr, 
ImmutableIntList keys);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 3dabee1..2510c2c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.Objects;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.util.ImmutableIntList;
@@ -45,10 +46,10 @@ public interface DistributionTrait extends RelTrait {
         }
     }
 
-    DistributionTrait ANY = IgniteDistributions.single();
-
     DistributionType type();
 
+    DistributionFunctionFactory functionFactory();
+
     @Override default RelTraitDef getTraitDef() {
         return DistributionTraitDef.INSTANCE;
     }
@@ -66,7 +67,8 @@ public interface DistributionTrait extends RelTrait {
             return true;
 
         if (type() == other.type())
-            return type() != DistributionType.HASH || 
keys().equals(other.keys());
+            return type() != DistributionType.HASH
+                || (Objects.equals(keys(), other.keys()) && 
Objects.equals(functionFactory(), other.functionFactory()));
 
         return other.type() == DistributionType.RANDOM && type() == 
DistributionType.HASH;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index facc258..ef614bc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -64,6 +64,6 @@ public class DistributionTraitDef extends 
RelTraitDef<DistributionTrait> {
     }
 
     @Override public DistributionTrait getDefault() {
-        return DistributionTrait.ANY;
+        return IgniteDistributions.any();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
index 5b7b748..099c96b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
@@ -26,16 +26,22 @@ import org.apache.calcite.util.ImmutableIntList;
 public class DistributionTraitImpl implements DistributionTrait {
     private final DistributionType type;
     private final ImmutableIntList keys;
+    private final DistributionFunctionFactory functionFactory;
 
-    public DistributionTraitImpl(DistributionType type, ImmutableIntList keys) 
{
+    public DistributionTraitImpl(DistributionType type, ImmutableIntList keys, 
DistributionFunctionFactory functionFactory) {
         this.type = type;
         this.keys = keys;
+        this.functionFactory = functionFactory;
     }
 
     @Override public DistributionType type() {
         return type;
     }
 
+    @Override public DistributionFunctionFactory functionFactory() {
+        return functionFactory;
+    }
+
     @Override public ImmutableIntList keys() {
         return keys;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 7537d63..59f69fe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -25,11 +25,12 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
  *
  */
 public class IgniteDistributions {
-    /** */
-    private static final DistributionTraitDef traitDef = 
DistributionTraitDef.INSTANCE;
-    private static final DistributionTrait SINGLE = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, 
ImmutableIntList.of()));
-    private static final DistributionTrait RANDOM = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, 
ImmutableIntList.of()));
-    private static final DistributionTrait ANY    = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.ANY, 
ImmutableIntList.of()));
+    private static final DistributionFunctionFactory NO_OP_FACTORY = (t,k) -> 
null;
+
+    private static final DistributionTrait BROADCAST = new 
DistributionTraitImpl(DistributionTrait.DistributionType.BROADCAST, 
ImmutableIntList.of(), allTargetsFunction());
+    private static final DistributionTrait SINGLE = new 
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, 
ImmutableIntList.of(), singleTargetFunction());
+    private static final DistributionTrait RANDOM = new 
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, 
ImmutableIntList.of(), randomTargetFunction());
+    private static final DistributionTrait ANY    = new 
DistributionTraitImpl(DistributionTrait.DistributionType.ANY, 
ImmutableIntList.of(), noOpFunction());
 
     public static DistributionTrait any() {
         return ANY;
@@ -43,7 +44,27 @@ public class IgniteDistributions {
         return SINGLE;
     }
 
-    public static DistributionTrait hash(List<Integer> keys) {
-        return traitDef.canonize(new DistributionTraitImpl(HASH, 
ImmutableIntList.copyOf(keys)));
+    public static DistributionTrait broadcast() {
+        return BROADCAST;
+    }
+
+    public static DistributionTrait hash(List<Integer> keys, 
DistributionFunctionFactory factory) {
+        return new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys), 
factory);
+    }
+
+    public static DistributionFunctionFactory noOpFunction() {
+        return NO_OP_FACTORY;
+    }
+
+    public static DistributionFunctionFactory singleTargetFunction() {
+        return noOpFunction(); // TODO
+    }
+
+    public static DistributionFunctionFactory allTargetsFunction() {
+        return noOpFunction(); // TODO
+    }
+
+    public static DistributionFunctionFactory randomTargetFunction() {
+        return noOpFunction(); // TODO
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index b638815..739d411 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -31,7 +31,6 @@ import org.apache.calcite.plan.RelOptNode;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
@@ -52,11 +51,11 @@ public final class Commons {
         return ctx == null ? Contexts.empty() : 
Contexts.of(ctx.unwrap(Object[].class));
     }
 
-    public static <T> @Nullable T contextParameter(Context ctx, Class<T> 
paramType, Supplier<T> paramSrc) {
+    public static <T> @Nullable T provided(Context ctx, Class<T> paramType, 
Supplier<T> paramSrc) {
         T param = ctx.unwrap(paramType);
 
         if (param != null)
-            return param;
+            return null; // Provided by parent context.
 
         return paramSrc.get();
     }
@@ -81,26 +80,10 @@ public final class Commons {
         return b.build();
     }
 
-    public static RelOptRuleOperand any(Class<? extends RelNode> first, 
RelTrait trait){
-        return RelOptRule.operand(first, trait, RelOptRule.any());
-    }
-
-    public static RelOptRuleOperand any(Class<? extends RelNode> first){
-        return RelOptRule.operand(first, RelOptRule.any());
-    }
-
     public static RelOptRuleOperand any(Class<? extends RelNode> first, 
Class<? extends RelNode> second) {
         return RelOptRule.operand(first, RelOptRule.operand(second, 
RelOptRule.any()));
     }
 
-    public static RelOptRuleOperand some(Class<? extends RelNode> rel, 
RelOptRuleOperand first, RelOptRuleOperand... rest){
-        return RelOptRule.operand(rel, RelOptRule.some(first, rest));
-    }
-
-    public static RelOptRuleOperand some(Class<? extends RelNode> rel, 
RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){
-        return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest));
-    }
-
     public static <T extends RelNode> RelOp<T, Boolean> 
transformSubset(RelOptRuleCall call, RelNode input, BiFunction<T, RelNode, 
RelNode> transformFun) {
         return rel -> {
             if (!(input instanceof RelSubset))
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 884b9ca..d9ebd5b 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 
-import java.util.List;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -40,13 +39,12 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask;
-import 
org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter;
+import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -54,7 +52,7 @@ import org.junit.Test;
 /**
  *
  */
-@WithSystemProperty(key = "calcite.debug", value = "true")
+//@WithSystemProperty(key = "calcite.debug", value = "true")
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     private static CalciteQueryProcessor proc;
@@ -62,6 +60,9 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
     private static PartitionsDistribution developerDistribution;
     private static PartitionsDistribution projectDistribution;
+    private static PartitionsDistribution randomDistribution;
+    private static PartitionsDistribution singleDistribution;
+    private static PartitionsDistributionRegistry registry;
 
     @BeforeClass
     public static void setupClass() {
@@ -80,12 +81,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 .field("cityId", Integer.class)
                 .build()));
 
-        developerDistribution = new PartitionsDistribution();
-
-        developerDistribution.parts = 5;
-        developerDistribution.nodes = new int[]{0,1,2};
-        developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
-
         publicSchema.addTable("Project", new IgniteTable("Project", "Project",
             RowType.builder()
                 .keyField("id", Integer.class, true)
@@ -93,12 +88,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 .field("ver", Integer.class)
                 .build()));
 
-        projectDistribution = new PartitionsDistribution();
 
-        projectDistribution.excessive = true;
-        projectDistribution.parts = 5;
-        projectDistribution.nodes = new int[]{0,1,2};
-        projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
 
         publicSchema.addTable("Country", new IgniteTable("Country", "Country",
             RowType.builder()
@@ -117,6 +107,48 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         schema = Frameworks.createRootSchema(false);
 
         schema.add("PUBLIC", publicSchema);
+
+        developerDistribution = new PartitionsDistribution();
+
+        developerDistribution.parts = 5;
+        developerDistribution.nodes = new int[]{0,1,2};
+        developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
+
+        projectDistribution = new PartitionsDistribution();
+
+        projectDistribution.excessive = true;
+        projectDistribution.parts = 5;
+        projectDistribution.nodes = new int[]{0,1,2};
+        projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
+
+        randomDistribution = new PartitionsDistribution();
+        randomDistribution.parts = 3;
+        randomDistribution.nodes = new int[]{0,1,2};
+        randomDistribution.nodeParts = new int[][]{{1},{2},{3}};
+
+        singleDistribution = new PartitionsDistribution();
+        singleDistribution.parts = 1;
+        singleDistribution.nodes = new int[]{0};
+        singleDistribution.nodeParts = new int[][]{{1}};
+
+        registry = new PartitionsDistributionRegistry() {
+            @Override public PartitionsDistribution get(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return developerDistribution;
+                if (cacheId == CU.cacheId("Project"))
+                    return projectDistribution;
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+
+            @Override public PartitionsDistribution 
random(AffinityTopologyVersion topVer) {
+                return randomDistribution;
+            }
+
+            @Override public PartitionsDistribution single() {
+                return singleDistribution;
+            }
+        };
     }
 
     @Test
@@ -128,15 +160,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        PartitionsDistributionRegistry registry = (id, top) -> {
-            if (id == CU.cacheId("Developer"))
-                return developerDistribution;
-            if (id == CU.cacheId("Project"))
-                return projectDistribution;
-
-            throw new AssertionError("Unexpected cache id:" + id);
-        };
-
         Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
@@ -181,8 +204,12 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
         assertNotNull(relRoot);
 
-        List<SplitTask> fragments = new TaskSplitter().go((IgniteRel) 
relRoot.rel);
+        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
 
-        assertNotNull(fragments);
+        assertNotNull(plan);
     }
 }
\ No newline at end of file

Reply via email to