This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new ecbb326  pending
ecbb326 is described below

commit ecbb326567bf7a0d53a5a6ad74f005ee0ed0c34e
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 22 15:51:19 2019 +0300

    pending
---
 .../query/calcite/cluster/RegistryImpl.java        |  75 ++++++++------
 .../calcite/metadata/IgniteMdFragmentInfo.java     |   2 +-
 .../query/calcite/metadata/NodesMapping.java       |  52 +++++-----
 .../query/calcite/prepare/IgnitePlanner.java       |  56 +++++++++--
 .../processors/query/calcite/rel/Receiver.java     |  35 ++++++-
 .../processors/query/calcite/rel/Sender.java       |  22 +++++
 .../query/calcite/serialize/ConversionContext.java |  92 ++++++++++++++++++
 ...Expression.java => DynamicParamExpression.java} |   8 +-
 .../{ExpressionType.java => ExpDataType.java}      |   6 +-
 .../query/calcite/serialize/ExpImplementor.java    |   2 +
 .../calcite/serialize/ExpToRexTranslator.java      |   4 +
 .../query/calcite/serialize/FilterNode.java        |  50 ++++++++++
 .../processors/query/calcite/serialize/Graph.java  |  52 ++++++----
 .../query/calcite/serialize/GraphNode.java         |   4 +-
 .../{GraphNode.java => GraphToRelConverter.java}   |   8 +-
 .../calcite/serialize/InputRefExpression.java      |   4 +-
 .../query/calcite/serialize/JoinNode.java          |  68 +++++++++++++
 .../query/calcite/serialize/LiteralExpression.java |   4 +-
 .../calcite/serialize/LocalRefExpression.java      |   4 +-
 .../query/calcite/serialize/ProjectNode.java       |  46 +++++++++
 .../query/calcite/serialize/ReceiverNode.java      |  46 +++++++++
 .../serialize/{GraphNode.java => RelGraph.java}    |   2 +-
 .../query/calcite/serialize/RelGraphNode.java      |  17 ++--
 .../calcite/serialize/RelToGraphConverter.java     |  95 ++++++++++--------
 .../calcite/serialize/RexToExpTranslator.java      |   2 +-
 .../query/calcite/serialize/SenderNode.java        |  21 ++--
 ...GraphNode.java => SerializedCorrelationId.java} |  26 +++--
 .../calcite/serialize/SerializedTraitSet.java      |  75 ++++++++++++++
 .../serialize/{FieldType.java => SimpleType.java}  |  10 +-
 .../query/calcite/serialize/StructType.java        |  12 +--
 .../{LiteralExpression.java => TableScanNode.java} |  22 +++--
 .../query/calcite/splitter/Fragment.java           |  32 ++++--
 .../query/calcite/trait/AllTargetsFactory.java     |  11 ++-
 .../query/calcite/trait/DestinationFunction.java   |   4 +-
 .../query/calcite/trait/DistributionTrait.java     |  35 +++++--
 .../query/calcite/trait/HashFunctionFactory.java   |  13 ++-
 .../query/calcite/trait/IgniteDistributions.java   |  15 ++-
 .../query/calcite/trait/NoOpFactory.java           |   7 +-
 .../query/calcite/trait/RandomTargetFactory.java   |  10 +-
 .../query/calcite/trait/SingleTargetFactory.java   |  13 ++-
 .../processors/query/calcite/util/Commons.java     | 108 +++++++--------------
 .../query/calcite/CalciteQueryProcessorTest.java   |  85 +++++++++++++++-
 42 files changed, 949 insertions(+), 306 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 95ad49f..8e5773b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.cluster;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.function.ToIntFunction;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
@@ -29,6 +30,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -37,6 +39,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunc
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -62,11 +65,13 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
     }
 
     @Override public NodesMapping local() {
-        return new 
NodesMapping(Collections.singletonList(ctx.discovery().localNode()), null, 
(byte) 0);
+        return new 
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, 
(byte) 0);
     }
 
     @Override public NodesMapping random(AffinityTopologyVersion topVer) {
-        return new 
NodesMapping(ctx.discovery().discoCache(topVer).serverNodes(), null, (byte) 0);
+        List<ClusterNode> nodes = 
ctx.discovery().discoCache(topVer).serverNodes();
+
+        return new NodesMapping(Commons.transform(nodes, ClusterNode::id), 
null, (byte) 0);
     }
 
     @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -79,35 +84,35 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
         byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
 
         List<List<ClusterNode>> assignments = 
cctx.affinity().assignments(topVer);
+        List<List<UUID>> res;
 
         if (cctx.config().getWriteSynchronizationMode() == 
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
-            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+            res = new ArrayList<>(assignments.size());
 
             for (List<ClusterNode> partNodes : assignments)
-                assignments0.add(F.isEmpty(partNodes) ? 
Collections.emptyList() : Collections.singletonList(F.first(partNodes)));
-
-            assignments = assignments0;
+                res.add(F.isEmpty(partNodes) ? Collections.emptyList() : 
Collections.singletonList(F.first(partNodes).id()));
         }
         else if (!cctx.topology().rebalanceFinished(topVer)) {
-            flags |= NodesMapping.HAS_MOVING_PARTITIONS;
+            res = new ArrayList<>(assignments.size());
 
-            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+            flags |= NodesMapping.HAS_MOVING_PARTITIONS;
 
             for (int part = 0; part < assignments.size(); part++) {
-                List<ClusterNode> partNodes = assignments0.get(part), 
partNodes0 = new ArrayList<>(partNodes.size());
+                List<ClusterNode> partNodes = assignments.get(part);
+                List<UUID> partIds = new ArrayList<>(partNodes.size());
 
-                for (ClusterNode partNode : partNodes) {
-                    if (cctx.topology().partitionState(partNode.id(), part) == 
GridDhtPartitionState.OWNING)
-                        partNodes0.add(partNode);
+                for (ClusterNode node : partNodes) {
+                    if (cctx.topology().partitionState(node.id(), part) == 
GridDhtPartitionState.OWNING)
+                        partIds.add(node.id());
                 }
 
-                assignments0.add(partNodes0);
+                res.add(partIds);
             }
-
-            assignments = assignments0;
         }
+        else
+            res = Commons.transform(assignments, nodes -> 
Commons.transform(nodes, ClusterNode::id));
 
-        return new NodesMapping(null, assignments, flags);
+        return new NodesMapping(null, res, flags);
     }
 
     private NodesMapping replicatedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
@@ -116,32 +121,38 @@ public class RegistryImpl implements 
DistributionRegistry, LocationRegistry {
         if (cctx.config().getNodeFilter() != null)
             flags |= NodesMapping.PARTIALLY_REPLICATED;
 
+        GridDhtPartitionTopology topology = cctx.topology();
+
         List<ClusterNode> nodes = 
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
+        List<UUID> res;
 
-        if (!cctx.topology().rebalanceFinished(topVer)) {
+        if (!topology.rebalanceFinished(topVer)) {
             flags |= NodesMapping.PARTIALLY_REPLICATED;
 
-            List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
+            res = new ArrayList<>(nodes.size());
 
-            int parts = cctx.topology().partitions();
+            int parts = topology.partitions();
 
-            parent:
             for (ClusterNode node : nodes) {
-                for (int part = 0; part < parts; part++) {
-                    if (cctx.topology().partitionState(node.id(), part) != 
GridDhtPartitionState.OWNING)
-                        continue parent;
-                }
-
-                nodes0.add(node);
+                if (isOwner(node.id(), topology, parts))
+                    res.add(node.id());
             }
-
-            nodes = nodes0;
         }
+        else
+            res = Commons.transform(nodes, ClusterNode::id);
 
-        return new NodesMapping(nodes, null, flags);
+        return new NodesMapping(res, null, flags);
+    }
+
+    private boolean isOwner(UUID nodeId, GridDhtPartitionTopology topology, 
int parts) {
+        for (int p = 0; p < parts; p++) {
+            if (topology.partitionState(nodeId, p) != 
GridDhtPartitionState.OWNING)
+                return false;
+        }
+        return true;
     }
 
-    private static class AffinityFactory extends 
AbstractDestinationFunctionFactory {
+    private final static class AffinityFactory extends 
AbstractDestinationFunctionFactory {
         private final int cacheId;
         private final Object key;
 
@@ -153,10 +164,10 @@ public class RegistryImpl implements 
DistributionRegistry, LocationRegistry {
         @Override public DestinationFunction create(Context ctx, NodesMapping 
mapping, ImmutableIntList keys) {
             assert keys.size() == 1 && mapping != null && 
!F.isEmpty(mapping.assignments());
 
-            List<List<ClusterNode>> assignments = mapping.assignments();
+            List<List<UUID>> assignments = mapping.assignments();
 
             if (U.assertionsEnabled()) {
-                for (List<ClusterNode> assignment : assignments) {
+                for (List<UUID> assignment : assignments) {
                     assert F.isEmpty(assignment) || assignment.size() == 1;
                 }
             }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index a1f09a7..4a96cf3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -93,7 +93,7 @@ public class IgniteMdFragmentInfo implements 
MetadataHandler<FragmentMetadata> {
     }
 
     public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
-        return new FragmentInfo(rel.source());
+        return new FragmentInfo(rel.sourceFragment());
     }
 
     public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery 
mq) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index fdbed1d..16bafc0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -22,7 +22,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
@@ -38,21 +38,21 @@ public class NodesMapping implements Serializable {
     public static final byte PARTIALLY_REPLICATED = 1 << 3;
     public static final byte DEDUPLICATED = 1 << 4;
 
-    private final List<ClusterNode> nodes;
-    private final List<List<ClusterNode>> assignments;
+    private final List<UUID> nodes;
+    private final List<List<UUID>> assignments;
     private final byte flags;
 
-    public NodesMapping(List<ClusterNode> nodes, List<List<ClusterNode>> 
assignments, byte flags) {
+    public NodesMapping(List<UUID> nodes, List<List<UUID>> assignments, byte 
flags) {
         this.nodes = nodes;
         this.assignments = assignments;
         this.flags = flags;
     }
 
-    public List<ClusterNode> nodes() {
+    public List<UUID> nodes() {
         return nodes;
     }
 
-    public List<List<ClusterNode>> assignments() {
+    public List<List<UUID>> assignments() {
         return assignments;
     }
 
@@ -62,7 +62,7 @@ public class NodesMapping implements Serializable {
         if ((flags & PARTIALLY_REPLICATED) == 0)
             return new NodesMapping(U.firstNotNull(nodes, other.nodes), 
mergeAssignments(other, null), flags);
 
-        List<ClusterNode> nodes;
+        List<UUID> nodes;
 
         if (this.nodes == null)
             nodes = other.nodes;
@@ -81,11 +81,11 @@ public class NodesMapping implements Serializable {
         if (assignments == null || !excessive())
             return this;
 
-        HashSet<ClusterNode> nodes0 = new HashSet<>();
-        List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+        HashSet<UUID> nodes0 = new HashSet<>();
+        List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
 
-        for (List<ClusterNode> partNodes : assignments) {
-            ClusterNode node = F.first(partNodes);
+        for (List<UUID> partNodes : assignments) {
+            UUID node = F.first(partNodes);
 
             if (node == null)
                 throw new LocationMappingException("Failed to map fragment to 
location.");
@@ -97,14 +97,14 @@ public class NodesMapping implements Serializable {
         return new NodesMapping(new ArrayList<>(nodes0), assignments0, 
(byte)(flags | DEDUPLICATED));
     }
 
-    public int[] partitions(ClusterNode node) {
+    public int[] partitions(UUID node) {
         if (assignments == null)
             return null;
 
         GridIntList parts = new GridIntList(assignments.size());
 
         for (int i = 0; i < assignments.size(); i++) {
-            List<ClusterNode> assignment = assignments.get(i);
+            List<UUID> assignment = assignments.get(i);
             if (Objects.equals(node, F.first(assignment)))
                 parts.add(i);
         }
@@ -132,25 +132,25 @@ public class NodesMapping implements Serializable {
         return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
     }
 
-    private List<List<ClusterNode>> mergeAssignments(NodesMapping other, 
List<ClusterNode> nodes) throws LocationMappingException {
-        byte flags = (byte) (this.flags | other.flags); 
List<List<ClusterNode>> left = assignments, right = other.assignments;
+    private List<List<UUID>> mergeAssignments(NodesMapping other, List<UUID> 
nodes) throws LocationMappingException {
+        byte flags = (byte) (this.flags | other.flags); List<List<UUID>> left 
= assignments, right = other.assignments;
 
         if (left == null && right == null)
             return null; // nothing to intersect;
 
         if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS) 
== 0) {
-            List<List<ClusterNode>> assignments = U.firstNotNull(left, right);
+            List<List<UUID>> assignments = U.firstNotNull(left, right);
 
             if (nodes == null || (flags & PARTIALLY_REPLICATED) == 0)
                 return assignments;
 
-            List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
-            HashSet<ClusterNode> nodesSet = new HashSet<>(nodes);
+            List<List<UUID>> assignments0 = new 
ArrayList<>(assignments.size());
+            HashSet<UUID> nodesSet = new HashSet<>(nodes);
 
-            for (List<ClusterNode> partNodes : assignments) {
-                List<ClusterNode> partNodes0 = new 
ArrayList<>(partNodes.size());
+            for (List<UUID> partNodes : assignments) {
+                List<UUID> partNodes0 = new ArrayList<>(partNodes.size());
 
-                for (ClusterNode partNode : partNodes) {
+                for (UUID partNode : partNodes) {
                     if (nodesSet.contains(partNode))
                         partNodes0.add(partNode);
                 }
@@ -164,14 +164,14 @@ public class NodesMapping implements Serializable {
             return assignments0;
         }
 
-        List<List<ClusterNode>> assignments = new ArrayList<>(left.size());
-        HashSet<ClusterNode> nodesSet = nodes != null ? new HashSet<>(nodes) : 
null;
+        List<List<UUID>> assignments = new ArrayList<>(left.size());
+        HashSet<UUID> nodesSet = nodes != null ? new HashSet<>(nodes) : null;
 
         for (int i = 0; i < left.size(); i++) {
-            List<ClusterNode> leftNodes = left.get(i), partNodes = new 
ArrayList<>(leftNodes.size());
-            HashSet<ClusterNode> rightNodesSet = new HashSet<>(right.get(i));
+            List<UUID> leftNodes = left.get(i), partNodes = new 
ArrayList<>(leftNodes.size());
+            HashSet<UUID> rightNodesSet = new HashSet<>(right.get(i));
 
-            for (ClusterNode partNode : leftNodes) {
+            for (UUID partNode : leftNodes) {
                 if (rightNodesSet.contains(partNode) && (nodesSet == null || 
nodesSet.contains(partNode)))
                     partNodes.add(partNode);
             }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index e7f4bb8..c9f9f26 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -26,11 +26,13 @@ import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCostImpl;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
@@ -69,11 +71,18 @@ import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import 
org.apache.ignite.internal.processors.query.calcite.serialize.ConversionContext;
 import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
+import org.apache.ignite.internal.processors.query.calcite.serialize.RelGraph;
+import 
org.apache.ignite.internal.processors.query.calcite.serialize.RelGraphNode;
+import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
@@ -200,6 +209,28 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         return rel(sql).rel;
     }
 
+    public RelNode convert(RelGraph graph) {
+        ready();
+
+        CalciteCatalogReader catalogReader = createCatalogReader();
+        RexBuilder rexBuilder = createRexBuilder();
+        RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+        RelBuilder relBuilder = createRelBuilder(cluster, catalogReader);
+
+        ConversionContext ctx = new ConversionContext(this, relBuilder, 
operatorTable);
+
+        return F.first(convertRecursive(ctx, graph, graph.nodes().subList(0, 
1)));
+    }
+
+    private List<RelNode> convertRecursive(ConversionContext ctx, RelGraph 
graph, List<Ord<RelGraphNode>> src) {
+        ImmutableList.Builder<RelNode> b = ImmutableList.builder();
+
+        for (Ord<RelGraphNode> node : src)
+            b.add(node.e.toRel(ctx, convertRecursive(ctx, graph, 
graph.children(node.i))));
+
+        return b.build();
+    }
+
     /** {@inheritDoc} */
     @Override public RelRoot rel(SqlNode sql) {
         ready();
@@ -215,18 +246,21 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
             new SqlToRelConverter(this, validator, createCatalogReader(), 
cluster, convertletTable, config);
         RelRoot root = sqlToRelConverter.convertQuery(sql, false, true);
         root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
-        RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, 
null);
+        RelBuilder relBuilder = createRelBuilder(cluster, null);
         root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, 
relBuilder));
         return root;
     }
 
-    public RelNode convert(Graph graph) {
+    public QueryPlan plan(RelNode rel) {
         ready();
 
-        return null; // TODO
+        if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
+            throw new IllegalArgumentException("IGNITE_CONVENTION is 
required.");
+
+        return new Splitter().go((IgniteRel) rel);
     }
 
-    public Graph convert(RelNode node) {
+    public Graph graph(RelNode node) {
         ready();
 
         return null; // TODO
@@ -265,7 +299,7 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
 
         RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
         RelRoot root2 = root.withRel(sqlToRelConverter.flattenTypes(root.rel, 
true));
-        RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, 
null);
+        RelBuilder relBuilder = createRelBuilder(cluster, null);
         return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, 
relBuilder));
     }
 
@@ -328,19 +362,23 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         return typeFactory;
     }
 
-    public SqlConformance conformance() {
+    private SqlConformance conformance() {
         return connectionConfig.conformance();
     }
 
-    public SqlOperatorTable operatorTable() {
+    private SqlOperatorTable operatorTable() {
         return operatorTable;
     }
 
-    public RexBuilder createRexBuilder() {
+    private RexBuilder createRexBuilder() {
         return new RexBuilder(typeFactory);
     }
 
-    public CalciteCatalogReader createCatalogReader() {
+    private RelBuilder createRelBuilder(RelOptCluster cluster, RelOptSchema 
schema) {
+        return sqlToRelConverterConfig.getRelBuilderFactory().create(cluster, 
schema);
+    }
+
+    private CalciteCatalogReader createCatalogReader() {
         SchemaPlus rootSchema = rootSchema(defaultSchema);
 
         return new CalciteCatalogReader(
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 47cd875..cde4290 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
@@ -27,16 +28,31 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
  *
  */
 public final class Receiver extends AbstractRelNode implements IgniteRel {
-    private final Fragment source;
+    private final Fragment sourceFragment;
+    private final NodesMapping sourceMapping;
 
     /**
      * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
      */
-    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, Fragment source) {
+    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, Fragment sourceFragment) {
         super(cluster, traits);
         this.rowType = rowType;
-        this.source = source;
+        this.sourceFragment = sourceFragment;
+
+        sourceMapping = null;
+    }
+
+    /**
+     * @param cluster Cluster this relational expression belongs to
+     * @param traits Trait set.
+     */
+    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType 
rowType, NodesMapping sourceMapping) {
+        super(cluster, traits);
+        this.rowType = rowType;
+        this.sourceMapping = sourceMapping;
+
+        sourceFragment = null;
     }
 
     /** {@inheritDoc} */
@@ -44,7 +60,16 @@ public final class Receiver extends AbstractRelNode 
implements IgniteRel {
         return implementor.implement(this);
     }
 
-    public Fragment source() {
-        return source;
+    public Fragment sourceFragment() {
+        return sourceFragment;
+    }
+
+    public NodesMapping sourceMapping() {
+        if (sourceFragment != null)
+            return sourceFragment.mapping();
+
+        assert sourceMapping != null;
+
+        return sourceMapping;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 8e7c956..aa279b2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -20,9 +20,12 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 import org.jetbrains.annotations.NotNull;
 
@@ -47,6 +50,14 @@ public final class Sender extends SingleRel implements 
IgniteRel {
         this.targetDistr = targetDistr;
     }
 
+    private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+        @NotNull DistributionTrait targetDistr, @NotNull NodesMapping 
targetMapping) {
+        super(cluster, traits, input);
+
+        this.targetDistr = targetDistr;
+        this.targetMapping = targetMapping;
+    }
+
     /** {@inheritDoc} */
     @Override public <T> T implement(RelImplementor<T> implementor) {
         return implementor.implement(this);
@@ -67,4 +78,15 @@ public final class Sender extends SingleRel implements 
IgniteRel {
     public DestinationFunction targetFunction(org.apache.calcite.plan.Context 
ctx) {
         return targetDistr.destinationFunctionFactory().create(ctx, 
targetMapping, targetDistr.keys());
     }
+
+    public static Sender create(RelNode input, DistributionTrait targetDistr, 
NodesMapping targetMapping) {
+        RelOptCluster cluster = input.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        RelTraitSet traits = cluster.traitSet()
+            .replace(IgniteRel.IGNITE_CONVENTION)
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.distribution(input, mq));
+
+        return new Sender(cluster, traits, input, targetDistr, targetMapping);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
new file mode 100644
index 0000000..a2ed800
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+
+/**
+ *
+ */
+public class ConversionContext implements RelOptTable.ToRelContext {
+    private final IgnitePlanner planner;
+    private final SqlOperatorTable operatorTable;
+    private final RelBuilder relBuilder;
+    private final ExpToRexTranslator expTranslator;
+
+    public ConversionContext(IgnitePlanner planner, RelBuilder relBuilder, 
SqlOperatorTable operatorTable) {
+        this.planner = planner;
+        this.relBuilder = relBuilder;
+        this.operatorTable = operatorTable;
+
+        expTranslator = new ExpToRexTranslator(rexBuilder(), typeFactory(), 
operatorTable);
+    }
+
+    public IgnitePlanner planner() {
+        return planner;
+    }
+
+    public RelDataTypeFactory typeFactory() {
+        return cluster().getTypeFactory();
+    }
+
+    public SqlOperatorTable operatorTable() {
+        return operatorTable;
+    }
+
+    public RelOptSchema schema() {
+        return relBuilder().getRelOptSchema();
+    }
+
+    public RelOptCluster cluster() {
+        return relBuilder().getCluster();
+    }
+
+    public Context context() {
+        return cluster().getPlanner().getContext();
+    }
+
+    public RelBuilder relBuilder() {
+        return relBuilder;
+    }
+
+    public RexBuilder rexBuilder() {
+        return cluster().getRexBuilder();
+    }
+
+    public ExpToRexTranslator expressionTranslator() {
+        return expTranslator;
+    }
+
+    @Override public RelOptCluster getCluster() {
+        return cluster();
+    }
+
+    @Override public RelRoot expandView(RelDataType rowType, String 
queryString, List<String> schemaPath, List<String> viewPath) {
+        return planner.expandView(rowType, queryString, schemaPath, viewPath);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
similarity index 82%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
index bc12e25..8019797 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
@@ -21,12 +21,12 @@ import org.apache.calcite.rel.type.RelDataType;
 /**
  *
  */
-public class InputRefExpression implements LogicalExpression {
-    public final ExpressionType type;
+public class DynamicParamExpression implements LogicalExpression {
+    public final ExpDataType type;
     public final int index;
 
-    public InputRefExpression(RelDataType type, int index) {
-        this.type = ExpressionType.fromType(type);
+    public DynamicParamExpression(RelDataType type, int index) {
+        this.type = ExpDataType.fromType(type);
         this.index = index;
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
similarity index 83%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
index 05762d8..4294854 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
@@ -23,9 +23,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 /**
  *
  */
-public interface ExpressionType extends Serializable {
-    static ExpressionType fromType(RelDataType type) {
-        return type.isStruct() ? StructType.fromType(type) : 
FieldType.fromType(type);
+public interface ExpDataType extends Serializable {
+    static ExpDataType fromType(RelDataType type) {
+        return type.isStruct() ? StructType.fromType(type) : 
SimpleType.fromType(type);
     }
 
     RelDataType toRelDataType(RelDataTypeFactory factory);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
index 1c08bd9..a76db0b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
@@ -27,4 +27,6 @@ public interface ExpImplementor<T> {
     T implement(LiteralExpression literalExpression);
 
     T implement(LocalRefExpression localRefExpression);
+
+    T implement(DynamicParamExpression dynamicParamExpression);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
index 820043f..9b09ef0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
@@ -90,6 +90,10 @@ public class ExpToRexTranslator implements 
ExpImplementor<RexNode> {
         return new RexLocalRef(exp.index, exp.type.toRelDataType(typeFactory));
     }
 
+    @Override public RexNode implement(DynamicParamExpression exp) {
+        return builder.makeDynamicParam(exp.type.toRelDataType(typeFactory), 
exp.index);
+    }
+
     private SqlOperator op(String name, SqlSyntax syntax) {
         return ops.get(Pair.of(name, syntax));
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
new file mode 100644
index 0000000..4c87ccc
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class FilterNode extends RelGraphNode {
+    private final int[] variables;
+    private final LogicalExpression condition;
+
+    private FilterNode(LogicalExpression condition, int[] variables) {
+        this.variables = variables;
+        this.condition = condition;
+    }
+
+    public static FilterNode create(IgniteFilter rel, RexToExpTranslator 
expTranslator) {
+        return new FilterNode(expTranslator.translate(rel.getCondition()),
+            
rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray());
+    }
+
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        return IgniteFilter.create(
+            F.first(children),
+            condition.implement(ctx.expressionTranslator()),
+            
Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()));
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
index 06b7b04..0427a5f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
@@ -19,16 +19,34 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.GridIntList;
 
 /**
  *
  */
-public class Graph implements Serializable {
-    private final List<GraphNode> nodes = new ArrayList<>();
+public class Graph<T extends GraphNode> implements Serializable {
+    private final List<T> nodes = new ArrayList<>();
     private final List<GridIntList> edges = new ArrayList<>();
 
-    int addNode(GraphNode node) {
+    public List<Ord<T>> nodes() {
+        return Ord.zip(nodes);
+    }
+
+    public List<GridIntList> edges() {
+        return Commons.transform(edges, GridIntList::copy);
+    }
+
+    public int addNode(int parentId, T node) {
+        int id = addNode(node);
+
+        addEdge(parentId, id);
+
+        return id;
+    }
+
+    public int addNode(T node) {
         assert nodes.size() == edges.size();
 
         int id = nodes.size();
@@ -39,26 +57,22 @@ public class Graph implements Serializable {
         return id;
     }
 
-    void addEdge(int parentId, int childId) {
-        edges.get(parentId).add(childId);
-    }
-
-    int addChild(int parentId, GraphNode node) {
-        int id = addNode(node);
-
-        edges.get(parentId).add(id);
+    public void addEdge(int parentId, int childId) {
+        assert parentId == -1 || (parentId >= 0 && parentId < edges.size());
+        assert nodes.size() == edges.size();
 
-        return id;
+        if (parentId != -1)
+            edges.get(parentId).add(childId);
     }
 
-    List<GraphNode> children(int parentId) {
-        GridIntList childrenIds = edges.get(parentId);
-        ArrayList<GraphNode> children = new ArrayList<>(childrenIds.size());
+    public List<Ord<T>> children(int parentId) {
+        GridIntList children = edges.get(parentId);
+
+        ArrayList<Ord<T>> ords = new ArrayList<>(children.size());
 
-        for (int i = 0; i < childrenIds.size(); i++) {
-            children.add(nodes.get(i));
-        }
+        for (int i = 0; i < children.size(); i++)
+            ords.add(Ord.of(children.get(i), nodes.get(children.get(i))));
 
-        return children;
+        return ords;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
index f2f533a..82e3f22 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
@@ -16,8 +16,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
+import java.io.Serializable;
+
 /**
  *
  */
-public interface GraphNode {
+public interface GraphNode extends Serializable {
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
similarity index 75%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
index f2f533a..abf4cdf 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
@@ -16,8 +16,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
+import org.apache.calcite.rel.RelNode;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+
 /**
  *
  */
-public interface GraphNode {
+public class GraphToRelConverter {
+    public RelNode convert(IgnitePlanner planner, Graph graph) {
+        return null;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
index bc12e25..9a52eb3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
  *
  */
 public class InputRefExpression implements LogicalExpression {
-    public final ExpressionType type;
+    public final ExpDataType type;
     public final int index;
 
     public InputRefExpression(RelDataType type, int index) {
-        this.type = ExpressionType.fromType(type);
+        this.type = ExpDataType.fromType(type);
         this.index = index;
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
new file mode 100644
index 0000000..f08a928
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+
+/**
+ *
+ */
+public class JoinNode extends RelGraphNode {
+    private final LogicalExpression condition;
+    private final int[] variables;
+    private final JoinRelType joinType;
+    private final boolean semiDone;
+
+    private JoinNode(RelTraitSet traits, LogicalExpression condition, int[] 
variables, JoinRelType joinType, boolean semiDone) {
+        super(traits);
+        this.condition = condition;
+        this.variables = variables;
+        this.joinType = joinType;
+        this.semiDone = semiDone;
+    }
+
+    public static JoinNode create(IgniteJoin rel, RexToExpTranslator 
expTranslator) {
+        return new JoinNode(rel.getTraitSet(),
+            expTranslator.translate(rel.getCondition()),
+            
rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray(),
+            rel.getJoinType(),
+            rel.isSemiJoinDone());
+    }
+
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        assert children.size() == 2;
+
+        RelNode left = children.get(0);
+        RelNode right = children.get(1);
+
+        return new IgniteJoin(ctx.cluster(),
+            traitSet.toTraitSet(ctx.cluster()),
+            left,
+            right,
+            ctx.expressionTranslator().translate(condition),
+            
Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()),
+            joinType,
+            semiDone);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
index 3c95b96..2dba7d1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
  *
  */
 public class LiteralExpression implements LogicalExpression {
-    public final ExpressionType type;
+    public final ExpDataType type;
     public final Comparable value;
 
     public LiteralExpression(RelDataType type, Comparable value) {
-        this.type = ExpressionType.fromType(type);
+        this.type = ExpDataType.fromType(type);
         this.value = value;
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
index 4f3ed54..bc5a77e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
  *
  */
 public class LocalRefExpression implements LogicalExpression {
-    public final ExpressionType type;
+    public final ExpDataType type;
     public final int index;
 
     public LocalRefExpression(RelDataType type, int index) {
-        this.type = ExpressionType.fromType(type);
+        this.type = ExpDataType.fromType(type);
         this.index = index;
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
new file mode 100644
index 0000000..07c2cf5
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class ProjectNode extends RelGraphNode {
+    private final List<LogicalExpression> projects;
+    private final ExpDataType dataType;
+
+    private ProjectNode(List<LogicalExpression> projects, ExpDataType 
dataType) {
+        this.projects = projects;
+        this.dataType = dataType;
+    }
+
+    public static ProjectNode create(IgniteProject rel, RexToExpTranslator 
rexTranslator) {
+        return new ProjectNode(rexTranslator.translate(rel.getProjects()),
+            ExpDataType.fromType(rel.getRowType()));
+    }
+
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        return IgniteProject.create(F.first(children),
+            ctx.expressionTranslator().translate(projects),
+            dataType.toRelDataType(ctx.typeFactory()));
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
new file mode 100644
index 0000000..93405cd
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+
+
+/**
+ *
+ */
+public class ReceiverNode extends RelGraphNode {
+    private final ExpDataType dataType;
+    private final NodesMapping sourceMapping;
+
+    private ReceiverNode(RelTraitSet traits, ExpDataType dataType, 
NodesMapping sourceMapping) {
+        super(traits);
+        this.dataType = dataType;
+        this.sourceMapping = sourceMapping;
+    }
+
+    public static ReceiverNode create(Receiver rel) {
+        return new ReceiverNode(rel.getTraitSet(), 
ExpDataType.fromType(rel.getRowType()), rel.sourceMapping());
+    }
+
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        return new Receiver(ctx.cluster(), traitSet.toTraitSet(ctx.cluster()), 
dataType.toRelDataType(ctx.typeFactory()), sourceMapping);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
similarity index 93%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
index f2f533a..c00837a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
@@ -19,5 +19,5 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize;
 /**
  *
  */
-public interface GraphNode {
+public class RelGraph extends Graph<RelGraphNode> {
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
index 14a2019..164274f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
@@ -16,17 +16,22 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import java.io.Serializable;
-import org.apache.calcite.plan.RelTrait;
+import java.util.List;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 
 /**
  *
  */
-public class RelGraphNode implements GraphNode, Serializable {
-    protected RelTrait[] traits;
+public abstract class RelGraphNode implements GraphNode {
+    protected SerializedTraitSet traitSet;
 
-    public RelGraphNode(RelTraitSet traits) {
-        this.traits = traits.toArray(new RelTrait[0]);
+    protected RelGraphNode() {
     }
+
+    protected RelGraphNode(RelTraitSet traits) {
+        traitSet = new SerializedTraitSet(traits);
+    }
+
+    public abstract RelNode toRel(ConversionContext ctx, List<RelNode> 
children);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
index 40a9158..7de26c7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
@@ -17,9 +17,9 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
@@ -28,67 +28,80 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class RelToGraphConverter implements RelImplementor<Pair<Integer, 
List<IgniteRel>>> {
-    private Deque<Pair<Integer, List<IgniteRel>>> stack = new ArrayDeque<>();
-    private Graph graph;
-    private int parentId;
+public class RelToGraphConverter {
+    private final RexToExpTranslator rexTranslator = new RexToExpTranslator();
 
-    public Graph convert(IgniteRel root) {
-        stack = new ArrayDeque<>();
-        graph = new Graph();
-        parentId = -1;
+    private RelGraph graph;
+    private int curParent;
 
-        stack.push(root.implement(this));
+    private static final class Item {
+        final int parentId;
+        final List<IgniteRel> children;
 
-        while (!stack.isEmpty()) {
-            Pair<Integer, List<IgniteRel>> pair = stack.pop();
+        private Item(int parentId, List<IgniteRel> children) {
+            this.parentId = parentId;
+            this.children = children;
+        }
+    }
 
-            parentId = pair.left;
+    private final class Implementor implements RelImplementor<Item> {
+        @Override public Item implement(IgniteFilter rel) {
+            return new Item(graph.addNode(curParent, FilterNode.create(rel, 
rexTranslator)), Commons.cast(rel.getInputs()));
+        }
 
-            for (IgniteRel child : pair.right) {
-                stack.push(child.implement(this));
-            }
+        @Override public Item implement(IgniteJoin rel) {
+            return new Item(graph.addNode(curParent, JoinNode.create(rel, 
rexTranslator)), Commons.cast(rel.getInputs()));
         }
 
-        return graph;
-    }
+        @Override public Item implement(IgniteProject rel) {
+            return new Item(graph.addNode(curParent, ProjectNode.create(rel, 
rexTranslator)), Commons.cast(rel.getInputs()));
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteFilter 
rel) {
-        return null;
-    }
+        @Override public Item implement(IgniteTableScan rel) {
+            return new Item(graph.addNode(curParent, 
TableScanNode.create(rel)), Commons.cast(rel.getInputs()));
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteJoin rel) {
-        return null;
-    }
+        @Override public Item implement(Receiver rel) {
+            return new Item(graph.addNode(curParent, 
ReceiverNode.create(rel)), Collections.emptyList());
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteProject 
rel) {
-        return null;
-    }
+        @Override public Item implement(Sender rel) {
+            return new Item(graph.addNode(curParent, SenderNode.create(rel)), 
Commons.cast(rel.getInputs()));
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteTableScan 
rel) {
-        return null;
-    }
+        @Override public Item implement(IgniteExchange rel) {
+            throw new UnsupportedOperationException();
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(Receiver rel) {
-        return null;
+        @Override public Item implement(IgniteRel other) {
+            throw new AssertionError();
+        }
     }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(Sender rel) {
-        assert parentId == -1;
+    public RelGraph convert(IgniteRel root) {
+        graph = new RelGraph();
 
-        return null;
-    }
+        Implementor implementor = new Implementor();
+        Deque<Item> stack = new ArrayDeque<>();
+        stack.push(new Item(-1, F.asList(root)));
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteExchange 
rel) {
-        throw new UnsupportedOperationException();
-    }
+        while (!stack.isEmpty()) {
+            Item item = stack.pop();
+
+            curParent = item.parentId;
+
+            for (IgniteRel child : item.children) {
+                stack.push(child.implement(implementor));
+            }
+        }
 
-    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteRel other) 
{
-        throw new AssertionError();
+        return graph;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
index dfb92d3..e2ff02a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
@@ -76,7 +76,7 @@ public class RexToExpTranslator implements 
RexVisitor<LogicalExpression> {
     }
 
     @Override public LogicalExpression visitDynamicParam(RexDynamicParam 
dynamicParam) {
-        throw new UnsupportedOperationException();
+        return new DynamicParamExpression(dynamicParam.getType(), 
dynamicParam.getIndex());
     }
 
     @Override public LogicalExpression visitRangeRef(RexRangeRef rangeRef) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
index 41820d6..e23b705 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
@@ -16,21 +16,30 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
 public class SenderNode extends RelGraphNode {
-    private DistributionTrait targetDistr;
-    private NodesMapping targetMapping;
+    private final DistributionTrait targetDistr;
+    private final NodesMapping targetMapping;
 
-    public SenderNode(Sender sender) {
-        super(sender.getTraitSet());
+    private SenderNode(DistributionTrait targetDistr, NodesMapping 
targetMapping) {
+        this.targetDistr = targetDistr;
+        this.targetMapping = targetMapping;
+    }
+
+    public static SenderNode create(Sender rel) {
+        return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+    }
 
-        targetDistr = sender.targetDistribution();
-        targetMapping = sender.targetMapping();
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        return Sender.create(F.first(children), targetDistr, targetMapping);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
similarity index 61%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
index 14a2019..cc2c378 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
@@ -17,16 +17,30 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import java.io.Serializable;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
 
 /**
  *
  */
-public class RelGraphNode implements GraphNode, Serializable {
-    protected RelTrait[] traits;
+public class SerializedCorrelationId implements Serializable {
+    private final int id;
+    private final String name;
 
-    public RelGraphNode(RelTraitSet traits) {
-        this.traits = traits.toArray(new RelTrait[0]);
+    public SerializedCorrelationId(int id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    public SerializedCorrelationId(CorrelationId corrId) {
+        id = corrId.getId();
+        name = corrId.getName();
+    }
+
+    public int id() {
+        return id;
+    }
+
+    public String name() {
+        return name;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
new file mode 100644
index 0000000..f35d933
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+
+/**
+ *
+ */
+public class SerializedTraitSet implements Serializable {
+    private static final Byte IGNITE_CONVENTION = 0;
+
+    private final List<Serializable> traits;
+
+    public SerializedTraitSet(RelTraitSet traits) {
+        this.traits = translate(traits);
+    }
+
+    public RelTraitSet toTraitSet(RelOptCluster cluster) {
+        RelTraitSet traits = cluster.traitSet();
+
+        for (Serializable trait : this.traits) {
+            traits.replace(fromSerializable(trait));
+        }
+
+        return traits.simplify();
+    }
+
+    private List<Serializable> translate(List<RelTrait> traits) {
+        ArrayList<Serializable> res = new ArrayList<>(traits.size());
+        for (RelTrait trait : traits) {
+            res.add(toSerializable(trait));
+        }
+
+        return res;
+    }
+
+    private Serializable toSerializable(RelTrait trait) {
+        if (trait instanceof Serializable)
+            return (Serializable) trait;
+        if (trait == IgniteRel.IGNITE_CONVENTION)
+            return IGNITE_CONVENTION;
+
+        throw new AssertionError();
+    }
+
+    private RelTrait fromSerializable(Serializable trait) {
+        if (trait instanceof RelTrait)
+            return (RelTrait) trait;
+        if (IGNITE_CONVENTION.equals(trait))
+            return IgniteRel.IGNITE_CONVENTION;
+
+        throw new AssertionError();
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
similarity index 81%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
index e1e1b3a..54b7fb5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
@@ -24,22 +24,22 @@ import org.apache.calcite.sql.type.SqlTypeName;
 /**
  *
  */
-public class FieldType implements ExpressionType {
+public class SimpleType implements ExpDataType {
     private final Class clazz;
     private final SqlTypeName typeName;
     private final int precision;
     private final int scale;
 
-    public static FieldType fromType(RelDataType type) {
+    public static SimpleType fromType(RelDataType type) {
         assert !type.isStruct();
 
         if (type instanceof RelDataTypeFactoryImpl.JavaType)
-            return new FieldType(((RelDataTypeFactoryImpl.JavaType) 
type).getJavaClass(), null, 0, 0);
+            return new SimpleType(((RelDataTypeFactoryImpl.JavaType) 
type).getJavaClass(), null, 0, 0);
 
-        return new FieldType(null, type.getSqlTypeName(), type.getPrecision(), 
type.getScale());
+        return new SimpleType(null, type.getSqlTypeName(), 
type.getPrecision(), type.getScale());
     }
 
-    private FieldType(Class clazz, SqlTypeName typeName, int precision, int 
scale) {
+    private SimpleType(Class clazz, SqlTypeName typeName, int precision, int 
scale) {
         this.clazz = clazz;
         this.typeName = typeName;
         this.precision = precision;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
index 45d5e44..e72b9dc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
@@ -24,22 +24,22 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 /**
  *
  */
-public class StructType implements ExpressionType {
-    private final LinkedHashMap<String, FieldType> fields;
+public class StructType implements ExpDataType {
+    private final LinkedHashMap<String, ExpDataType> fields;
 
-    public static StructType fromType(RelDataType type) {
+    static StructType fromType(RelDataType type) {
         assert type.isStruct();
 
-        LinkedHashMap<String, FieldType> fields = new LinkedHashMap<>();
+        LinkedHashMap<String, ExpDataType> fields = new LinkedHashMap<>();
 
         for (RelDataTypeField field : type.getFieldList()) {
-            fields.put(field.getName(), FieldType.fromType(field.getType()));
+            fields.put(field.getName(), ExpDataType.fromType(field.getType()));
         }
 
         return new StructType(fields);
     }
 
-    private StructType(LinkedHashMap<String, FieldType> fields) {
+    private StructType(LinkedHashMap<String, ExpDataType> fields) {
         this.fields = fields;
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
similarity index 56%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
index 3c95b96..3604f17 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
@@ -16,21 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.rel.type.RelDataType;
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 
 /**
  *
  */
-public class LiteralExpression implements LogicalExpression {
-    public final ExpressionType type;
-    public final Comparable value;
+public class TableScanNode extends RelGraphNode {
+    private final List<String> tableName;
 
-    public LiteralExpression(RelDataType type, Comparable value) {
-        this.type = ExpressionType.fromType(type);
-        this.value = value;
+    private TableScanNode(List<String> tableName) {
+        this.tableName = tableName;
     }
 
-    @Override public <T> T implement(ExpImplementor<T> implementor) {
-        return implementor.implement(this);
+    public static TableScanNode create(IgniteTableScan rel) {
+        return new TableScanNode(rel.getTable().getQualifiedName());
+    }
+
+    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> 
children) {
+        return ctx.schema().getTableForMember(tableName).toRel(ctx);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 8e9d9ed..55264d6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.typedef.F;
  *
  */
 public class Fragment {
-    public final RelNode rel;
+    private final RelNode rel;
 
     private NodesMapping mapping;
     private ImmutableIntList localInputs;
@@ -49,14 +49,34 @@ public class Fragment {
         init(null, ctx, mq);
     }
 
-    public void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
+    public RelNode root() {
+        return rel;
+    }
+
+    public NodesMapping mapping() {
+        return mapping;
+    }
+
+    public ImmutableIntList localInputs() {
+        return localInputs;
+    }
+
+    public ImmutableList<Fragment> remoteInputs() {
+        return remoteInputs;
+    }
+
+    public boolean isRemote() {
+        return rel instanceof Sender;
+    }
+
+    private void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
         FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(rel, mq);
 
         remoteInputs = info.remoteInputs();
         localInputs = info.localInputs();
 
         if (info.mapping() == null)
-            mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : 
registry(ctx).local();
+            mapping = isRemote() ? registry(ctx).random(topologyVersion(ctx)) 
: registry(ctx).local();
         else {
             try {
                 mapping = info.mapping().deduplicate();
@@ -67,7 +87,7 @@ public class Fragment {
         }
 
         if (parent != null) {
-            assert remote();
+            assert isRemote();
 
             ((Sender)rel).init(parent.mapping);
         }
@@ -78,10 +98,6 @@ public class Fragment {
         }
     }
 
-    private boolean remote() {
-        return rel instanceof Sender;
-    }
-
     private LocationRegistry registry(Context ctx) {
         return ctx.unwrap(LocationRegistry.class);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 535c3bb..7e69adf 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -16,20 +16,21 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.ObjectStreamException;
 import java.util.List;
+import java.util.UUID;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-class AllTargetsFactory extends AbstractDestinationFunctionFactory {
+final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
     static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        List<ClusterNode> nodes = m.nodes();
+        List<UUID> nodes = m.nodes();
 
         return r -> nodes;
     }
@@ -37,4 +38,8 @@ class AllTargetsFactory extends 
AbstractDestinationFunctionFactory {
     @Override public Object key() {
         return "AllTargetsFactory";
     }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
index 7577767..3d4dc41 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
@@ -17,11 +17,11 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
 
 /**
  *
  */
 public interface DestinationFunction {
-    List<ClusterNode> destination(Object row);
+    List<UUID> destination(Object row);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 756bf6b..0fe775a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,8 +16,11 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Objects;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
@@ -29,13 +32,13 @@ import org.apache.calcite.util.ImmutableIntList;
  */
 public final class DistributionTrait implements RelTrait, Serializable {
     private DistributionType type;
-    private int[] keys;
+    private ImmutableIntList keys;
     private DestinationFunctionFactory functionFactory;
 
     public DistributionTrait() {
     }
 
-    public DistributionTrait(DistributionType type, int[] keys, 
DestinationFunctionFactory functionFactory) {
+    public DistributionTrait(DistributionType type, ImmutableIntList keys, 
DestinationFunctionFactory functionFactory) {
         this.type = type;
         this.keys = keys;
         this.functionFactory = functionFactory;
@@ -50,7 +53,7 @@ public final class DistributionTrait implements RelTrait, 
Serializable {
     }
 
     public ImmutableIntList keys() {
-        return ImmutableIntList.of(keys);
+        return keys;
     }
 
     @Override public void register(RelOptPlanner planner) {}
@@ -62,18 +65,18 @@ public final class DistributionTrait implements RelTrait, 
Serializable {
         if (o instanceof DistributionTrait) {
             DistributionTrait that = (DistributionTrait) o;
 
-            return type == that.type() && Arrays.equals(keys, that.keys);
+            return type == that.type() && Objects.equals(keys, that.keys);
         }
 
         return false;
     }
 
     @Override public int hashCode() {
-        return Objects.hash(type, Arrays.hashCode(keys));
+        return Objects.hash(type, keys);
     }
 
     @Override public String toString() {
-        return type + (type == DistributionType.HASH ? Arrays.toString(keys) : 
"");
+        return type + (type == DistributionType.HASH ? String.valueOf(keys) : 
"");
     }
 
     @Override public RelTraitDef getTraitDef() {
@@ -94,9 +97,25 @@ public final class DistributionTrait implements RelTrait, 
Serializable {
 
         if (type() == other.type())
             return type() != DistributionType.HASH
-                || (Arrays.equals(keys, other.keys)
+                || (Objects.equals(keys, other.keys)
                     && Objects.equals(functionFactory, other.functionFactory));
 
         return other.type() == DistributionType.RANDOM && type() == 
DistributionType.HASH;
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeObject(type);
+        out.writeObject(keys.toIntArray());
+        out.writeObject(functionFactory);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        type = (DistributionType) in.readObject();
+        keys = ImmutableIntList.of((int[])in.readObject());
+        functionFactory = (DestinationFunctionFactory) in.readObject();
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return DistributionTraitDef.INSTANCE.canonize(this);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index c6a3eb5..379a707 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -16,11 +16,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.ObjectStreamException;
 import java.util.List;
+import java.util.UUID;
 import java.util.function.ToIntFunction;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -28,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  *
  */
-class HashFunctionFactory extends AbstractDestinationFunctionFactory {
+final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
     static final DestinationFunctionFactory INSTANCE = new 
HashFunctionFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
@@ -50,10 +51,10 @@ class HashFunctionFactory extends 
AbstractDestinationFunctionFactory {
             return hash;
         };
 
-        List<List<ClusterNode>> assignments = m.assignments();
+        List<List<UUID>> assignments = m.assignments();
 
         if (U.assertionsEnabled()) {
-            for (List<ClusterNode> assignment : assignments) {
+            for (List<UUID> assignment : assignments) {
                 assert F.isEmpty(assignment) || assignment.size() == 1;
             }
         }
@@ -64,4 +65,8 @@ class HashFunctionFactory extends 
AbstractDestinationFunctionFactory {
     @Override public Object key() {
         return "HashFunctionFactory";
     }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index a95734a..6a9e76d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -23,8 +23,8 @@ import java.util.List;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
@@ -32,11 +32,10 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
  *
  */
 public class IgniteDistributions {
-    private static final int[] EMPTY_KEYS = new int[0];
-    private static final DistributionTrait BROADCAST = new 
DistributionTrait(DistributionType.BROADCAST, EMPTY_KEYS, 
AllTargetsFactory.INSTANCE);
-    private static final DistributionTrait SINGLE = new 
DistributionTrait(DistributionType.SINGLE, EMPTY_KEYS, 
SingleTargetFactory.INSTANCE);
-    private static final DistributionTrait RANDOM = new 
DistributionTrait(DistributionType.RANDOM, EMPTY_KEYS, 
RandomTargetFactory.INSTANCE);
-    private static final DistributionTrait ANY    = new 
DistributionTrait(DistributionType.ANY, EMPTY_KEYS, NoOpFactory.INSTANCE);
+    private static final DistributionTrait BROADCAST = new 
DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), 
AllTargetsFactory.INSTANCE);
+    private static final DistributionTrait SINGLE = new 
DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), 
SingleTargetFactory.INSTANCE);
+    private static final DistributionTrait RANDOM = new 
DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), 
RandomTargetFactory.INSTANCE);
+    private static final DistributionTrait ANY    = new 
DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), 
NoOpFactory.INSTANCE);
 
     public static DistributionTrait any() {
         return ANY;
@@ -55,11 +54,11 @@ public class IgniteDistributions {
     }
 
     public static DistributionTrait hash(List<Integer> keys) {
-        return new DistributionTrait(HASH, U.toIntArray(keys), 
HashFunctionFactory.INSTANCE);
+        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), 
HashFunctionFactory.INSTANCE);
     }
 
     public static DistributionTrait hash(List<Integer> keys, 
DestinationFunctionFactory factory) {
-        return new DistributionTrait(HASH, U.toIntArray(keys), factory);
+        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), 
factory);
     }
 
     public static List<DistributionTrait> deriveDistributions(RelNode rel, 
RelMetadataQuery mq) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index 5dff5a9..1988671 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.ObjectStreamException;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -23,7 +24,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-class NoOpFactory extends AbstractDestinationFunctionFactory {
+final class NoOpFactory extends AbstractDestinationFunctionFactory {
     static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
@@ -33,4 +34,8 @@ class NoOpFactory extends AbstractDestinationFunctionFactory {
     @Override public Object key() {
         return "NoOpFactory";
     }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index a4b27b7..8de55e5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -16,22 +16,23 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.ObjectStreamException;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-class RandomTargetFactory extends AbstractDestinationFunctionFactory {
+final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
     static final DestinationFunctionFactory INSTANCE = new 
RandomTargetFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        List<ClusterNode> nodes = m.nodes();
+        List<UUID> nodes = m.nodes();
 
         return r -> 
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
     }
@@ -40,4 +41,7 @@ class RandomTargetFactory extends 
AbstractDestinationFunctionFactory {
         return "RandomTargetFactory";
     }
 
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 4d21a60..1de22fd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -16,20 +16,23 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.ObjectStreamException;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-class SingleTargetFactory extends AbstractDestinationFunctionFactory {
+final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
     static final DestinationFunctionFactory INSTANCE = new 
SingleTargetFactory();
 
     @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
-        List<ClusterNode> nodes = m.nodes().subList(0, 1);
+        List<UUID> nodes = Collections.singletonList(F.first(m.nodes()));
 
         return r -> nodes;
     }
@@ -37,4 +40,8 @@ class SingleTargetFactory extends 
AbstractDestinationFunctionFactory {
     @Override public Object key() {
         return "SingleTargetFactory";
     }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index aab2302..22486e8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.Context;
@@ -44,7 +45,7 @@ import 
org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -138,97 +139,56 @@ public final class Commons {
         };
     }
 
-    public static int[] intersect(int[] left, int[] right) {
-        if (F.isEmpty(left) || F.isEmpty(right))
-            return EMPTY;
-
-        int[] res = null;
-
-        int i = 0, j = 0, k = 0, size = Math.min(left.length, right.length);
-
-        while (i < left.length && j < right.length) {
-            if (left[i] < right[j])
-                i++;
-            else if (right[j] < left[i])
-                j++;
-            else {
-                if (res == null)
-                    res = new int[size];
-
-                res[k++] = left[i];
-
-                i++;
-                j++;
-            }
-        }
-
-        if (k == 0)
-            return EMPTY;
-
-        return res.length == k ? res : Arrays.copyOf(res, k);
-    }
-
     public static <T> List<T> intersect(List<T> left, List<T> right) {
         if (F.isEmpty(left) || F.isEmpty(right))
             return Collections.emptyList();
-
-        HashSet<T> set = new HashSet<>(right);
-
-        return 
left.stream().filter(set::contains).collect(Collectors.toList());
+        else if (left.size() > right.size())
+            return intersect0(right, left);
+        else
+            return intersect0(left, right);
     }
 
-    public static <T> List<T> union(List<T> left, List<T> right) {
-        Set<T> set = U.newHashSet(left.size() + right.size());
+    public static <T> List<T> intersect0(List<T> left, List<T> right) {
+        List<T> res = new ArrayList<>(Math.min(left.size(), right.size()));
+        HashSet<T> set = new HashSet<>(left);
 
-        set.addAll(left);
-        set.addAll(right);
+        for (T t : right) {
+            if (set.contains(t))
+                res.add(t);
+        }
 
-        return new ArrayList<>(set);
+        return res;
     }
 
-    public static int[] union(int[] left, int[] right) {
-        if (F.isEmpty(left) && F.isEmpty(right))
-            return EMPTY;
-
-        int min = Math.min(left.length, right.length);
-        int max = left.length + right.length;
-        int expected = Math.max(min, (int) (max * 1.5));
-
-        int[] res = new int[U.ceilPow2(expected)];
-
-        int i = 0, j = 0, k = 0;
+    public static <T> List<T> concat(List<T> col, T... elements) {
+        ArrayList<T> res = new ArrayList<>(col.size() + elements.length);
 
-        while (i < left.length && j < right.length) {
-            res = ensureSize(res, k + 1);
+        res.addAll(col);
+        res.addAll(Arrays.asList(elements));
 
-            if (left[i] < right[j])
-                res[k++] = left[i++];
-            else if (right[j] < left[i])
-                res[k++] = right[j++];
-            else {
-                res[k++] = left[i];
+        return res;
+    }
 
-                i++;
-                j++;
-            }
-        }
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> cast(List<?> src) {
+        return (List)src;
+    }
 
-        if (k == 0)
-            return EMPTY;
+    public static <T,R> List<R> transform(@NotNull List<T> src, @NotNull 
Function<T,R> mapFun) {
+        List<R> list = new ArrayList<>(src.size());
 
-        return res.length == k ? res : Arrays.copyOf(res, k);
-    }
+        for (T t : src)
+            list.add(mapFun.apply(t));
 
-    private static int[] ensureSize(int[] array, int size) {
-        return size < array.length ? array : Arrays.copyOf(array, 
U.ceilPow2(size));
+        return list;
     }
 
-    public static <T> List<T> concat(List<T> col, T... elements) {
-        ArrayList<T> res = new ArrayList<>(col.size() + elements.length);
+    public static <T,R> Set<R> transform(@NotNull Set<T> src, @NotNull 
Function<T,R> mapFun) {
+        Set<R> set = new HashSet<>(src.size());
 
-        res.addAll(col);
-        Collections.addAll(res, elements);
+        for (T t : src)
+            set.add(mapFun.apply(t));
 
-        return res;
+        return set;
     }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index bc7d179..c6463dd 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -35,7 +35,6 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
@@ -55,6 +54,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.LogicalExpression;
+import org.apache.ignite.internal.processors.query.calcite.serialize.RelGraph;
+import 
org.apache.ignite.internal.processors.query.calcite.serialize.RelToGraphConverter;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
@@ -66,10 +67,10 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
-import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
@@ -85,7 +86,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     private static SchemaPlus schema;
 
     private static TestRegistry registry;
-    private static List<ClusterNode> nodes;
+    private static List<UUID> nodes;
 
     @BeforeClass
     public static void setupClass() {
@@ -134,7 +135,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         nodes = new ArrayList<>(4);
 
         for (int i = 0; i < 4; i++) {
-            nodes.add(new GridTestNode(UUID.randomUUID()));
+            nodes.add(UUID.randomUUID());
         }
 
         registry = new TestRegistry();
@@ -362,6 +363,82 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
+    public void testPlanSerializationDeserialization() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        byte[] convertedBytes;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            RelNode rel = planner.convert(sqlNode);
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
+
+            assertNotNull(rel);
+
+            QueryPlan plan = planner.plan(rel);
+
+            assertNotNull(plan);
+
+            assertTrue(plan.fragments().size() == 2);
+
+            plan.init(ctx);
+
+            RelGraph graph = new RelToGraphConverter().convert((IgniteRel) 
plan.fragments().get(1).root());
+
+            convertedBytes = new JdkMarshaller().marshal(graph);
+
+            assertNotNull(convertedBytes);
+        }
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+            assertNotNull(planner);
+
+            RelGraph graph = new JdkMarshaller().unmarshal(convertedBytes, 
getClass().getClassLoader());
+
+            assertNotNull(graph);
+
+            RelNode rel = planner.convert(graph);
+
+            assertNotNull(rel);
+        }
+    }
+
+    @Test
     public void testSplitterCollocatedPartitionedPartitioned() throws 
Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +

Reply via email to