This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new c082b7b  pending
c082b7b is described below

commit c082b7b14dae76455ca40c8a279598f373d7410a
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Tue Nov 19 20:04:59 2019 +0300

    pending
---
 .../query/calcite/CalciteQueryProcessor.java       |  4 +-
 .../query/calcite/cluster/RegistryImpl.java        |  6 +-
 .../calcite/metadata/DistributionRegistry.java     |  2 +-
 .../query/calcite/metadata/NodesMapping.java       | 13 +--
 .../query/calcite/prepare/IgnitePlanner.java       | 21 +++--
 .../query/calcite/rel/IgniteExchange.java          |  4 +-
 .../processors/query/calcite/rel/IgniteFilter.java |  4 +-
 .../processors/query/calcite/rel/IgniteJoin.java   |  4 +-
 .../query/calcite/rel/IgniteProject.java           |  4 +-
 .../processors/query/calcite/rel/IgniteRel.java    |  4 +-
 .../query/calcite/rel/IgniteTableScan.java         |  4 +-
 .../processors/query/calcite/rel/Receiver.java     |  4 +-
 .../processors/query/calcite/rel/Sender.java       | 12 ++-
 .../query/calcite/rule/IgniteJoinRule.java         |  4 +-
 .../query/calcite/schema/IgniteTable.java          |  1 +
 .../query/calcite/serialize/CallExpression.java    | 29 +++----
 ...rializationContext.java => ExpImplementor.java} | 18 ++--
 .../calcite/serialize/ExpToRexTranslator.java      | 96 ++++++++++++++++++++++
 .../{Expression.java => ExpressionType.java}       | 13 ++-
 .../query/calcite/serialize/FieldType.java         | 59 +++++++++++++
 .../processors/query/calcite/serialize/Graph.java  |  3 +-
 .../calcite/serialize/InputRefExpression.java      | 14 ++--
 .../query/calcite/serialize/LiteralExpression.java | 14 ++--
 .../calcite/serialize/LocalRefExpression.java      | 15 ++--
 .../{Expression.java => LogicalExpression.java}    |  7 +-
 .../{Expression.java => RelGraphNode.java}         | 13 ++-
 .../calcite/serialize/RelToGraphConverter.java     | 53 +++++++-----
 .../calcite/serialize/RexToExpTranslator.java      | 52 ++++++------
 .../SenderNode.java}                               | 18 ++--
 .../query/calcite/serialize/StructType.java        | 51 ++++++++++++
 ...ava => AbstractDestinationFunctionFactory.java} | 32 +++-----
 ...FunctionFactory.java => AllTargetsFactory.java} | 16 +++-
 .../calcite/trait/DestinationFunctionFactory.java  |  7 +-
 .../query/calcite/trait/DistributionTrait.java     | 28 ++++---
 .../query/calcite/trait/DistributionType.java      |  4 +-
 .../query/calcite/trait/HashFunctionFactory.java   | 67 +++++++++++++++
 .../query/calcite/trait/IgniteDistributions.java   | 82 ++----------------
 ...nationFunctionFactory.java => NoOpFactory.java} | 12 ++-
 ...nctionFactory.java => RandomTargetFactory.java} | 19 ++++-
 ...nctionFactory.java => SingleTargetFactory.java} | 16 +++-
 .../IgniteTypeFactory.java}                        | 16 ++--
 .../IgniteTypeSystem.java}                         | 11 +--
 .../query/calcite/{schema => type}/RowType.java    |  2 +-
 .../processors/query/calcite/util/Commons.java     |  2 +-
 .../util/{Implementor.java => RelImplementor.java} |  2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   | 69 ++++++++++++++--
 46 files changed, 623 insertions(+), 308 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index f188665..760a71c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -23,7 +23,6 @@ import org.apache.calcite.config.Lex;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution;
 import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.resources.LoggerResource;
@@ -83,7 +83,7 @@ public class CalciteQueryProcessor implements QueryEngine {
             .context(Contexts.of(this))
             // Custom cost factory to use during optimization
             .costFactory(null)
-            .typeSystem(RelDataTypeSystem.DEFAULT)
+            .typeSystem(IgniteTypeSystem.DEFAULT)
             .build();
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 48c9618..95ad49f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -32,11 +32,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -141,7 +141,7 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
         return new NodesMapping(nodes, null, flags);
     }
 
-    private static class AffinityFactory implements DestinationFunctionFactory {
+    private static class AffinityFactory extends AbstractDestinationFunctionFactory {
         private final int cacheId;
         private final Object key;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
index 32cf357..3a20908 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
@@ -16,8 +16,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index d40af2a..fdbed1d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,12 +31,12 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  *
  */
-public class NodesMapping {
-    public static final byte HAS_MOVING_PARTITIONS = 0x1;
-    public static final byte HAS_REPLICATED_CACHES = 0x2;
-    public static final byte HAS_PARTITIONED_CACHES = 0x4;
-    public static final byte PARTIALLY_REPLICATED = 0x8;
-    public static final byte DEDUPLICATED = 0x16;
+public class NodesMapping implements Serializable {
+    public static final byte HAS_MOVING_PARTITIONS = 1;
+    public static final byte HAS_REPLICATED_CACHES = 1 << 1;
+    public static final byte HAS_PARTITIONED_CACHES = 1 << 2;
+    public static final byte PARTIALLY_REPLICATED = 1 << 3;
+    public static final byte DEDUPLICATED = 1 << 4;
 
     private final List<ClusterNode> nodes;
     private final List<List<ClusterNode>> assignments;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 0f37a01..e7f4bb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -26,7 +26,6 @@ import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCostImpl;
@@ -73,6 +72,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 
 /**
  *
@@ -114,9 +115,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         connectionConfig = connConfig();
 
         RelDataTypeSystem typeSystem = connectionConfig
-            .typeSystem(RelDataTypeSystem.class, RelDataTypeSystem.DEFAULT);
+            .typeSystem(RelDataTypeSystem.class, IgniteTypeSystem.DEFAULT);
 
-        typeFactory = new JavaTypeFactoryImpl(typeSystem);
+        typeFactory = new IgniteTypeFactory(typeSystem);
     }
 
     private CalciteConnectionConfig connConfig() {
@@ -157,7 +158,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
             planner.setExecutor(executor);
             metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner);
 
-            validator = new IgniteSqlValidator(operatorTable, createCatalogReader(), typeFactory, conformance());
+            validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance());
             validator.setIdentifierExpansion(true);
 
             for (RelTraitDef def : traitDefs) {
@@ -247,7 +248,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         SqlConformance conformance = conformance();
         CalciteCatalogReader catalogReader =
             createCatalogReader().withSchemaPath(schemaPath);
-        SqlValidator validator = new IgniteSqlValidator(operatorTable, catalogReader, typeFactory, conformance);
+        SqlValidator validator = new IgniteSqlValidator(operatorTable(), catalogReader, typeFactory, conformance);
         validator.setIdentifierExpansion(true);
 
         RexBuilder rexBuilder = createRexBuilder();
@@ -327,15 +328,19 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return typeFactory;
     }
 
-    private SqlConformance conformance() {
+    public SqlConformance conformance() {
         return connectionConfig.conformance();
     }
 
-    private RexBuilder createRexBuilder() {
+    public SqlOperatorTable operatorTable() {
+        return operatorTable;
+    }
+
+    public RexBuilder createRexBuilder() {
         return new RexBuilder(typeFactory);
     }
 
-    private CalciteCatalogReader createCatalogReader() {
+    public CalciteCatalogReader createCatalogReader() {
         SchemaPlus rootSchema = rootSchema(defaultSchema);
 
         return new CalciteCatalogReader(
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 13f9b92..51be890 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
@@ -56,7 +56,7 @@ public final class IgniteExchange extends SingleRel implements IgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T implement(Implementor<T> implementor) {
+    @Override public <T> T implement(RelImplementor<T> implementor) {
         return implementor.implement(this);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 9e261d4..5a96897 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 public final class IgniteFilter extends Filter implements IgniteRel {
   private final Set<CorrelationId> variablesSet;
@@ -50,7 +50,7 @@ public final class IgniteFilter extends Filter implements IgniteRel {
   }
 
   /** {@inheritDoc} */
-  @Override public <T> T implement(Implementor<T> implementor) {
+  @Override public <T> T implement(RelImplementor<T> implementor) {
     return implementor.implement(this);
   }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index ad60afd..e576897 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 public final class IgniteJoin extends Join implements IgniteRel {
   private final boolean semiJoinDone;
@@ -49,7 +49,7 @@ public final class IgniteJoin extends Join implements IgniteRel {
   }
 
   /** {@inheritDoc} */
-  @Override public <T> T implement(Implementor<T> implementor) {
+  @Override public <T> T implement(RelImplementor<T> implementor) {
     return implementor.implement(this);
   }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 85c7029..fe0d55e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -28,7 +28,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 public final class IgniteProject extends Project implements IgniteRel {
   public IgniteProject(
@@ -46,7 +46,7 @@ public final class IgniteProject extends Project implements IgniteRel {
   }
 
   /** {@inheritDoc} */
-  @Override public <T> T implement(Implementor<T> implementor) {
+  @Override public <T> T implement(RelImplementor<T> implementor) {
     return implementor.implement(this);
   }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index 211dc20..dbfbb3f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
@@ -34,5 +34,5 @@ public interface IgniteRel extends RelNode {
         }
     };
 
-    <T> T implement(Implementor<T> implementor);
+    <T> T implement(RelImplementor<T> implementor);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 536290d..12b7b99 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 public final class IgniteTableScan extends TableScan implements IgniteRel {
   public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
@@ -38,7 +38,7 @@ public final class IgniteTableScan extends TableScan implements IgniteRel {
   }
 
   /** {@inheritDoc} */
-  @Override public <T> T implement(Implementor<T> implementor) {
+  @Override public <T> T implement(RelImplementor<T> implementor) {
     return implementor.implement(this);
   }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 01a3dd7..47cd875 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -21,7 +21,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
@@ -40,7 +40,7 @@ public final class Receiver extends AbstractRelNode implements IgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T implement(Implementor<T> implementor) {
+    @Override public <T> T implement(RelImplementor<T> implementor) {
         return implementor.implement(this);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 98a70d7..8e7c956 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.SingleRel;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -48,7 +48,7 @@ public final class Sender extends SingleRel implements IgniteRel {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T implement(Implementor<T> implementor) {
+    @Override public <T> T implement(RelImplementor<T> implementor) {
         return implementor.implement(this);
     }
 
@@ -56,6 +56,14 @@ public final class Sender extends SingleRel implements IgniteRel {
         targetMapping = mapping;
     }
 
+    public DistributionTrait targetDistribution() {
+        return targetDistr;
+    }
+
+    public NodesMapping targetMapping() {
+        return targetMapping;
+    }
+
     public DestinationFunction targetFunction(org.apache.calcite.plan.Context ctx) {
         return targetDistr.destinationFunctionFactory().create(ctx, targetMapping, targetDistr.keys());
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
index 9bd8527ee..3f380d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
@@ -74,10 +74,10 @@ public class IgniteJoinRule extends RelOptRule {
         }
 
         List<DistributionTrait> leftDists = Commons.concat(leftDerived,
-            IgniteDistributions.hash(join.analyzeCondition().leftKeys, IgniteDistributions.hashFunction()));
+            IgniteDistributions.hash(join.analyzeCondition().leftKeys));
 
         List<DistributionTrait> rightDists = Commons.concat(rightDerived,
-            IgniteDistributions.hash(join.analyzeCondition().rightKeys, IgniteDistributions.hashFunction()));
+            IgniteDistributions.hash(join.analyzeCondition().rightKeys));
 
         for (DistributionTrait leftDist0 : leftDists) {
             for (DistributionTrait rightDist0 : rightDists) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index d298e40..25d1931 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java
index f3e5bb9..384256a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java
@@ -16,34 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSyntax;
 
 /**
  *
  */
-public class CallExpression implements Expression {
-    private final RelDataType type;
-    private final SqlOperator op;
-    private final List<Expression> operands;
+public class CallExpression implements LogicalExpression {
+    public final String opName;
+    public final SqlSyntax opSyntax;
+    public final List<LogicalExpression> operands;
 
-    public CallExpression(RelDataType type, SqlOperator op, List<Expression> operands) {
-        this.type = type;
-        this.op = op;
+    public CallExpression(SqlOperator op, List<LogicalExpression> operands) {
         this.operands = operands;
+        opName = op.getName();
+        opSyntax = op.getSyntax();
     }
 
-    @Override public RexNode toRex(RexBuilder builder) {
-        ArrayList<RexNode> operands0 = new ArrayList<>(operands.size());
-
-        for (Expression operand : operands) {
-            operands0.add(operand.toRex(builder));
-        }
-
-        return builder.makeCall(type, op, operands0);
+    @Override public <T> T implement(ExpImplementor<T> implementor) {
+        return implementor.implement(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
similarity index 69%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
index d2fcb32..1c08bd9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
@@ -16,17 +16,15 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptSchema;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
-
 /**
  *
  */
-class SerializationContext {
-    RelOptCluster cluster;
-    RelOptSchema schema;
-    IgnitePlanner planner;
-    Context ctx;
+public interface ExpImplementor<T> {
+    T implement(CallExpression callExpression);
+
+    T implement(InputRefExpression inputRefExpression);
+
+    T implement(LiteralExpression literalExpression);
+
+    T implement(LocalRefExpression localRefExpression);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
new file mode 100644
index 0000000..820043f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Translates {@link LogicalExpression}s into Calcite {@link RexNode}s.
+ */
+public class ExpToRexTranslator implements ExpImplementor<RexNode> {
+    /** Builder used to create the resulting Rex nodes. */
+    private final RexBuilder builder;
+
+    /** Factory used to restore the Calcite types of expressions. */
+    private final RelDataTypeFactory typeFactory;
+
+    /** Operators of the operator table keyed by operator name and syntax. */
+    private final Map<Pair<String, SqlSyntax>, SqlOperator> ops;
+
+    /**
+     * @param builder Rex builder.
+     * @param typeFactory Type factory.
+     * @param opTable Operator table; its operator list is indexed once, at construction time.
+     */
+    public ExpToRexTranslator(RexBuilder builder, RelDataTypeFactory typeFactory, SqlOperatorTable opTable) {
+        this.builder = builder;
+        this.typeFactory = typeFactory;
+
+        List<SqlOperator> opList = opTable.getOperatorList();
+
+        Map<Pair<String, SqlSyntax>, SqlOperator> ops = new HashMap<>(opList.size());
+
+        // Operators that share both name and syntax map to the same key, the last one in the list wins.
+        for (SqlOperator op : opList) {
+            ops.put(Pair.of(op.getName(), op.getSyntax()), op);
+        }
+
+        this.ops = ops;
+    }
+
+    /**
+     * Translates a list of expressions preserving their order.
+     *
+     * @param exps Expressions to translate.
+     * @return Translated Rex nodes, an empty list if {@code F.isEmpty(exps)} holds.
+     */
+    public List<RexNode> translate(List<LogicalExpression> exps) {
+        if (F.isEmpty(exps))
+            return Collections.emptyList();
+
+        if (exps.size() == 1)
+            return F.asList(translate(F.first(exps)));
+
+        List<RexNode> res = new ArrayList<>(exps.size());
+
+        for (LogicalExpression exp : exps) {
+            res.add(exp.implement(this));
+        }
+
+        return res;
+    }
+
+    /**
+     * Translates a single expression.
+     *
+     * @param exp Expression to translate.
+     * @return Translated Rex node.
+     */
+    public RexNode translate(LogicalExpression exp) {
+        return exp.implement(this);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IllegalArgumentException If the operator table has no operator with the name and syntax of the call.
+     */
+    @Override public RexNode implement(CallExpression exp) {
+        return builder.makeCall(op(exp.opName, exp.opSyntax), translate(exp.operands));
+    }
+
+    /** {@inheritDoc} */
+    @Override public RexNode implement(InputRefExpression exp) {
+        return builder.makeInputRef(exp.type.toRelDataType(typeFactory), exp.index);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RexNode implement(LiteralExpression exp) {
+        return builder.makeLiteral(exp.value, exp.type.toRelDataType(typeFactory), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RexNode implement(LocalRefExpression exp) {
+        return new RexLocalRef(exp.index, exp.type.toRelDataType(typeFactory));
+    }
+
+    /**
+     * Looks up an operator by its name and syntax.
+     *
+     * @param name Operator name.
+     * @param syntax Operator syntax.
+     * @return Matching operator, never {@code null}.
+     * @throws IllegalArgumentException If there is no such operator in the indexed operator table.
+     */
+    private SqlOperator op(String name, SqlSyntax syntax) {
+        SqlOperator op = ops.get(Pair.of(name, syntax));
+
+        // Fail here with a clear message instead of handing a null operator over to the Rex builder.
+        if (op == null)
+            throw new IllegalArgumentException("Unknown operator [name=" + name + ", syntax=" + syntax + ']');
+
+        return op;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
similarity index 66%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
index abe2b36..05762d8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
@@ -16,12 +16,17 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
+import java.io.Serializable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 
 /**
  *
  */
-public interface Expression {
-    RexNode toRex(RexBuilder builder);
+public interface ExpressionType extends Serializable {
+    /**
+     * Creates a serializable description of the given type: a {@link StructType} when the type
+     * reports itself as a struct, a {@link FieldType} otherwise.
+     *
+     * @param type Calcite type to describe.
+     * @return Description of the type.
+     */
+    static ExpressionType fromType(RelDataType type) {
+        return type.isStruct() ? StructType.fromType(type) : 
FieldType.fromType(type);
+    }
+
+    /**
+     * Creates a Calcite type out of this description.
+     *
+     * @param factory Type factory used to create the type.
+     * @return Calcite type.
+     */
+    RelDataType toRelDataType(RelDataTypeFactory factory);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
new file mode 100644
index 0000000..e1e1b3a
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Serializable description of a non-struct {@link RelDataType}: either a Java class (for Java types)
+ * or an SQL type name together with its precision and scale.
+ */
+public class FieldType implements ExpressionType {
+    /** Java class of a Java type, {@code null} for SQL types. */
+    private final Class<?> clazz;
+
+    /** SQL type name, {@code null} for Java types. */
+    private final SqlTypeName typeName;
+
+    /** Precision as reported by the source type, {@code 0} for Java types. */
+    private final int precision;
+
+    /** Scale as reported by the source type, {@code 0} for Java types. */
+    private final int scale;
+
+    /**
+     * Creates a description of the given non-struct type.
+     *
+     * NOTE(review): nullability of the source type is not recorded here, so it cannot be restored
+     * by {@link #toRelDataType(RelDataTypeFactory)} - confirm this is intended.
+     *
+     * @param type Non-struct Calcite type.
+     * @return Description of the type.
+     */
+    public static FieldType fromType(RelDataType type) {
+        assert !type.isStruct();
+
+        if (type instanceof RelDataTypeFactoryImpl.JavaType)
+            return new FieldType(((RelDataTypeFactoryImpl.JavaType)type).getJavaClass(), null, 0, 0);
+
+        return new FieldType(null, type.getSqlTypeName(), type.getPrecision(), type.getScale());
+    }
+
+    /**
+     * @param clazz Java class or {@code null}.
+     * @param typeName SQL type name or {@code null}.
+     * @param precision Precision.
+     * @param scale Scale.
+     */
+    private FieldType(Class<?> clazz, SqlTypeName typeName, int precision, int scale) {
+        this.clazz = clazz;
+        this.typeName = typeName;
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The recorded precision and scale are passed to the factory whenever the type name accepts them
+     * and they were specified, so that types like {@code VARCHAR(20)} or {@code DECIMAL(10, 2)}
+     * keep their precision and scale.
+     */
+    @Override public RelDataType toRelDataType(RelDataTypeFactory factory) {
+        if (clazz != null)
+            return factory.createJavaType(clazz);
+
+        // The type name does not take a precision, or none was specified on the source type.
+        if (!typeName.allowsPrec() || precision == RelDataType.PRECISION_NOT_SPECIFIED)
+            return factory.createSqlType(typeName);
+
+        // Precision only: the type name does not take a scale, or none was specified.
+        if (!typeName.allowsScale() || scale == RelDataType.SCALE_NOT_SPECIFIED)
+            return factory.createSqlType(typeName, precision);
+
+        return factory.createSqlType(typeName, precision, scale);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
index 54bc932..06b7b04 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.internal.util.GridIntList;
@@ -23,7 +24,7 @@ import org.apache.ignite.internal.util.GridIntList;
 /**
  *
  */
-public class Graph {
+public class Graph implements Serializable {
     private final List<GraphNode> nodes = new ArrayList<>();
     private final List<GridIntList> edges = new ArrayList<>();
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
index 421c4bb..bc12e25 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
@@ -17,22 +17,20 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public class InputRefExpression implements Expression {
-    private final RelDataType type;
-    private final int index;
+public class InputRefExpression implements LogicalExpression {
+    public final ExpressionType type;
+    public final int index;
 
     public InputRefExpression(RelDataType type, int index) {
-        this.type = type;
+        this.type = ExpressionType.fromType(type);
         this.index = index;
     }
 
-    @Override public RexNode toRex(RexBuilder builder) {
-        return builder.makeInputRef(type, index);
+    @Override public <T> T implement(ExpImplementor<T> implementor) {
+        return implementor.implement(this);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
index 923fff4..3c95b96 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
@@ -17,22 +17,20 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public class LiteralExpression implements Expression {
-    private final Comparable value;
-    private final RelDataType type;
+public class LiteralExpression implements LogicalExpression {
+    public final ExpressionType type;
+    public final Comparable value;
 
     public LiteralExpression(RelDataType type, Comparable value) {
+        this.type = ExpressionType.fromType(type);
         this.value = value;
-        this.type = type;
     }
 
-    @Override public RexNode toRex(RexBuilder builder) {
-        return builder.makeLiteral(value, type, false);
+    @Override public <T> T implement(ExpImplementor<T> implementor) {
+        return implementor.implement(this);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
index b0947ac..4f3ed54 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
@@ -17,23 +17,20 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public class LocalRefExpression implements Expression {
-    private final RelDataType type;
-    private final int index;
+public class LocalRefExpression implements LogicalExpression {
+    public final ExpressionType type;
+    public final int index;
 
     public LocalRefExpression(RelDataType type, int index) {
-        this.type = type;
+        this.type = ExpressionType.fromType(type);
         this.index = index;
     }
 
-    @Override public RexNode toRex(RexBuilder builder) {
-        return new RexLocalRef(index, type);
+    @Override public <T> T implement(ExpImplementor<T> implementor) {
+        return implementor.implement(this);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java
similarity index 83%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java
index abe2b36..bc8a5bc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java
@@ -16,12 +16,11 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
+import java.io.Serializable;
 
 /**
  *
  */
-public interface Expression {
-    RexNode toRex(RexBuilder builder);
+public interface LogicalExpression extends Serializable {
+    /**
+     * Passes this expression to the given implementor and returns whatever the implementor produces.
+     *
+     * @param implementor Implementor to hand this expression to.
+     * @param <T> Type of the result produced by the implementor.
+     * @return Result produced by the implementor.
+     */
+    <T> T implement(ExpImplementor<T> implementor);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
similarity index 70%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
index abe2b36..14a2019 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
@@ -16,12 +16,17 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
+import java.io.Serializable;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
 
 /**
  *
  */
-public interface Expression {
-    RexNode toRex(RexBuilder builder);
+public class RelGraphNode implements GraphNode, Serializable {
+    /**
+     * Traits taken from the trait set passed to the constructor.
+     * NOTE(review): this class is Serializable, so the stored traits presumably have to be
+     * serializable as well - confirm for every trait implementation in use.
+     */
+    protected RelTrait[] traits;
+
+    /**
+     * @param traits Trait set whose elements are stored in this node as an array.
+     */
+    public RelGraphNode(RelTraitSet traits) {
+        this.traits = traits.toArray(new RelTrait[0]);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
index c5eaae5..40a9158 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
@@ -19,7 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.List;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
@@ -28,54 +28,67 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
  */
-public class RelToGraphConverter implements Implementor<List<RelNode>> {
-    private Deque<List<RelNode>> stack1 = new ArrayDeque<>();
-    private Deque<Integer> stack2 = new ArrayDeque<>();
+public class RelToGraphConverter implements RelImplementor<Pair<Integer, 
List<IgniteRel>>> {
+    private Deque<Pair<Integer, List<IgniteRel>>> stack = new ArrayDeque<>();
     private Graph graph;
+    private int parentId;
 
-    public Graph convert(RelNode root) {
-        stack1 = new ArrayDeque<>();
-        stack2 = new ArrayDeque<>();
-
+    public Graph convert(IgniteRel root) {
+        stack = new ArrayDeque<>();
         graph = new Graph();
+        parentId = -1;
 
-        return null;
+        stack.push(root.implement(this));
+
+        while (!stack.isEmpty()) {
+            Pair<Integer, List<IgniteRel>> pair = stack.pop();
+
+            parentId = pair.left;
+
+            for (IgniteRel child : pair.right) {
+                stack.push(child.implement(this));
+            }
+        }
+
+        return graph;
     }
 
-    @Override public List<RelNode> implement(IgniteExchange rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteFilter 
rel) {
         return null;
     }
 
-    @Override public List<RelNode> implement(IgniteFilter rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteJoin rel) {
         return null;
     }
 
-    @Override public List<RelNode> implement(IgniteJoin rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteProject 
rel) {
         return null;
     }
 
-    @Override public List<RelNode> implement(IgniteProject rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteTableScan 
rel) {
         return null;
     }
 
-    @Override public List<RelNode> implement(IgniteTableScan rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(Receiver rel) {
         return null;
     }
 
-    @Override public List<RelNode> implement(Receiver rel) {
+    @Override public Pair<Integer, List<IgniteRel>> implement(Sender rel) {
+        assert parentId == -1;
+
         return null;
     }
 
-    @Override public List<RelNode> implement(Sender rel) {
-        return null;
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteExchange 
rel) {
+        throw new UnsupportedOperationException();
     }
 
-    @Override public List<RelNode> implement(IgniteRel other) {
-        return null;
+    @Override public Pair<Integer, List<IgniteRel>> implement(IgniteRel other) 
{
+        throw new AssertionError();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
index a30edfa..dfb92d3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
@@ -36,62 +36,66 @@ import org.apache.calcite.rex.RexVisitor;
 /**
  *
  */
-public class RexToExpTranslator implements RexVisitor<Expression> {
-     @Override public Expression visitInputRef(RexInputRef inputRef) {
+public class RexToExpTranslator implements RexVisitor<LogicalExpression> {
+    /**
+     * Translates every Rex node of the list, keeping the order of the operands.
+     *
+     * @param operands Rex nodes to translate.
+     * @return Newly created list with the translated expressions.
+     */
+    public List<LogicalExpression> translate(List<RexNode> operands) {
+        List<LogicalExpression> translated = new ArrayList<>(operands.size());
+
+        operands.forEach(rex -> translated.add(translate(rex)));
+
+        return translated;
+    }
+
+    public LogicalExpression translate(RexNode rex) {
+        return rex.accept(this);
+    }
+
+     @Override public LogicalExpression visitInputRef(RexInputRef inputRef) {
         return new InputRefExpression(inputRef.getType(), inputRef.getIndex());
     }
 
-    @Override public Expression visitLocalRef(RexLocalRef localRef) {
+    @Override public LogicalExpression visitLocalRef(RexLocalRef localRef) {
         return new LocalRefExpression(localRef.getType(), localRef.getIndex());
     }
 
-    @Override public Expression visitLiteral(RexLiteral literal) {
+    @Override public LogicalExpression visitLiteral(RexLiteral literal) {
         return new LiteralExpression(literal.getType(), literal.getValue());
     }
 
-    @Override public Expression visitCall(RexCall call) {
-        return new CallExpression(call.getType(), call.getOperator(), 
visitList(call.getOperands()));
+    @Override public LogicalExpression visitCall(RexCall call) {
+        return new CallExpression(call.getOperator(), 
translate(call.getOperands()));
     }
 
-    @Override public Expression visitOver(RexOver over) {
+    @Override public LogicalExpression visitOver(RexOver over) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitCorrelVariable(RexCorrelVariable 
correlVariable) {
+    @Override public LogicalExpression visitCorrelVariable(RexCorrelVariable 
correlVariable) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitDynamicParam(RexDynamicParam 
dynamicParam) {
+    @Override public LogicalExpression visitDynamicParam(RexDynamicParam 
dynamicParam) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitRangeRef(RexRangeRef rangeRef) {
+    @Override public LogicalExpression visitRangeRef(RexRangeRef rangeRef) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitFieldAccess(RexFieldAccess fieldAccess) {
+    @Override public LogicalExpression visitFieldAccess(RexFieldAccess 
fieldAccess) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitSubQuery(RexSubQuery subQuery) {
+    @Override public LogicalExpression visitSubQuery(RexSubQuery subQuery) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitTableInputRef(RexTableInputRef fieldRef) {
+    @Override public LogicalExpression visitTableInputRef(RexTableInputRef 
fieldRef) {
         throw new UnsupportedOperationException();
     }
 
-    @Override public Expression visitPatternFieldRef(RexPatternFieldRef 
fieldRef) {
+    @Override public LogicalExpression visitPatternFieldRef(RexPatternFieldRef 
fieldRef) {
         throw new UnsupportedOperationException();
     }
-
-    public List<Expression> visitList(List<RexNode> operands) {
-        ArrayList<Expression> res = new ArrayList<>(operands.size());
-
-        for (RexNode operand : operands) {
-            res.add(operand.accept(this));
-        }
-
-        return res;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
similarity index 59%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
index d239c5c..41820d6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
@@ -14,19 +14,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 
 /**
  *
  */
-public interface DestinationFunctionFactory {
-    DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
+public class SenderNode extends RelGraphNode {
+    private DistributionTrait targetDistr;
+    private NodesMapping targetMapping;
 
-    default Object key() {
-        return getClass();
+    public SenderNode(Sender sender) {
+        super(sender.getTraitSet());
+
+        targetDistr = sender.targetDistribution();
+        targetMapping = sender.targetMapping();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
new file mode 100644
index 0000000..45d5e44
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.LinkedHashMap;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+/**
+ * Serializable description of a struct {@link RelDataType}: field names mapped to the descriptions
+ * of the field types, in the order of the fields of the source type.
+ */
+public class StructType implements ExpressionType {
+    /** Field type descriptions by field name, in field order. */
+    private final LinkedHashMap<String, ExpressionType> fields;
+
+    /**
+     * Creates a description of the given struct type. Field types are described with
+     * {@link ExpressionType#fromType(RelDataType)}, so fields that are structs themselves are supported.
+     *
+     * NOTE(review): fields are keyed by name, so fields with equal names collapse into one entry -
+     * confirm field names are always unique here.
+     *
+     * @param type Struct Calcite type.
+     * @return Description of the type.
+     */
+    public static StructType fromType(RelDataType type) {
+        assert type.isStruct();
+
+        LinkedHashMap<String, ExpressionType> fields = new LinkedHashMap<>();
+
+        for (RelDataTypeField field : type.getFieldList()) {
+            fields.put(field.getName(), ExpressionType.fromType(field.getType()));
+        }
+
+        return new StructType(fields);
+    }
+
+    /**
+     * @param fields Field type descriptions by field name.
+     */
+    private StructType(LinkedHashMap<String, ExpressionType> fields) {
+        this.fields = fields;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelDataType toRelDataType(RelDataTypeFactory factory) {
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(factory);
+
+        // Fields are added in the insertion order of the map, i.e. in the order of the source type fields.
+        fields.forEach((name, fieldType) -> builder.add(name, fieldType.toRelDataType(factory)));
+
+        return builder.build();
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java
similarity index 65%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java
index 60e37ed..e6b6bb5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java
@@ -16,32 +16,20 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.Objects;
+
 /**
  *
  */
-public enum DistributionType {
-    HASH("hash"),
-    RANDOM("random"),
-    BROADCAST("broadcast"),
-    SINGLE("single"),
-    ANY("any");
-
-    /**
-     *
-     */
-    private final String description;
-
-    /**
-     *
-     */
-    DistributionType(String description) {
-        this.description = description;
+public abstract class AbstractDestinationFunctionFactory implements 
DestinationFunctionFactory {
+    @Override public int hashCode() {
+        return Objects.hashCode(key());
     }
 
-    /**
-     *
-     */
-    @Override public String toString() {
-        return description;
+    @Override public boolean equals(Object obj) {
+        if (obj instanceof DestinationFunctionFactory)
+            return Objects.equals(key(), ((DestinationFunctionFactory) 
obj).key());
+
+        return false;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
similarity index 66%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index d239c5c..535c3bb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -16,17 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.List;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-public interface DestinationFunctionFactory {
-    DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
+class AllTargetsFactory extends AbstractDestinationFunctionFactory {
+    static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
 
-    default Object key() {
-        return getClass();
+    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
+        List<ClusterNode> nodes = m.nodes();
+
+        return r -> nodes;
+    }
+
+    @Override public Object key() {
+        return "AllTargetsFactory";
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index d239c5c..587c172 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.Serializable;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -23,10 +24,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-public interface DestinationFunctionFactory {
+public interface DestinationFunctionFactory extends Serializable {
     DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
 
-    default Object key() {
-        return getClass();
-    }
+    Object key();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 34773f2..756bf6b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,6 +16,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Objects;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
@@ -25,12 +27,15 @@ import org.apache.calcite.util.ImmutableIntList;
 /**
  *
  */
-public final class DistributionTrait implements RelTrait {
-    private final DistributionType type;
-    private final ImmutableIntList keys;
-    private final DestinationFunctionFactory functionFactory;
+public final class DistributionTrait implements RelTrait, Serializable {
+    private DistributionType type;
+    private int[] keys;
+    private DestinationFunctionFactory functionFactory;
 
-    public DistributionTrait(DistributionType type, ImmutableIntList keys, 
DestinationFunctionFactory functionFactory) {
+    public DistributionTrait() {
+    }
+
+    public DistributionTrait(DistributionType type, int[] keys, 
DestinationFunctionFactory functionFactory) {
         this.type = type;
         this.keys = keys;
         this.functionFactory = functionFactory;
@@ -45,7 +50,7 @@ public final class DistributionTrait implements RelTrait {
     }
 
     public ImmutableIntList keys() {
-        return keys;
+        return ImmutableIntList.of(keys);
     }
 
     @Override public void register(RelOptPlanner planner) {}
@@ -57,18 +62,18 @@ public final class DistributionTrait implements RelTrait {
         if (o instanceof DistributionTrait) {
             DistributionTrait that = (DistributionTrait) o;
 
-            return type == that.type() && keys.equals(that.keys());
+            return type == that.type() && Arrays.equals(keys, that.keys);
         }
 
         return false;
     }
 
     @Override public int hashCode() {
-        return Objects.hash(type, keys);
+        return Objects.hash(type, Arrays.hashCode(keys));
     }
 
     @Override public String toString() {
-        return type + (type == DistributionType.HASH ? keys.toString()  : "");
+        return type + (type == DistributionType.HASH ? Arrays.toString(keys) : 
"");
     }
 
     @Override public RelTraitDef getTraitDef() {
@@ -89,10 +94,9 @@ public final class DistributionTrait implements RelTrait {
 
         if (type() == other.type())
             return type() != DistributionType.HASH
-                || (Objects.equals(keys(), other.keys())
-                    && Objects.equals(destinationFunctionFactory().key(), 
other.destinationFunctionFactory().key()));
+                || (Arrays.equals(keys, other.keys)
+                    && Objects.equals(functionFactory, other.functionFactory));
 
         return other.type() == DistributionType.RANDOM && type() == 
DistributionType.HASH;
     }
-
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
index 60e37ed..c67964f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
@@ -26,9 +26,7 @@ public enum DistributionType {
     SINGLE("single"),
     ANY("any");
 
-    /**
-     *
-     */
+    /** */
     private final String description;
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
new file mode 100644
index 0000000..c6a3eb5
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.trait;
+
+import java.util.List;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+class HashFunctionFactory extends AbstractDestinationFunctionFactory {
+    static final DestinationFunctionFactory INSTANCE = new 
HashFunctionFactory();
+
+    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
+        assert m != null && !F.isEmpty(m.assignments());
+
+        int[] fields = k.toIntArray();
+
+        ToIntFunction<Object> hashFun = r -> {
+            Object[] row = (Object[]) r;
+
+            if (row == null)
+                return 0;
+
+            int hash = 1;
+
+            for (int i : fields)
+                hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
+
+            return hash;
+        };
+
+        List<List<ClusterNode>> assignments = m.assignments();
+
+        if (U.assertionsEnabled()) {
+            for (List<ClusterNode> assignment : assignments) {
+                assert F.isEmpty(assignment) || assignment.size() == 1;
+            }
+        }
+
+        return r -> assignments.get(hashFun.applyAsInt(r) % 
assignments.size());
+    }
+
+    @Override public Object key() {
+        return "HashFunctionFactory";
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 8363bbd..a95734a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -20,15 +20,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.ToIntFunction;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
@@ -37,42 +32,11 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
  *
  */
 public class IgniteDistributions {
-    private static final DestinationFunctionFactory NO_OP_FACTORY = (ctx, m, 
k) -> null;
-    private static final DestinationFunctionFactory HASH_FACTORY = (ctx, m, k) 
-> {
-        assert m != null && !F.isEmpty(m.assignments());
-
-        int[] fields = k.toIntArray();
-
-        ToIntFunction<Object> hashFun = r -> {
-            Object[] row = (Object[]) r;
-
-            if (row == null)
-                return 0;
-
-            int hash = 1;
-
-            for (int i : fields)
-                hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
-
-            return hash;
-        };
-
-        List<List<ClusterNode>> assignments = m.assignments();
-
-        if (U.assertionsEnabled()) {
-            for (List<ClusterNode> assignment : assignments) {
-                assert F.isEmpty(assignment) || assignment.size() == 1;
-            }
-        }
-
-        return r -> assignments.get(hashFun.applyAsInt(r) % 
assignments.size());
-    };
-
-
-    private static final DistributionTrait BROADCAST = new 
DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), 
allTargetsFunction());
-    private static final DistributionTrait SINGLE = new 
DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), 
singleTargetFunction());
-    private static final DistributionTrait RANDOM = new 
DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), 
randomTargetFunction());
-    private static final DistributionTrait ANY    = new 
DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), noOpFunction());
+    private static final int[] EMPTY_KEYS = new int[0];
+    private static final DistributionTrait BROADCAST = new 
DistributionTrait(DistributionType.BROADCAST, EMPTY_KEYS, 
AllTargetsFactory.INSTANCE);
+    private static final DistributionTrait SINGLE = new 
DistributionTrait(DistributionType.SINGLE, EMPTY_KEYS, 
SingleTargetFactory.INSTANCE);
+    private static final DistributionTrait RANDOM = new 
DistributionTrait(DistributionType.RANDOM, EMPTY_KEYS, 
RandomTargetFactory.INSTANCE);
+    private static final DistributionTrait ANY    = new 
DistributionTrait(DistributionType.ANY, EMPTY_KEYS, NoOpFactory.INSTANCE);
 
     public static DistributionTrait any() {
         return ANY;
@@ -90,40 +54,12 @@ public class IgniteDistributions {
         return BROADCAST;
     }
 
-    public static DistributionTrait hash(List<Integer> keys, 
DestinationFunctionFactory factory) {
-        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), 
factory);
+    public static DistributionTrait hash(List<Integer> keys) {
+        return new DistributionTrait(HASH, U.toIntArray(keys), 
HashFunctionFactory.INSTANCE);
     }
 
-    public static DestinationFunctionFactory noOpFunction() {
-        return NO_OP_FACTORY;
-    }
-
-    public static DestinationFunctionFactory singleTargetFunction() {
-        return (ctx, m, k) -> {
-            List<ClusterNode> nodes = m.nodes().subList(0, 1);
-
-            return r -> nodes;
-        };
-    }
-
-    public static DestinationFunctionFactory allTargetsFunction() {
-        return (ctx, m, k) -> {
-            List<ClusterNode> nodes = m.nodes();
-
-            return r -> nodes;
-        };
-    }
-
-    public static DestinationFunctionFactory randomTargetFunction() {
-        return (ctx, m, k) -> {
-            List<ClusterNode> nodes = m.nodes();
-
-            return r -> 
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
-        };
-    }
-
-    public static DestinationFunctionFactory hashFunction() {
-        return HASH_FACTORY;
+    public static DistributionTrait hash(List<Integer> keys, 
DestinationFunctionFactory factory) {
+        return new DistributionTrait(HASH, U.toIntArray(keys), factory);
     }
 
     public static List<DistributionTrait> deriveDistributions(RelNode rel, 
RelMetadataQuery mq) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
similarity index 74%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index d239c5c..5dff5a9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -23,10 +23,14 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-public interface DestinationFunctionFactory {
-    DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
+class NoOpFactory extends AbstractDestinationFunctionFactory {
+    static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
 
-    default Object key() {
-        return getClass();
+    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
+        return null;
+    }
+
+    @Override public Object key() {
+        return "NoOpFactory";
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
similarity index 59%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index d239c5c..a4b27b7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -16,17 +16,28 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-public interface DestinationFunctionFactory {
-    DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
+class RandomTargetFactory extends AbstractDestinationFunctionFactory {
+    static final DestinationFunctionFactory INSTANCE = new 
RandomTargetFactory();
 
-    default Object key() {
-        return getClass();
+    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
+        List<ClusterNode> nodes = m.nodes();
+
+        return r -> 
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
+    }
+
+    @Override public Object key() {
+        return "RandomTargetFactory";
     }
+
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
similarity index 65%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index d239c5c..4d21a60 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -16,17 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.List;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
-public interface DestinationFunctionFactory {
-    DestinationFunction create(Context ctx, NodesMapping mapping, 
ImmutableIntList keys);
+class SingleTargetFactory extends AbstractDestinationFunctionFactory {
+    static final DestinationFunctionFactory INSTANCE = new 
SingleTargetFactory();
 
-    default Object key() {
-        return getClass();
+    @Override public DestinationFunction create(Context ctx, NodesMapping m, 
ImmutableIntList k) {
+        List<ClusterNode> nodes = m.nodes().subList(0, 1);
+
+        return r -> nodes;
+    }
+
+    @Override public Object key() {
+        return "SingleTargetFactory";
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java
similarity index 63%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java
index 32cf357..83a2a19 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java
@@ -14,14 +14,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.type;
 
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
 
 /**
  *
  */
-public interface DistributionRegistry {
-    DistributionTrait distribution(int cacheId, RowType rowType);
+public class IgniteTypeFactory extends JavaTypeFactoryImpl {
+    public IgniteTypeFactory() {
+        super(IgniteTypeSystem.DEFAULT);
+    }
+
+    public IgniteTypeFactory(RelDataTypeSystem typeSystem) {
+        super(typeSystem);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java
similarity index 65%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java
index 32cf357..6dbfd02 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java
@@ -14,14 +14,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.type;
 
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import java.io.Serializable;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
 
 /**
  *
  */
-public interface DistributionRegistry {
-    DistributionTrait distribution(int cacheId, RowType rowType);
+public class IgniteTypeSystem extends RelDataTypeSystemImpl implements 
Serializable {
+    public static final RelDataTypeSystem DEFAULT = new IgniteTypeSystem();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java
similarity index 98%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java
index 0da2c70..2133c56 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query.calcite.type;
 
 import java.util.ArrayList;
 import java.util.BitSet;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index d594bb4..aab2302 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -42,7 +42,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
similarity index 97%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
index c45af43..58ba064 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
@@ -28,7 +28,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 /**
  *
  */
-public interface Implementor<T> {
+public interface RelImplementor<T> {
     T implement(IgniteExchange rel);
     T implement(IgniteFilter rel);
     T implement(IgniteJoin rel);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 1891f44..bc7d179 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -33,7 +33,16 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
@@ -45,15 +54,21 @@ import 
org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
-import 
org.apache.ignite.internal.processors.query.calcite.serialize.Expression;
+import 
org.apache.ignite.internal.processors.query.calcite.serialize.LogicalExpression;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -341,7 +356,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
         Project proj = (Project) relRoot.rel.getInput(0);
 
-        List<Expression> expressions = 
translator.visitList(proj.getProjects());
+        List<LogicalExpression> expressions = 
translator.translate(proj.getProjects());
 
         assertNotNull(expressions);
     }
@@ -412,7 +427,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
     @Test
     public void testSplitterCollocatedReplicatedReplicated() throws Exception {
-        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+        String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, 
p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
             ") p " +
@@ -504,7 +519,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
-                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+                return IgniteDistributions.hash(rowType.distributionKeys());
             }
 
             @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -592,7 +607,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
-                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+                return IgniteDistributions.hash(rowType.distributionKeys());
             }
 
             @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -761,7 +776,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
-                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+                return IgniteDistributions.hash(rowType.distributionKeys());
             }
 
             @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -850,7 +865,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
-                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+                return IgniteDistributions.hash(rowType.distributionKeys());
             }
 
             @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -943,7 +958,7 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
         }
 
         @Override public DistributionTrait distribution(int cacheId, RowType 
rowType) {
-            return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+            return IgniteDistributions.hash(rowType.distributionKeys());
         }
 
         @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
@@ -967,4 +982,40 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             throw new AssertionError("Unexpected cache id:" + cacheId);
         }
     }
+
+    /**
+     * @return Binary marshaller.
+     */
+    private BinaryMarshaller binaryMarshaller() throws IgniteCheckedException {
+        IgniteConfiguration iCfg = new IgniteConfiguration();
+
+        BinaryConfiguration bCfg = new BinaryConfiguration();
+        iCfg.setBinaryConfiguration(bCfg);
+        iCfg.setClientMode(false);
+        iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
+            @Override public void sendCustomEvent(DiscoverySpiCustomMessage 
msg) throws IgniteException {
+                //No-op.
+            }
+        });
+        iCfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
+
+        BinaryContext ctx = new 
BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger());
+
+        BinaryMarshaller marsh = new BinaryMarshaller();
+
+        MarshallerContextTestImpl marshCtx = new 
MarshallerContextTestImpl(null, null);
+
+        GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg);
+
+        kernCtx.add(new GridSystemViewManager(kernCtx));
+        kernCtx.add(new GridDiscoveryManager(kernCtx));
+
+        marshCtx.onMarshallerProcessorStarted(kernCtx, null);
+
+        marsh.setContext(marshCtx);
+
+        IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", 
ctx, iCfg);
+
+        return marsh;
+    }
 }
\ No newline at end of file

Reply via email to