This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 8e5e2ba  IGNITE-13817 fix traits passThrough propagation (#8544)
8e5e2ba is described below

commit 8e5e2baa002ce07e69c95e0235271f3dac4a171c
Author: korlov42 <[email protected]>
AuthorDate: Wed Dec 16 16:26:19 2020 +0300

    IGNITE-13817 fix traits passThrough propagation (#8544)
---
 .../query/calcite/exec/exp/agg/Accumulators.java   | 186 ++++++++++++++++++++-
 .../query/calcite/exec/rel/MergeJoinNode.java      |   3 +-
 .../calcite/metadata/IgniteMdCumulativeCost.java   |   4 +-
 .../calcite/rel/AbstractIgniteNestedLoopJoin.java  | 105 +++---------
 .../query/calcite/rel/IgniteAggregate.java         |  49 ++----
 .../rel/IgniteCorrelatedNestedLoopJoin.java        |  20 +--
 .../query/calcite/rel/IgniteExchange.java          |   4 +-
 .../processors/query/calcite/rel/IgniteFilter.java |  32 +---
 .../query/calcite/rel/IgniteIndexScan.java         |   7 +-
 .../query/calcite/rel/IgniteMergeJoin.java         | 100 ++---------
 .../query/calcite/rel/IgniteNestedLoopJoin.java    |  12 +-
 .../query/calcite/rel/IgniteProject.java           |  38 ++---
 .../query/calcite/rel/IgniteTableScan.java         |   7 +-
 .../query/calcite/rel/IgniteTableSpool.java        |   6 +-
 .../query/calcite/rel/IgniteUnionAll.java          |  34 +---
 .../calcite/rule/CorrelatedNestedLoopJoinRule.java |   7 +-
 .../query/calcite/trait/RelRegistrar.java          | 114 -------------
 .../processors/query/calcite/trait/TraitUtils.java |  39 +++--
 .../query/calcite/trait/TraitsAwareIgniteRel.java  |  36 ++--
 .../CalciteBasicSecondaryIndexIntegrationTest.java |   4 +-
 .../query/calcite/CalciteQueryProcessorTest.java   | 125 +++++++++++++-
 .../calcite/exec/rel/MergeJoinExecutionTest.java   |   3 +-
 22 files changed, 465 insertions(+), 470 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
index bce0dde..f458fee 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
@@ -45,6 +45,8 @@ public class Accumulators {
                 return avgFactory(call);
             case "SUM":
                 return sumFactory(call);
+            case "$SUM0":
+                return sumEmptyIsZeroFactory(call);
             case "MIN":
                 return minFactory(call);
             case "MAX":
@@ -89,6 +91,23 @@ public class Accumulators {
     }
 
     /** */
+    private static Supplier<Accumulator> sumEmptyIsZeroFactory(AggregateCall 
call) {
+        switch (call.type.getSqlTypeName()) {
+            case DOUBLE:
+            case REAL:
+            case FLOAT:
+                return DoubleSumEmptyIsZero.FACTORY;
+            case DECIMAL:
+                return DecimalSumEmptyIsZero.FACTORY;
+            case INTEGER:
+                return IntSumEmptyIsZero.FACTORY;
+            case BIGINT:
+            default:
+                return LongSumEmptyIsZero.FACTORY;
+        }
+    }
+
+    /** */
     private static Supplier<Accumulator> minFactory(AggregateCall call) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
@@ -278,7 +297,7 @@ public class Accumulators {
 
         /** {@inheritDoc} */
         @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
-            return F.asList();
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(BIGINT),
 false));
         }
 
         /** {@inheritDoc} */
@@ -479,6 +498,171 @@ public class Accumulators {
     }
 
     /** */
+    private static class DoubleSumEmptyIsZero implements Accumulator {
+        /** */
+        public static final Supplier<Accumulator> FACTORY = 
DoubleSumEmptyIsZero::new;
+
+        /** */
+        private double sum;
+
+        /** {@inheritDoc} */
+        @Override public void add(Object... args) {
+            Double in = (Double) args[0];
+
+            if (in == null)
+                return;
+
+            sum += in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator other) {
+            DoubleSumEmptyIsZero other0 = (DoubleSumEmptyIsZero) other;
+
+            sum += other0.sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            return sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(DOUBLE),
 true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(DOUBLE), true);
+        }
+    }
+
+    /** */
+    private static class IntSumEmptyIsZero implements Accumulator {
+        /** */
+        public static final Supplier<Accumulator> FACTORY = 
IntSumEmptyIsZero::new;
+
+        /** */
+        private int sum;
+
+        /** {@inheritDoc} */
+        @Override public void add(Object... args) {
+            Integer in = (Integer) args[0];
+
+            if (in == null)
+                return;
+
+            sum += in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator other) {
+            IntSumEmptyIsZero other0 = (IntSumEmptyIsZero) other;
+
+            sum += other0.sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            return sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(INTEGER),
 true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(INTEGER), true);
+        }
+    }
+
+    /** */
+    private static class LongSumEmptyIsZero implements Accumulator {
+        /** */
+        public static final Supplier<Accumulator> FACTORY = 
LongSumEmptyIsZero::new;
+
+        /** */
+        private long sum;
+
+        /** */
+        /** {@inheritDoc} */
+        @Override public void add(Object... args) {
+            Long in = (Long) args[0];
+
+            if (in == null)
+                return;
+
+            sum += in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator other) {
+            LongSumEmptyIsZero other0 = (LongSumEmptyIsZero) other;
+
+            sum += other0.sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            return sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(BIGINT),
 true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(BIGINT), true);
+        }
+    }
+
+    /** */
+    private static class DecimalSumEmptyIsZero implements Accumulator {
+        /** */
+        public static final Supplier<Accumulator> FACTORY = 
DecimalSumEmptyIsZero::new;
+
+        /** */
+        private BigDecimal sum;
+
+        /** {@inheritDoc} */
+        @Override public void add(Object... args) {
+            BigDecimal in = (BigDecimal) args[0];
+
+            if (in == null)
+                return;
+
+            sum = sum == null ? in : sum.add(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator other) {
+            DecimalSumEmptyIsZero other0 = (DecimalSumEmptyIsZero) other;
+
+            sum = sum == null ? other0.sum : sum.add(other0.sum);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            return sum != null ? sum : BigDecimal.ZERO;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(DECIMAL),
 true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(DECIMAL), true);
+        }
+    }
+
+    /** */
     private static class DoubleMinMax implements Accumulator {
         /** */
         public static final Supplier<Accumulator> MIN_FACTORY = () -> new 
DoubleMinMax(true);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index c43dd60..feaa373 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -469,7 +469,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         @Override protected void join() throws IgniteCheckedException {
             inLoop = true;
             try {
-                while (requested > 0 && (left != null || 
!leftInBuf.isEmpty())) {
+                while (requested > 0 && (left != null || !leftInBuf.isEmpty())
+                    && !(right == null && rightInBuf.isEmpty() && 
rightMaterialization == null && waitingRight != NOT_WAITING)) {
                     checkState();
 
                     if (left == null) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdCumulativeCost.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdCumulativeCost.java
index 3f71f65..7099dbb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdCumulativeCost.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdCumulativeCost.java
@@ -93,9 +93,7 @@ public class IgniteMdCumulativeCost implements 
MetadataHandler<BuiltInMetadata.C
         if (rightCost.isInfinite())
             return rightCost;
 
-        
cost.plus(leftCost).plus(rightCost.multiplyBy(left.estimateRowCount(mq) / 
corIds.size()));
-
-        return cost;
+        return 
cost.plus(leftCost).plus(rightCost.multiplyBy(left.estimateRowCount(mq) / 
corIds.size()));
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
index b1a729a..00041b0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
@@ -33,7 +33,6 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelNodes;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
@@ -224,7 +223,7 @@ public abstract class AbstractIgniteNestedLoopJoin extends 
Join implements Trait
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // We preserve left collation since it's translated into a nested loop 
join with an outer loop
         // over a left edge. The code below checks whether a desired collation 
is possible and requires
         // appropriate collation from the left edge.
@@ -234,8 +233,8 @@ public abstract class AbstractIgniteNestedLoopJoin extends 
Join implements Trait
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
         if (collation.equals(RelCollations.EMPTY))
-            return ImmutableList.of(Pair.of(nodeTraits,
-                ImmutableList.of(left.replace(RelCollations.EMPTY), 
right.replace(RelCollations.EMPTY))));
+            return Pair.of(nodeTraits,
+                ImmutableList.of(left.replace(RelCollations.EMPTY), 
right.replace(RelCollations.EMPTY)));
 
         if (!projectsLeft(collation))
             collation = RelCollations.EMPTY;
@@ -248,24 +247,12 @@ public abstract class AbstractIgniteNestedLoopJoin 
extends Join implements Trait
             }
         }
 
-        return ImmutableList.of(Pair.of(nodeTraits.replace(collation),
-            ImmutableList.of(left.replace(collation), 
right.replace(RelCollations.EMPTY))));
+        return Pair.of(nodeTraits.replace(collation),
+            ImmutableList.of(left.replace(collation), 
right.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
-        // The node is rewindable only if both sources are rewindable.
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
-
-        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability),
-            ImmutableList.of(left.replace(rewindability), 
right.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // Tere are several rules:
         // 1) any join is possible on broadcast or single distribution
         // 2) hash distributed join is possible when join keys equal to source 
distribution keys
@@ -277,23 +264,14 @@ public abstract class AbstractIgniteNestedLoopJoin 
extends Join implements Trait
 
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        RelTraitSet outTraits, leftTraits, rightTraits;
-
         IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
 
         RelDistribution.Type distrType = distribution.getType();
         switch (distrType) {
             case BROADCAST_DISTRIBUTED:
             case SINGLETON:
-                outTraits = nodeTraits.replace(distribution);
-                leftTraits = left.replace(distribution);
-                rightTraits = right.replace(distribution);
-
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
+                return Pair.of(nodeTraits, Commons.transform(inputTraits, t -> 
t.replace(distribution)));
 
-                break;
             case HASH_DISTRIBUTED:
             case RANDOM_DISTRIBUTED:
                 // Such join may be replaced as a cross join with a filter 
uppon it.
@@ -307,55 +285,18 @@ public abstract class AbstractIgniteNestedLoopJoin 
extends Join implements Trait
                     ? distribution.function()
                     : DistributionFunction.hash();
 
-                IgniteDistribution outDistr; // TODO distribution multitrait 
support
-
-                outDistr = hash(joinInfo.leftKeys, function);
+                IgniteDistribution outDistr = hash(joinInfo.leftKeys, 
function);
 
                 if (distrType != HASH_DISTRIBUTED || 
outDistr.satisfies(distribution)) {
-                    outTraits = nodeTraits.replace(outDistr);
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
-
-                    if (joinType == INNER || joinType == LEFT) {
-                        outTraits = nodeTraits.replace(outDistr);
-                        leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                        rightTraits = right.replace(broadcast());
-
-                        res.add(Pair.of(outTraits, 
ImmutableList.of(leftTraits, rightTraits)));
-                    }
+                    return Pair.of(nodeTraits.replace(outDistr),
+                        ImmutableList.of(left.replace(outDistr), 
right.replace(hash(joinInfo.rightKeys, function))));
                 }
 
-                outDistr = hash(joinInfo.rightKeys, function);
-
-                if (distrType != HASH_DISTRIBUTED || 
outDistr.satisfies(distribution)) {
-                    outTraits = nodeTraits.replace(outDistr);
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
-
-                    if (joinType == INNER || joinType == RIGHT) {
-                        outTraits = nodeTraits.replace(outDistr);
-                        leftTraits = left.replace(broadcast());
-                        rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                        res.add(Pair.of(outTraits, 
ImmutableList.of(leftTraits, rightTraits)));
-                    }
-                }
-
-                break;
-
             default:
-                break;
+                // NO-OP
         }
 
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
-            ImmutableList.of(left.replace(single()), 
right.replace(single()))));
+        return Pair.of(nodeTraits.replace(single()), 
Commons.transform(inputTraits, t -> t.replace(single())));
     }
 
     /** {@inheritDoc} */
@@ -370,18 +311,8 @@ public abstract class AbstractIgniteNestedLoopJoin extends 
Join implements Trait
         // Joins can be flipped, and for many algorithms, both versions are 
viable
         // and have the same cost. To make the results stable between versions 
of
         // the planner, make one of the versions slightly more expensive.
-        switch (joinType) {
-            case SEMI:
-            case ANTI:
-                // SEMI and ANTI join cannot be flipped
-                break;
-            case RIGHT:
-                rowCount = RelMdUtil.addEpsilon(rowCount);
-                break;
-            default:
-                if (RelNodes.COMPARATOR.compare(left, right) > 0)
-                    rowCount = RelMdUtil.addEpsilon(rowCount);
-        }
+        if (joinType == RIGHT)
+            rowCount = RelMdUtil.addEpsilon(rowCount);
 
         final double rightRowCount = right.estimateRowCount(mq);
         final double leftRowCount = left.estimateRowCount(mq);
@@ -391,11 +322,17 @@ public abstract class AbstractIgniteNestedLoopJoin 
extends Join implements Trait
         if (Double.isInfinite(rightRowCount))
             rowCount = rightRowCount;
 
+        if (!Double.isInfinite(leftRowCount) && 
!Double.isInfinite(rightRowCount) && leftRowCount > rightRowCount)
+            rowCount = RelMdUtil.addEpsilon(rowCount);
+
         RelDistribution.Type type = distribution().getType();
 
-        if (type == BROADCAST_DISTRIBUTED || type == SINGLETON)
+        if (type == SINGLETON)
             rowCount = RelMdUtil.addEpsilon(rowCount);
 
+        if (type == BROADCAST_DISTRIBUTED)
+            rowCount = RelMdUtil.addEpsilon(RelMdUtil.addEpsilon(rowCount));
+
         return planner.getCostFactory().makeCost(rowCount, 0, 0);
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index 4e16167..64662fe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.ArrayList;
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTrait;
@@ -76,16 +77,7 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
-        // Aggregate is rewindable if its input is rewindable.
-
-        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(inputTraits.get(0).replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // Distribution propagation is based on next rules:
         // 1) Any aggregation is possible on single or broadcast distribution.
         // 2) hash-distributed aggregation is possible in case it's a simple 
aggregate having hash distributed input
@@ -94,8 +86,6 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
 
         RelTraitSet in = inputTraits.get(0);
 
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
         IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
 
         RelDistribution.Type distrType = distribution.getType();
@@ -103,19 +93,14 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
         switch (distrType) {
             case SINGLETON:
             case BROADCAST_DISTRIBUTED:
-                res.add(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution))));
-
-                if (isSimple(this))
-                    res.add(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(random())))); // Map-reduce aggregate
-
-                break;
+                return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
 
             case RANDOM_DISTRIBUTED:
                 if (!groupSet.isEmpty() && isSimple(this)) {
                     IgniteDistribution outDistr = hash(range(0, 
groupSet.cardinality()));
                     IgniteDistribution inDistr = hash(groupSet.asList());
 
-                    res.add(Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr))));
+                    return Pair.of(nodeTraits.replace(outDistr), 
ImmutableList.of(in.replace(inDistr)));
                 }
 
                 break;
@@ -138,11 +123,8 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
                         srcKeys.add(src);
                     }
 
-                    if (srcKeys.size() == keys.size()) {
-                        res.add(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function())))));
-
-                        break;
-                    }
+                    if (srcKeys.size() == keys.size())
+                        return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
                 }
 
                 break;
@@ -151,18 +133,14 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
                 break;
         }
 
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single()))));
+        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // Since it's a hash aggregate it erases collation.
-
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            
ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY))));
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(inputTraits.get(0).replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
@@ -246,13 +224,6 @@ public class IgniteAggregate extends Aggregate implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            
ImmutableList.of(inTraits.get(0).replace(TraitUtils.correlation(nodeTraits)))));
-    }
-
-    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         return 
ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index 7eaacf8..386e51c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.plan.RelOptCluster;
@@ -113,27 +114,26 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteNestedLoopJoin
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // We preserve left edge collation only if batch size == 1
         if (variablesSet.size() == 1)
             return super.passThroughCollation(nodeTraits, inputTraits);
 
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(left.replace(RelCollations.EMPTY), 
right.replace(RelCollations.EMPTY))));
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            ImmutableList.of(left.replace(RelCollations.EMPTY), 
right.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
         // Correlated nested loop requires rewindable right edge.
-
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
         RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
 
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability),
-            ImmutableList.of(left.replace(rewindability), 
right.replace(RewindabilityTrait.REWINDABLE))));
+        return Pair.of(nodeTraits.replace(rewindability),
+            ImmutableList.of(left.replace(rewindability), 
right.replace(RewindabilityTrait.REWINDABLE)));
     }
 
     /** {@inheritDoc} */
@@ -143,19 +143,19 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteNestedLoopJoin
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         CorrelationTrait nodeCorr = TraitUtils.correlation(nodeTraits);
 
         Set<CorrelationId> selfCorrIds = new 
HashSet<>(CorrelationTrait.correlations(variablesSet).correlationIds());
         selfCorrIds.addAll(nodeCorr.correlationIds());
 
-        return ImmutableList.of(Pair.of(nodeTraits,
+        return Pair.of(nodeTraits,
             ImmutableList.of(
                 inTraits.get(0).replace(nodeCorr),
                 
inTraits.get(1).replace(CorrelationTrait.correlations(selfCorrIds))
             )
-        ));
+        );
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 75dad60..9035416 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
@@ -71,7 +73,7 @@ public class IgniteExchange extends Exchange implements 
IgniteRel {
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
         double rowCount = mq.getRowCount(this);
-        double bytesPerRow = getRowType().getFieldCount() * 4;
+        double bytesPerRow = getRowType().getFieldCount() * 4 * 
(TraitUtils.distribution(this) == IgniteDistributions.broadcast() ? 10 : 1);
         return planner.getCostFactory().makeCost(rowCount * bytesPerRow, 
rowCount, 0);
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index ca101d4..4dc5e2d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -77,27 +77,6 @@ public class IgniteFilter extends Filter implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            
ImmutableList.of(inTraits.get(0).replace(TraitUtils.rewindability(nodeTraits)))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            
ImmutableList.of(inTraits.get(0).replace(TraitUtils.distribution(nodeTraits)))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            
ImmutableList.of(inTraits.get(0).replace(TraitUtils.collation(nodeTraits)))));
-    }
-
-    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveRewindability(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         if (!TraitUtils.rewindability(inTraits.get(0)).rewindable() && 
RexUtils.hasCorrelation(getCondition()))
@@ -122,15 +101,16 @@ public class IgniteFilter extends Filter implements 
TraitsAwareIgniteRel {
     }
 
     /** */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         Set<CorrelationId> corrSet = 
RexUtils.extractCorrelationIds(getCondition());
 
-        if (corrSet.isEmpty() || 
TraitUtils.correlation(nodeTraits).correlationIds().containsAll(corrSet))
-            return ImmutableList.of(Pair.of(nodeTraits,
-                
ImmutableList.of(inTraits.get(0).replace(TraitUtils.correlation(nodeTraits)))));
+        CorrelationTrait correlation = TraitUtils.correlation(nodeTraits);
+
+        if (corrSet.isEmpty() || 
correlation.correlationIds().containsAll(corrSet))
+            return Pair.of(nodeTraits, ImmutableList.of(inTraits.get(0)));
 
-        return ImmutableList.of();
+        return null;
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index 975a7db..42e99fe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -44,7 +44,12 @@ public class IgniteIndexScan extends AbstractIndexScan 
implements SourceAwareIgn
      */
     public IgniteIndexScan(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
-        sourceId = ((Number)input.get("sourceId")).longValue();
+
+        Object srcIdObj = input.get("sourceId");
+        if (srcIdObj != null)
+            sourceId = ((Number)srcIdObj).longValue();
+        else
+            sourceId = -1;
     }
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index 4df1ce2..ad9bc2c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -293,7 +293,7 @@ public class IgniteMergeJoin extends Join implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
@@ -361,34 +361,17 @@ public class IgniteMergeJoin extends Join implements 
TraitsAwareIgniteRel {
         RelCollation leftCollation = createCollation(newLeftCollation);
         RelCollation rightCollation = createCollation(newRightCollation);
 
-        return ImmutableList.of(
-            Pair.of(
-                nodeTraits.replace(preserveNodeCollation ? collation : 
leftCollation),
-                ImmutableList.of(
-                    left.replace(leftCollation),
-                    right.replace(rightCollation)
-                )
+        return Pair.of(
+            nodeTraits.replace(preserveNodeCollation ? collation : 
leftCollation),
+            ImmutableList.of(
+                left.replace(leftCollation),
+                right.replace(rightCollation)
             )
         );
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        // The node is rewindable only if both sources are rewindable.
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
-
-        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(rewindability),
-            ImmutableList.of(left.replace(rewindability), 
right.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
@@ -403,23 +386,14 @@ public class IgniteMergeJoin extends Join implements 
TraitsAwareIgniteRel {
 
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
-        List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
-
-        RelTraitSet outTraits, leftTraits, rightTraits;
-
         IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
 
         RelDistribution.Type distrType = distribution.getType();
         switch (distrType) {
             case BROADCAST_DISTRIBUTED:
             case SINGLETON:
-                outTraits = nodeTraits.replace(distribution);
-                leftTraits = left.replace(distribution);
-                rightTraits = right.replace(distribution);
+                return Pair.of(nodeTraits, Commons.transform(inputTraits, t -> 
t.replace(distribution)));
 
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
-
-                break;
             case HASH_DISTRIBUTED:
             case RANDOM_DISTRIBUTED:
                 // Such join may be replaced as a cross join with a filter 
uppon it.
@@ -433,66 +407,18 @@ public class IgniteMergeJoin extends Join implements 
TraitsAwareIgniteRel {
                     ? distribution.function()
                     : DistributionFunction.hash();
 
-                IgniteDistribution outDistr; // TODO distribution multitrait 
support
-
-                outDistr = hash(joinInfo.leftKeys, function);
+                IgniteDistribution outDistr = hash(joinInfo.leftKeys, 
function);
 
                 if (distrType != HASH_DISTRIBUTED || 
outDistr.satisfies(distribution)) {
-                    outTraits = nodeTraits.replace(outDistr);
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
-
-                    if (joinType == INNER || joinType == LEFT) {
-                        outTraits = nodeTraits.replace(outDistr);
-                        leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                        rightTraits = right.replace(broadcast());
-
-                        res.add(Pair.of(outTraits, 
ImmutableList.of(leftTraits, rightTraits)));
-                    }
-                }
-
-                outDistr = hash(joinInfo.rightKeys, function);
-
-                if (distrType != HASH_DISTRIBUTED || 
outDistr.satisfies(distribution)) {
-                    outTraits = nodeTraits.replace(outDistr);
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, 
function));
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, 
rightTraits)));
-
-                    if (joinType == INNER || joinType == RIGHT) {
-                        outTraits = nodeTraits.replace(outDistr);
-                        leftTraits = left.replace(broadcast());
-                        rightTraits = right.replace(hash(joinInfo.rightKeys, 
function));
-
-                        res.add(Pair.of(outTraits, 
ImmutableList.of(leftTraits, rightTraits)));
-                    }
+                    return Pair.of(nodeTraits.replace(outDistr),
+                        ImmutableList.of(left.replace(outDistr), 
right.replace(hash(joinInfo.rightKeys, function))));
                 }
 
-                break;
-
             default:
-                break;
+                // NO-OP
         }
 
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
-            ImmutableList.of(left.replace(single()), 
right.replace(single()))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            ImmutableList.of(
-                inTraits.get(0).replace(TraitUtils.correlation(nodeTraits)),
-                inTraits.get(1).replace(TraitUtils.correlation(nodeTraits))
-            )
-        ));
+        return Pair.of(nodeTraits.replace(single()), 
Commons.transform(inputTraits, t -> t.replace(single())));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
index 1dd9f35..fb204f8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteNestedLoopJoin.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.plan.RelOptCluster;
@@ -91,17 +92,6 @@ public class IgniteNestedLoopJoin extends 
AbstractIgniteNestedLoopJoin {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            ImmutableList.of(
-                inTraits.get(0).replace(TraitUtils.correlation(nodeTraits)),
-                inTraits.get(1).replace(TraitUtils.correlation(nodeTraits))
-            )
-        ));
-    }
-
-    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         // left correlations
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 02e822f..b28048d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
@@ -89,17 +90,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
-        // The node is rewindable if its input is rewindable.
-
-        RelTraitSet in = inputTraits.get(0);
-        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // All distribution types except hash distribution are propagated as 
is.
         // In case of hash distribution we need to project distribution keys.
         // In case one of distribution keys is erased by projection result 
distribution
@@ -109,7 +100,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
 
         if (distribution.getType() != HASH_DISTRIBUTED)
-            return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution))));
+            return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(distribution)));
 
         Mappings.TargetMapping mapping = getPartialMapping(
             input.getRowType().getFieldCount(), getProjects());
@@ -127,13 +118,13 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         }
 
         if (srcKeys.size() == keys.size())
-            return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function())))));
+            return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function()))));
 
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single()))));
+        return Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single())));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // The code below projects required collation. In case we cannot 
calculate required source collation
         // (e.g. one of required sorted fields is result of a function call), 
input and output collations are erased.
 
@@ -142,7 +133,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         List<RelFieldCollation> fieldCollations = 
TraitUtils.collation(nodeTraits).getFieldCollations();
 
         if (fieldCollations.isEmpty())
-            return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(RelCollations.EMPTY))));
+            return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(RelCollations.EMPTY)));
 
         Map<Integer, Integer> targets = new HashMap<>();
         for (Ord<RexNode> project : Ord.zip(getProjects())) {
@@ -160,9 +151,9 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         }
 
         if (inFieldCollations.size() == fieldCollations.size())
-            return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(RelCollations.of(inFieldCollations)))));
+            return Pair.of(nodeTraits, 
ImmutableList.of(in.replace(RelCollations.of(inFieldCollations))));
 
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY), 
ImmutableList.of(in.replace(RelCollations.EMPTY))));
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY), 
ImmutableList.of(in.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
@@ -196,16 +187,15 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
     }
 
     /** */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
         Set<CorrelationId> corrIds = 
RexUtils.extractCorrelationIds(getProjects());
         Set<CorrelationId> traitCorrIds = 
TraitUtils.correlation(nodeTraits).correlationIds();
 
         if (!traitCorrIds.containsAll(corrIds))
-            return ImmutableList.of();
+            return null;
 
-        return ImmutableList.of(Pair.of(nodeTraits,
-            
ImmutableList.of(inTraits.get(0).replace(TraitUtils.correlation(nodeTraits)))));
+        return Pair.of(nodeTraits, 
ImmutableList.of(inTraits.get(0).replace(TraitUtils.correlation(nodeTraits))));
     }
 
     /** */
@@ -215,8 +205,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
 
         
corrIds.addAll(TraitUtils.correlation(inTraits.get(0)).correlationIds());
 
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)),
-            inTraits));
+        return 
ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)),
 inTraits));
     }
 
     /** {@inheritDoc} */
@@ -226,6 +215,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         return planner.getCostFactory().makeCost(rowCount, 0, 0);
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
         return new IgniteProject(cluster, getTraitSet(), sole(inputs), 
getProjects(), getRowType());
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 7c99c91..0668173 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -44,7 +44,12 @@ public class IgniteTableScan extends 
ProjectableFilterableTableScan implements S
      */
     public IgniteTableScan(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
-        sourceId = ((Number)input.get("sourceId")).longValue();
+
+        Object srcIdObj = input.get("sourceId");
+        if (srcIdObj != null)
+            sourceId = ((Number)srcIdObj).longValue();
+        else
+            sourceId = -1;
     }
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index c96e3e2..e3a3b4d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -49,12 +49,10 @@ public class IgniteTableSpool extends Spool implements 
IgniteRel {
      * @param input Serialized representation.
      */
     public IgniteTableSpool(RelInput input) {
-        super(
+        this(
             changeTraits(input, IgniteConvention.INSTANCE).getCluster(),
             changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
-            changeTraits(input, IgniteConvention.INSTANCE).getInput(),
-            changeTraits(input, IgniteConvention.INSTANCE).getEnum("readType", 
Type.class),
-            changeTraits(input, 
IgniteConvention.INSTANCE).getEnum("writeType", Type.class)
+            changeTraits(input, IgniteConvention.INSTANCE).getInput()
         );
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
index 1c03e7f..e05f59b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteUnionAll.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -67,31 +68,11 @@ public class IgniteUnionAll extends Union implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) 
{
-        // Union node requires the same traits from all its inputs.
-
-        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits,
-            Commons.transform(inputTraits, t -> t.replace(rewindability))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Union node requires the same traits from all its inputs.
-
-        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
-
-        return ImmutableList.of(Pair.of(nodeTraits,
-            Commons.transform(inputTraits, t -> t.replace(distribution))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
         // Union node erases collation. TODO union all using merge sort 
algorythm
 
-        return 
ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            Commons.transform(inputTraits, t -> 
t.replace(RelCollations.EMPTY))));
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            Commons.transform(inputTraits, t -> 
t.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
@@ -135,13 +116,6 @@ public class IgniteUnionAll extends Union implements 
TraitsAwareIgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        return ImmutableList.of(Pair.of(nodeTraits,
-            Commons.transform(inTraits, t -> 
t.replace(TraitUtils.correlation(nodeTraits)))));
-    }
-
-    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveCorrelation(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
index 601a429..d4f2da0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
@@ -114,15 +114,14 @@ public class CorrelatedNestedLoopJoinRule extends 
ConverterRule {
             conditionList.add(condition2);
         }
 
-        RelTraitSet filterInTraits = rel.getRight().getTraitSet();
-        RelNode filterInput = convert(rel.getRight(), filterInTraits);
+        RelTraitSet filterInTraits = 
rel.getRight().getTraitSet().replace(RewindabilityTrait.REWINDABLE);
 
         // Push a filter with batchSize disjunctions
-        relBuilder.push(filterInput).filter(relBuilder.or(conditionList));
+        relBuilder.push(rel.getRight().copy(filterInTraits, 
rel.getRight().getInputs())).filter(relBuilder.or(conditionList));
         RelNode right = relBuilder.build();
 
         CorrelationTrait corrTrait = 
CorrelationTrait.correlations(correlationIds);
-        right = right.copy(right.getTraitSet().replace(corrTrait), 
right.getInputs());
+        right = right.copy(filterInTraits.replace(corrTrait), 
right.getInputs());
 
         JoinRelType joinType = rel.getJoinType();
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RelRegistrar.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RelRegistrar.java
deleted file mode 100644
index 5862614..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RelRegistrar.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.Litmus;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.util.typedef.F;
-
-/** */
-public class RelRegistrar extends AbstractRelNode implements IgniteRel {
-    /** */
-    private final List<RelNode> rels;
-
-    /** */
-    private final RelNode orig;
-
-    /** */
-    public RelRegistrar(RelOptCluster cluster, RelTraitSet traitSet, RelNode 
orig, List<RelNode> rels) {
-        super(cluster, traitSet);
-
-        assert !F.isEmpty(rels);
-
-        this.rels = rels;
-        this.orig = orig;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelDataType deriveRowType() {
-        return orig.getRowType();
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelWriter explainTerms(RelWriter pw) {
-        return pw.item("orig", orig).item("requiredTraits", getTraitSet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelNode onRegister(RelOptPlanner planner) {
-        RelNode r = null;
-        for (RelNode rel : rels) {
-            assert RelOptUtil.equal("original row type",
-                orig.getRowType(),
-                "rowtype of registring rel",
-                rel.getRowType(),
-                Litmus.THROW);
-
-            r = planner.ensureRegistered(rel, orig);
-
-            assert r == rel || RelOptUtil.equal("rowtype of rel before 
registration",
-                rel.getRowType(),
-                "rowtype of rel after registration",
-                r.getRowType(),
-                Litmus.THROW);
-
-            assert r.isValid(Litmus.THROW, null);
-
-            if (!r.getTraitSet().satisfies(getTraitSet()))
-                RelOptRule.convert(r, getTraitSet()); // require traits 
enforcing
-        }
-
-        assert r != null;
-
-        return r;
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        assert inputs.isEmpty();
-        return new RelRegistrar(getCluster(), traitSet, orig, rels);
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
-        return planner.getCostFactory().makeInfiniteCost();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        throw new AssertionError();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
-        throw new AssertionError();
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 5273de7..9ea6bb1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -54,6 +54,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.calcite.plan.RelOptUtil.permutationPushDownProject;
 import static 
org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
@@ -147,7 +149,6 @@ public class TraitUtils {
                 RelOptRule.convert(
                     rel,
                     rel.getTraitSet()
-                        .replace(any())
                         .replace(CorrelationTrait.UNCORRELATED)
                 ),
                 toTrait);
@@ -365,27 +366,23 @@ public class TraitUtils {
     }
 
     /** */
-    public static RelNode passThrough(TraitsAwareIgniteRel rel, RelTraitSet 
outTraits) {
-        if (outTraits.getConvention() != IgniteConvention.INSTANCE || 
rel.getInputs().isEmpty())
+    public static Pair<RelTraitSet, List<RelTraitSet>> 
passThrough(TraitsAwareIgniteRel rel, RelTraitSet requiredTraits) {
+        if (requiredTraits.getConvention() != IgniteConvention.INSTANCE || 
rel.getInputs().isEmpty())
             return null;
 
         List<RelTraitSet> inTraits = 
Collections.nCopies(rel.getInputs().size(),
             rel.getCluster().traitSetOf(IgniteConvention.INSTANCE));
 
-        List<RelNode> nodes = new 
PropagationContext(ImmutableSet.of(Pair.of(outTraits, inTraits)))
-            .propagate(rel::passThroughCollation)
-            .propagate(rel::passThroughDistribution)
-            .propagate(rel::passThroughRewindability)
-            .propagate(rel::passThroughCorrelation)
-            .nodes(rel::createNode);
-
-        if (nodes.isEmpty())
-            return null;
+        List<Pair<RelTraitSet, List<RelTraitSet>>> traits = new 
PropagationContext(ImmutableSet.of(Pair.of(requiredTraits, inTraits)))
+            .propagate((in, outs) -> 
singletonListFromNullable(rel.passThroughCollation(in, outs)))
+            .propagate((in, outs) -> 
singletonListFromNullable(rel.passThroughDistribution(in, outs)))
+            .propagate((in, outs) -> 
singletonListFromNullable(rel.passThroughRewindability(in, outs)))
+            .propagate((in, outs) -> 
singletonListFromNullable(rel.passThroughCorrelation(in, outs)))
+            .combinations();
 
-        if (nodes.size() == 1)
-            return F.first(nodes);
+        assert traits.size() <= 1;
 
-        return new RelRegistrar(rel.getCluster(), outTraits, rel, nodes);
+        return F.first(traits);
     }
 
     /** */
@@ -406,6 +403,13 @@ public class TraitUtils {
             .nodes(rel::createNode);
     }
 
+    /**
+     * @param elem Elem.
+     */
+    private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
+        return elem == null ? emptyList() : singletonList(elem);
+    }
+
     /** */
     private static Set<Pair<RelTraitSet, List<RelTraitSet>>> 
combinations(RelTraitSet outTraits, List<List<RelTraitSet>> inTraits) {
         Set<Pair<RelTraitSet, List<RelTraitSet>>> out = new HashSet<>();
@@ -467,6 +471,11 @@ public class TraitUtils {
                 b.add(nodesCreator.create(variant.left, variant.right));
             return b.build();
         }
+
+        /** */
+        public List<Pair<RelTraitSet, List<RelTraitSet>>> combinations() {
+            return ImmutableList.copyOf(combinations);
+        }
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
index 04f7b7e..d168b16 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.List;
+
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.DeriveMode;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -30,18 +32,13 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.Commons;
 /** */
 public interface TraitsAwareIgniteRel extends IgniteRel {
     /** {@inheritDoc} */
-    @Override public default RelNode passThrough(RelTraitSet required) {
-        return TraitUtils.passThrough(this, required);
-    }
-
-    /** {@inheritDoc} */
     @Override public default List<RelNode> derive(List<List<RelTraitSet>> 
inTraits) {
         return TraitUtils.derive(this, inTraits);
     }
 
     /** {@inheritDoc} */
     @Override default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
-        throw new RuntimeException(getClass().getName() + 
"#passThroughTraits() is not implemented.");
+        return TraitUtils.passThrough(this, required);
     }
 
     /** {@inheritDoc} */
@@ -73,7 +70,11 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
      * @param inTraits Relational node input traits.
      * @return List of possible input-output traits combinations.
      */
-    List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inTraits);
+    default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+        RewindabilityTrait rewindability = 
TraitUtils.rewindability(nodeTraits);
+
+        return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(rewindability)));
+    }
 
     /**
      * Propagates distribution trait in up-to-bottom manner.
@@ -82,7 +83,11 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
      * @param inTraits Relational node input traits.
      * @return List of possible input-output traits combinations.
      */
-    List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits);
+    default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
+
+        return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(distribution)));
+    }
 
     /**
      * Propagates collation trait in up-to-bottom manner.
@@ -91,7 +96,14 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
      * @param inTraits Relational node input traits.
      * @return List of possible input-output traits combinations.
      */
-    List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits);
+    default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+        if (inTraits.size() > 1)
+            throw new RuntimeException(getClass().getName() + 
"#passThroughCollation() is not implemented.");
+
+        RelCollation collation = TraitUtils.collation(nodeTraits);
+
+        return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(collation)));
+    }
 
     /**
      * Propagates correlation trait in up-to-bottom manner.
@@ -100,7 +112,11 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
      * @param inTraits Relational node input traits.
      * @return List of possible input-output traits combinations.
      */
-    List<Pair<RelTraitSet, List<RelTraitSet>>> 
passThroughCorrelation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits);
+    default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCorrelation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+        CorrelationTrait correlation = TraitUtils.correlation(nodeTraits);
+
+        return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(correlation)));
+    }
 
     /**
      * Propagates rewindability trait in bottom-up manner.
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
index 52145ec..ea9c789 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteBasicSecondaryIndexIntegrationTest.java
@@ -111,7 +111,7 @@ public class CalciteBasicSecondaryIndexIntegrationTest 
extends GridCommonAbstrac
     @Test
     public void testIndexLoopJoin() {
         assertQuery("" +
-            "SELECT d1.name, d2.name FROM Developer d1, Developer d2 WHERE 
d1.id = d2.id")
+            "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 
'NestedLoopJoinConverter') */ d1.name, d2.name FROM Developer d1, Developer d2 
WHERE d1.id = d2.id")
             .matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
             .returns("Bach", "Bach")
             .returns("Beethoven", "Beethoven")
@@ -123,7 +123,7 @@ public class CalciteBasicSecondaryIndexIntegrationTest 
extends GridCommonAbstrac
     @Test
     public void testMergeJoin() {
         assertQuery("" +
-            "SELECT /*+ DISABLE_RULE('CorrelatedNestedLoopJoin') */ d1.name, 
d2.name FROM Developer d1, Developer d2 WHERE d1.depId = d2.depId")
+            "SELECT d1.name, d2.name FROM Developer d1, Developer d2 WHERE 
d1.depId = d2.depId")
             .matches(containsSubPlan("IgniteMergeJoin"))
             .returns("Bach", "Bach")
             .returns("Beethoven", "Beethoven")
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 922ef97..29753c0 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -71,6 +74,126 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
     /** */
     @Test
+    public void testCountWithJoin() throws Exception {
+        IgniteCache<Integer, RISK> RISK = ignite.getOrCreateCache(new 
CacheConfiguration<Integer, RISK>()
+            .setName("RISK")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, 
RISK.class).setTableName("RISK")))
+            .setBackups(1)
+        );
+
+        IgniteCache<Integer, TRADE> TRADE = ignite.getOrCreateCache(new 
CacheConfiguration<Integer, TRADE>()
+            .setName("TRADE")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, 
TRADE.class).setTableName("TRADE")))
+            .setBackups(1)
+        );
+
+        IgniteCache<Integer, BATCH> BATCH = ignite.getOrCreateCache(new 
CacheConfiguration<Integer, BATCH>()
+            .setName("BATCH")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, 
BATCH.class).setTableName("BATCH")))
+            .setCacheMode(CacheMode.REPLICATED)
+        );
+
+        Map<Integer, RISK> mRisk = new HashMap<>(65000);
+
+        for (int i = 0; i < 65000; i++)
+            mRisk.put(1, new RISK(i));
+
+        RISK.putAll(mRisk);
+
+        Map<Integer, TRADE> mTrade = new HashMap<>(200);
+
+        for (int i = 0; i < 200; i++)
+            mTrade.put(1, new TRADE(i));
+
+        TRADE.putAll(mTrade);
+
+        for (int i = 0; i < 80; i++)
+            BATCH.put(1, new BATCH(i));
+
+        awaitPartitionMapExchange(true, true, null);
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+            "SELECT count(*)" +
+                " FROM RISK R," +
+                " TRADE T," +
+                " BATCH B " +
+                "WHERE R.BATCHKEY = B.BATCHKEY " +
+                "AND R.TRADEID = T.TRADEID " +
+                "AND R.TRADEVER = T.TRADEVER " +
+                "AND T.BOOK = 'BOOK' " +
+                "AND B.IS = TRUE");
+
+        System.err.println(query.get(0).getAll());
+    }
+
+    /** */
+    public static class RISK {
+        /** */
+        @QuerySqlField
+        public Integer BATCHKEY;
+
+        /** */
+        @QuerySqlField
+        public Integer TRADEID;
+
+        /** */
+        @QuerySqlField
+        public Integer TRADEVER;
+
+        /** */
+        public RISK(Integer in) {
+            BATCHKEY = in;
+            TRADEID = in;
+            TRADEVER = in;
+        }
+    }
+
+    /** */
+    public static class TRADE {
+        /** */
+        @QuerySqlField
+        public Integer TRADEID;
+
+        /** */
+        @QuerySqlField
+        public Integer TRADEVER;
+
+        /** */
+        @QuerySqlField
+        public String BOOK;
+
+        /** */
+        public TRADE(Integer in) {
+            TRADEID = in;
+            TRADEVER = in;
+            BOOK = ThreadLocalRandom.current().nextBoolean() ? "BOOK" : "";
+        }
+    }
+
+    /** */
+    public static class BATCH {
+        /** */
+        @QuerySqlField
+        public Integer BATCHKEY;
+
+        /** */
+        @QuerySqlField
+        public Boolean IS;
+
+        /** */
+        public BATCH(Integer in) {
+            BATCHKEY = in;
+            IS = ThreadLocalRandom.current().nextBoolean();
+        }
+    }
+
+    /** */
+    @Test
     public void unionAll() throws Exception {
         IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new 
CacheConfiguration<Integer, Employer>()
             .setName("employer1")
@@ -297,7 +420,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
     /** */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-13727";)
     public void testUnionWithDistinct() throws Exception {
         populateTables();
 
@@ -485,6 +607,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     /** */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-13849";)
     @Test
     public void queryMultiStatement() throws Exception {
         IgniteCache<Integer, Developer> developer = 
grid(1).getOrCreateCache(new CacheConfiguration<Integer, Developer>()
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
index 7adacf2..6ad9e42 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
@@ -38,6 +38,7 @@ import static org.apache.calcite.rel.core.JoinRelType.INNER;
 import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+import static org.hamcrest.core.IsEqual.equalTo;
 
 /** */
 @SuppressWarnings("TypeMayBeWeakened")
@@ -374,7 +375,7 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest {
         while (node.hasNext())
             rows.add(node.next());
 
-        Assert.assertArrayEquals(expRes, rows.toArray(EMPTY));
+        Assert.assertThat(rows.toArray(EMPTY), equalTo(expRes));
     }
 
     /**

Reply via email to