This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new ecbb326 pending
ecbb326 is described below
commit ecbb326567bf7a0d53a5a6ad74f005ee0ed0c34e
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 22 15:51:19 2019 +0300
pending
---
.../query/calcite/cluster/RegistryImpl.java | 75 ++++++++------
.../calcite/metadata/IgniteMdFragmentInfo.java | 2 +-
.../query/calcite/metadata/NodesMapping.java | 52 +++++-----
.../query/calcite/prepare/IgnitePlanner.java | 56 +++++++++--
.../processors/query/calcite/rel/Receiver.java | 35 ++++++-
.../processors/query/calcite/rel/Sender.java | 22 +++++
.../query/calcite/serialize/ConversionContext.java | 92 ++++++++++++++++++
...Expression.java => DynamicParamExpression.java} | 8 +-
.../{ExpressionType.java => ExpDataType.java} | 6 +-
.../query/calcite/serialize/ExpImplementor.java | 2 +
.../calcite/serialize/ExpToRexTranslator.java | 4 +
.../query/calcite/serialize/FilterNode.java | 50 ++++++++++
.../processors/query/calcite/serialize/Graph.java | 52 ++++++----
.../query/calcite/serialize/GraphNode.java | 4 +-
.../{GraphNode.java => GraphToRelConverter.java} | 8 +-
.../calcite/serialize/InputRefExpression.java | 4 +-
.../query/calcite/serialize/JoinNode.java | 68 +++++++++++++
.../query/calcite/serialize/LiteralExpression.java | 4 +-
.../calcite/serialize/LocalRefExpression.java | 4 +-
.../query/calcite/serialize/ProjectNode.java | 46 +++++++++
.../query/calcite/serialize/ReceiverNode.java | 46 +++++++++
.../serialize/{GraphNode.java => RelGraph.java} | 2 +-
.../query/calcite/serialize/RelGraphNode.java | 17 ++--
.../calcite/serialize/RelToGraphConverter.java | 95 ++++++++++--------
.../calcite/serialize/RexToExpTranslator.java | 2 +-
.../query/calcite/serialize/SenderNode.java | 21 ++--
...GraphNode.java => SerializedCorrelationId.java} | 26 +++--
.../calcite/serialize/SerializedTraitSet.java | 75 ++++++++++++++
.../serialize/{FieldType.java => SimpleType.java} | 10 +-
.../query/calcite/serialize/StructType.java | 12 +--
.../{LiteralExpression.java => TableScanNode.java} | 22 +++--
.../query/calcite/splitter/Fragment.java | 32 ++++--
.../query/calcite/trait/AllTargetsFactory.java | 11 ++-
.../query/calcite/trait/DestinationFunction.java | 4 +-
.../query/calcite/trait/DistributionTrait.java | 35 +++++--
.../query/calcite/trait/HashFunctionFactory.java | 13 ++-
.../query/calcite/trait/IgniteDistributions.java | 15 ++-
.../query/calcite/trait/NoOpFactory.java | 7 +-
.../query/calcite/trait/RandomTargetFactory.java | 10 +-
.../query/calcite/trait/SingleTargetFactory.java | 13 ++-
.../processors/query/calcite/util/Commons.java | 108 +++++++--------------
.../query/calcite/CalciteQueryProcessorTest.java | 85 +++++++++++++++-
42 files changed, 949 insertions(+), 306 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 95ad49f..8e5773b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.cluster;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.function.ToIntFunction;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
@@ -29,6 +30,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -37,6 +39,7 @@ import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunc
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -62,11 +65,13 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
}
@Override public NodesMapping local() {
- return new
NodesMapping(Collections.singletonList(ctx.discovery().localNode()), null,
(byte) 0);
+ return new
NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null,
(byte) 0);
}
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
- return new
NodesMapping(ctx.discovery().discoCache(topVer).serverNodes(), null, (byte) 0);
+ List<ClusterNode> nodes =
ctx.discovery().discoCache(topVer).serverNodes();
+
+ return new NodesMapping(Commons.transform(nodes, ClusterNode::id),
null, (byte) 0);
}
@Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
@@ -79,35 +84,35 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
List<List<ClusterNode>> assignments =
cctx.affinity().assignments(topVer);
+ List<List<UUID>> res;
if (cctx.config().getWriteSynchronizationMode() ==
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
- List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+ res = new ArrayList<>(assignments.size());
for (List<ClusterNode> partNodes : assignments)
- assignments0.add(F.isEmpty(partNodes) ?
Collections.emptyList() : Collections.singletonList(F.first(partNodes)));
-
- assignments = assignments0;
+ res.add(F.isEmpty(partNodes) ? Collections.emptyList() :
Collections.singletonList(F.first(partNodes).id()));
}
else if (!cctx.topology().rebalanceFinished(topVer)) {
- flags |= NodesMapping.HAS_MOVING_PARTITIONS;
+ res = new ArrayList<>(assignments.size());
- List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+ flags |= NodesMapping.HAS_MOVING_PARTITIONS;
for (int part = 0; part < assignments.size(); part++) {
- List<ClusterNode> partNodes = assignments0.get(part),
partNodes0 = new ArrayList<>(partNodes.size());
+ List<ClusterNode> partNodes = assignments.get(part);
+ List<UUID> partIds = new ArrayList<>(partNodes.size());
- for (ClusterNode partNode : partNodes) {
- if (cctx.topology().partitionState(partNode.id(), part) ==
GridDhtPartitionState.OWNING)
- partNodes0.add(partNode);
+ for (ClusterNode node : partNodes) {
+ if (cctx.topology().partitionState(node.id(), part) ==
GridDhtPartitionState.OWNING)
+ partIds.add(node.id());
}
- assignments0.add(partNodes0);
+ res.add(partIds);
}
-
- assignments = assignments0;
}
+ else
+ res = Commons.transform(assignments, nodes ->
Commons.transform(nodes, ClusterNode::id));
- return new NodesMapping(null, assignments, flags);
+ return new NodesMapping(null, res, flags);
}
private NodesMapping replicatedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
@@ -116,32 +121,38 @@ public class RegistryImpl implements
DistributionRegistry, LocationRegistry {
if (cctx.config().getNodeFilter() != null)
flags |= NodesMapping.PARTIALLY_REPLICATED;
+ GridDhtPartitionTopology topology = cctx.topology();
+
List<ClusterNode> nodes =
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
+ List<UUID> res;
- if (!cctx.topology().rebalanceFinished(topVer)) {
+ if (!topology.rebalanceFinished(topVer)) {
flags |= NodesMapping.PARTIALLY_REPLICATED;
- List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
+ res = new ArrayList<>(nodes.size());
- int parts = cctx.topology().partitions();
+ int parts = topology.partitions();
- parent:
for (ClusterNode node : nodes) {
- for (int part = 0; part < parts; part++) {
- if (cctx.topology().partitionState(node.id(), part) !=
GridDhtPartitionState.OWNING)
- continue parent;
- }
-
- nodes0.add(node);
+ if (isOwner(node.id(), topology, parts))
+ res.add(node.id());
}
-
- nodes = nodes0;
}
+ else
+ res = Commons.transform(nodes, ClusterNode::id);
- return new NodesMapping(nodes, null, flags);
+ return new NodesMapping(res, null, flags);
+ }
+
+ private boolean isOwner(UUID nodeId, GridDhtPartitionTopology topology,
int parts) {
+ for (int p = 0; p < parts; p++) {
+ if (topology.partitionState(nodeId, p) !=
GridDhtPartitionState.OWNING)
+ return false;
+ }
+ return true;
}
- private static class AffinityFactory extends
AbstractDestinationFunctionFactory {
+ private final static class AffinityFactory extends
AbstractDestinationFunctionFactory {
private final int cacheId;
private final Object key;
@@ -153,10 +164,10 @@ public class RegistryImpl implements
DistributionRegistry, LocationRegistry {
@Override public DestinationFunction create(Context ctx, NodesMapping
mapping, ImmutableIntList keys) {
assert keys.size() == 1 && mapping != null &&
!F.isEmpty(mapping.assignments());
- List<List<ClusterNode>> assignments = mapping.assignments();
+ List<List<UUID>> assignments = mapping.assignments();
if (U.assertionsEnabled()) {
- for (List<ClusterNode> assignment : assignments) {
+ for (List<UUID> assignment : assignments) {
assert F.isEmpty(assignment) || assignment.size() == 1;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index a1f09a7..4a96cf3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -93,7 +93,7 @@ public class IgniteMdFragmentInfo implements
MetadataHandler<FragmentMetadata> {
}
public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
- return new FragmentInfo(rel.source());
+ return new FragmentInfo(rel.sourceFragment());
}
public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery
mq) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index fdbed1d..16bafc0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -22,7 +22,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
@@ -38,21 +38,21 @@ public class NodesMapping implements Serializable {
public static final byte PARTIALLY_REPLICATED = 1 << 3;
public static final byte DEDUPLICATED = 1 << 4;
- private final List<ClusterNode> nodes;
- private final List<List<ClusterNode>> assignments;
+ private final List<UUID> nodes;
+ private final List<List<UUID>> assignments;
private final byte flags;
- public NodesMapping(List<ClusterNode> nodes, List<List<ClusterNode>>
assignments, byte flags) {
+ public NodesMapping(List<UUID> nodes, List<List<UUID>> assignments, byte
flags) {
this.nodes = nodes;
this.assignments = assignments;
this.flags = flags;
}
- public List<ClusterNode> nodes() {
+ public List<UUID> nodes() {
return nodes;
}
- public List<List<ClusterNode>> assignments() {
+ public List<List<UUID>> assignments() {
return assignments;
}
@@ -62,7 +62,7 @@ public class NodesMapping implements Serializable {
if ((flags & PARTIALLY_REPLICATED) == 0)
return new NodesMapping(U.firstNotNull(nodes, other.nodes),
mergeAssignments(other, null), flags);
- List<ClusterNode> nodes;
+ List<UUID> nodes;
if (this.nodes == null)
nodes = other.nodes;
@@ -81,11 +81,11 @@ public class NodesMapping implements Serializable {
if (assignments == null || !excessive())
return this;
- HashSet<ClusterNode> nodes0 = new HashSet<>();
- List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+ HashSet<UUID> nodes0 = new HashSet<>();
+ List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
- for (List<ClusterNode> partNodes : assignments) {
- ClusterNode node = F.first(partNodes);
+ for (List<UUID> partNodes : assignments) {
+ UUID node = F.first(partNodes);
if (node == null)
throw new LocationMappingException("Failed to map fragment to
location.");
@@ -97,14 +97,14 @@ public class NodesMapping implements Serializable {
return new NodesMapping(new ArrayList<>(nodes0), assignments0,
(byte)(flags | DEDUPLICATED));
}
- public int[] partitions(ClusterNode node) {
+ public int[] partitions(UUID node) {
if (assignments == null)
return null;
GridIntList parts = new GridIntList(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
- List<ClusterNode> assignment = assignments.get(i);
+ List<UUID> assignment = assignments.get(i);
if (Objects.equals(node, F.first(assignment)))
parts.add(i);
}
@@ -132,25 +132,25 @@ public class NodesMapping implements Serializable {
return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
}
- private List<List<ClusterNode>> mergeAssignments(NodesMapping other,
List<ClusterNode> nodes) throws LocationMappingException {
- byte flags = (byte) (this.flags | other.flags);
List<List<ClusterNode>> left = assignments, right = other.assignments;
+ private List<List<UUID>> mergeAssignments(NodesMapping other, List<UUID>
nodes) throws LocationMappingException {
+ byte flags = (byte) (this.flags | other.flags); List<List<UUID>> left
= assignments, right = other.assignments;
if (left == null && right == null)
return null; // nothing to intersect;
if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS)
== 0) {
- List<List<ClusterNode>> assignments = U.firstNotNull(left, right);
+ List<List<UUID>> assignments = U.firstNotNull(left, right);
if (nodes == null || (flags & PARTIALLY_REPLICATED) == 0)
return assignments;
- List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
- HashSet<ClusterNode> nodesSet = new HashSet<>(nodes);
+ List<List<UUID>> assignments0 = new
ArrayList<>(assignments.size());
+ HashSet<UUID> nodesSet = new HashSet<>(nodes);
- for (List<ClusterNode> partNodes : assignments) {
- List<ClusterNode> partNodes0 = new
ArrayList<>(partNodes.size());
+ for (List<UUID> partNodes : assignments) {
+ List<UUID> partNodes0 = new ArrayList<>(partNodes.size());
- for (ClusterNode partNode : partNodes) {
+ for (UUID partNode : partNodes) {
if (nodesSet.contains(partNode))
partNodes0.add(partNode);
}
@@ -164,14 +164,14 @@ public class NodesMapping implements Serializable {
return assignments0;
}
- List<List<ClusterNode>> assignments = new ArrayList<>(left.size());
- HashSet<ClusterNode> nodesSet = nodes != null ? new HashSet<>(nodes) :
null;
+ List<List<UUID>> assignments = new ArrayList<>(left.size());
+ HashSet<UUID> nodesSet = nodes != null ? new HashSet<>(nodes) : null;
for (int i = 0; i < left.size(); i++) {
- List<ClusterNode> leftNodes = left.get(i), partNodes = new
ArrayList<>(leftNodes.size());
- HashSet<ClusterNode> rightNodesSet = new HashSet<>(right.get(i));
+ List<UUID> leftNodes = left.get(i), partNodes = new
ArrayList<>(leftNodes.size());
+ HashSet<UUID> rightNodesSet = new HashSet<>(right.get(i));
- for (ClusterNode partNode : leftNodes) {
+ for (UUID partNode : leftNodes) {
if (rightNodesSet.contains(partNode) && (nodesSet == null ||
nodesSet.contains(partNode)))
partNodes.add(partNode);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index e7f4bb8..c9f9f26 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -26,11 +26,13 @@ import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
@@ -69,11 +71,18 @@ import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import
org.apache.ignite.internal.processors.query.calcite.serialize.ConversionContext;
import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
+import org.apache.ignite.internal.processors.query.calcite.serialize.RelGraph;
+import
org.apache.ignite.internal.processors.query.calcite.serialize.RelGraphNode;
+import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
@@ -200,6 +209,28 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
return rel(sql).rel;
}
+ public RelNode convert(RelGraph graph) {
+ ready();
+
+ CalciteCatalogReader catalogReader = createCatalogReader();
+ RexBuilder rexBuilder = createRexBuilder();
+ RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+ RelBuilder relBuilder = createRelBuilder(cluster, catalogReader);
+
+ ConversionContext ctx = new ConversionContext(this, relBuilder,
operatorTable);
+
+ return F.first(convertRecursive(ctx, graph, graph.nodes().subList(0,
1)));
+ }
+
+ private List<RelNode> convertRecursive(ConversionContext ctx, RelGraph
graph, List<Ord<RelGraphNode>> src) {
+ ImmutableList.Builder<RelNode> b = ImmutableList.builder();
+
+ for (Ord<RelGraphNode> node : src)
+ b.add(node.e.toRel(ctx, convertRecursive(ctx, graph,
graph.children(node.i))));
+
+ return b.build();
+ }
+
/** {@inheritDoc} */
@Override public RelRoot rel(SqlNode sql) {
ready();
@@ -215,18 +246,21 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
new SqlToRelConverter(this, validator, createCatalogReader(),
cluster, convertletTable, config);
RelRoot root = sqlToRelConverter.convertQuery(sql, false, true);
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
- RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster,
null);
+ RelBuilder relBuilder = createRelBuilder(cluster, null);
root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel,
relBuilder));
return root;
}
- public RelNode convert(Graph graph) {
+ public QueryPlan plan(RelNode rel) {
ready();
- return null; // TODO
+ if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
+ throw new IllegalArgumentException("IGNITE_CONVENTION is
required.");
+
+ return new Splitter().go((IgniteRel) rel);
}
- public Graph convert(RelNode node) {
+ public Graph graph(RelNode node) {
ready();
return null; // TODO
@@ -265,7 +299,7 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
RelRoot root2 = root.withRel(sqlToRelConverter.flattenTypes(root.rel,
true));
- RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster,
null);
+ RelBuilder relBuilder = createRelBuilder(cluster, null);
return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel,
relBuilder));
}
@@ -328,19 +362,23 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
return typeFactory;
}
- public SqlConformance conformance() {
+ private SqlConformance conformance() {
return connectionConfig.conformance();
}
- public SqlOperatorTable operatorTable() {
+ private SqlOperatorTable operatorTable() {
return operatorTable;
}
- public RexBuilder createRexBuilder() {
+ private RexBuilder createRexBuilder() {
return new RexBuilder(typeFactory);
}
- public CalciteCatalogReader createCatalogReader() {
+ private RelBuilder createRelBuilder(RelOptCluster cluster, RelOptSchema
schema) {
+ return sqlToRelConverterConfig.getRelBuilderFactory().create(cluster,
schema);
+ }
+
+ private CalciteCatalogReader createCatalogReader() {
SchemaPlus rootSchema = rootSchema(defaultSchema);
return new CalciteCatalogReader(
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 47cd875..cde4290 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
@@ -27,16 +28,31 @@ import
org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
*
*/
public final class Receiver extends AbstractRelNode implements IgniteRel {
- private final Fragment source;
+ private final Fragment sourceFragment;
+ private final NodesMapping sourceMapping;
/**
* @param cluster Cluster this relational expression belongs to
* @param traits Trait set.
*/
- public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, Fragment source) {
+ public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, Fragment sourceFragment) {
super(cluster, traits);
this.rowType = rowType;
- this.source = source;
+ this.sourceFragment = sourceFragment;
+
+ sourceMapping = null;
+ }
+
+ /**
+ * @param cluster Cluster this relational expression belongs to
+ * @param traits Trait set.
+ */
+ public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType
rowType, NodesMapping sourceMapping) {
+ super(cluster, traits);
+ this.rowType = rowType;
+ this.sourceMapping = sourceMapping;
+
+ sourceFragment = null;
}
/** {@inheritDoc} */
@@ -44,7 +60,16 @@ public final class Receiver extends AbstractRelNode
implements IgniteRel {
return implementor.implement(this);
}
- public Fragment source() {
- return source;
+ public Fragment sourceFragment() {
+ return sourceFragment;
+ }
+
+ public NodesMapping sourceMapping() {
+ if (sourceFragment != null)
+ return sourceFragment.mapping();
+
+ assert sourceMapping != null;
+
+ return sourceMapping;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 8e7c956..aa279b2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -20,9 +20,12 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
import org.jetbrains.annotations.NotNull;
@@ -47,6 +50,14 @@ public final class Sender extends SingleRel implements
IgniteRel {
this.targetDistr = targetDistr;
}
+ private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ @NotNull DistributionTrait targetDistr, @NotNull NodesMapping
targetMapping) {
+ super(cluster, traits, input);
+
+ this.targetDistr = targetDistr;
+ this.targetMapping = targetMapping;
+ }
+
/** {@inheritDoc} */
@Override public <T> T implement(RelImplementor<T> implementor) {
return implementor.implement(this);
@@ -67,4 +78,15 @@ public final class Sender extends SingleRel implements
IgniteRel {
public DestinationFunction targetFunction(org.apache.calcite.plan.Context
ctx) {
return targetDistr.destinationFunctionFactory().create(ctx,
targetMapping, targetDistr.keys());
}
+
+ public static Sender create(RelNode input, DistributionTrait targetDistr,
NodesMapping targetMapping) {
+ RelOptCluster cluster = input.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ RelTraitSet traits = cluster.traitSet()
+ .replace(IgniteRel.IGNITE_CONVENTION)
+ .replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteMdDistribution.distribution(input, mq));
+
+ return new Sender(cluster, traits, input, targetDistr, targetMapping);
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
new file mode 100644
index 0000000..a2ed800
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ConversionContext.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+
+/**
+ *
+ */
+public class ConversionContext implements RelOptTable.ToRelContext {
+ private final IgnitePlanner planner;
+ private final SqlOperatorTable operatorTable;
+ private final RelBuilder relBuilder;
+ private final ExpToRexTranslator expTranslator;
+
+ public ConversionContext(IgnitePlanner planner, RelBuilder relBuilder,
SqlOperatorTable operatorTable) {
+ this.planner = planner;
+ this.relBuilder = relBuilder;
+ this.operatorTable = operatorTable;
+
+ expTranslator = new ExpToRexTranslator(rexBuilder(), typeFactory(),
operatorTable);
+ }
+
+ public IgnitePlanner planner() {
+ return planner;
+ }
+
+ public RelDataTypeFactory typeFactory() {
+ return cluster().getTypeFactory();
+ }
+
+ public SqlOperatorTable operatorTable() {
+ return operatorTable;
+ }
+
+ public RelOptSchema schema() {
+ return relBuilder().getRelOptSchema();
+ }
+
+ public RelOptCluster cluster() {
+ return relBuilder().getCluster();
+ }
+
+ public Context context() {
+ return cluster().getPlanner().getContext();
+ }
+
+ public RelBuilder relBuilder() {
+ return relBuilder;
+ }
+
+ public RexBuilder rexBuilder() {
+ return cluster().getRexBuilder();
+ }
+
+ public ExpToRexTranslator expressionTranslator() {
+ return expTranslator;
+ }
+
+ @Override public RelOptCluster getCluster() {
+ return cluster();
+ }
+
+ @Override public RelRoot expandView(RelDataType rowType, String
queryString, List<String> schemaPath, List<String> viewPath) {
+ return planner.expandView(rowType, queryString, schemaPath, viewPath);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
similarity index 82%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
index bc12e25..8019797 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/DynamicParamExpression.java
@@ -21,12 +21,12 @@ import org.apache.calcite.rel.type.RelDataType;
/**
*
*/
-public class InputRefExpression implements LogicalExpression {
- public final ExpressionType type;
+public class DynamicParamExpression implements LogicalExpression {
+ public final ExpDataType type;
public final int index;
- public InputRefExpression(RelDataType type, int index) {
- this.type = ExpressionType.fromType(type);
+ public DynamicParamExpression(RelDataType type, int index) {
+ this.type = ExpDataType.fromType(type);
this.index = index;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
similarity index 83%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
index 05762d8..4294854 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpDataType.java
@@ -23,9 +23,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
/**
*
*/
-public interface ExpressionType extends Serializable {
- static ExpressionType fromType(RelDataType type) {
- return type.isStruct() ? StructType.fromType(type) :
FieldType.fromType(type);
+public interface ExpDataType extends Serializable {
+ static ExpDataType fromType(RelDataType type) {
+ return type.isStruct() ? StructType.fromType(type) :
SimpleType.fromType(type);
}
RelDataType toRelDataType(RelDataTypeFactory factory);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
index 1c08bd9..a76db0b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java
@@ -27,4 +27,6 @@ public interface ExpImplementor<T> {
T implement(LiteralExpression literalExpression);
T implement(LocalRefExpression localRefExpression);
+
+ T implement(DynamicParamExpression dynamicParamExpression);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
index 820043f..9b09ef0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java
@@ -90,6 +90,10 @@ public class ExpToRexTranslator implements
ExpImplementor<RexNode> {
return new RexLocalRef(exp.index, exp.type.toRelDataType(typeFactory));
}
+ @Override public RexNode implement(DynamicParamExpression exp) {
+ return builder.makeDynamicParam(exp.type.toRelDataType(typeFactory),
exp.index);
+ }
+
private SqlOperator op(String name, SqlSyntax syntax) {
return ops.get(Pair.of(name, syntax));
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
new file mode 100644
index 0000000..4c87ccc
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FilterNode.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Graph node for an {@link IgniteFilter}: keeps the translated condition and the ids of the filter's correlation variables so that {@code toRel} can rebuild the filter over its first child.
+ */
+public class FilterNode extends RelGraphNode {
+ private final int[] variables;
+ private final LogicalExpression condition;
+
+ private FilterNode(LogicalExpression condition, int[] variables) {
+ this.variables = variables;
+ this.condition = condition;
+ }
+
+ public static FilterNode create(IgniteFilter rel, RexToExpTranslator
expTranslator) {
+ return new FilterNode(expTranslator.translate(rel.getCondition()),
+
rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray());
+ }
+
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ return IgniteFilter.create(
+ F.first(children),
+ condition.implement(ctx.expressionTranslator()),
+
Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()));
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
index 06b7b04..0427a5f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
@@ -19,16 +19,34 @@ package
org.apache.ignite.internal.processors.query.calcite.serialize;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntList;
/**
*
*/
-public class Graph implements Serializable {
- private final List<GraphNode> nodes = new ArrayList<>();
+public class Graph<T extends GraphNode> implements Serializable {
+ private final List<T> nodes = new ArrayList<>();
private final List<GridIntList> edges = new ArrayList<>();
- int addNode(GraphNode node) {
+ public List<Ord<T>> nodes() {
+ return Ord.zip(nodes);
+ }
+
+ public List<GridIntList> edges() {
+ return Commons.transform(edges, GridIntList::copy);
+ }
+
+ public int addNode(int parentId, T node) {
+ int id = addNode(node);
+
+ addEdge(parentId, id);
+
+ return id;
+ }
+
+ public int addNode(T node) {
assert nodes.size() == edges.size();
int id = nodes.size();
@@ -39,26 +57,22 @@ public class Graph implements Serializable {
return id;
}
- void addEdge(int parentId, int childId) {
- edges.get(parentId).add(childId);
- }
-
- int addChild(int parentId, GraphNode node) {
- int id = addNode(node);
-
- edges.get(parentId).add(id);
+ public void addEdge(int parentId, int childId) {
+ assert parentId == -1 || (parentId >= 0 && parentId < edges.size());
+ assert nodes.size() == edges.size();
- return id;
+ if (parentId != -1)
+ edges.get(parentId).add(childId);
}
- List<GraphNode> children(int parentId) {
- GridIntList childrenIds = edges.get(parentId);
- ArrayList<GraphNode> children = new ArrayList<>(childrenIds.size());
+ public List<Ord<T>> children(int parentId) {
+ GridIntList children = edges.get(parentId);
+
+ ArrayList<Ord<T>> ords = new ArrayList<>(children.size());
- for (int i = 0; i < childrenIds.size(); i++) {
- children.add(nodes.get(i));
- }
+ for (int i = 0; i < children.size(); i++)
+ ords.add(Ord.of(children.get(i), nodes.get(children.get(i))));
- return children;
+ return ords;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
index f2f533a..82e3f22 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
@@ -16,8 +16,10 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
+import java.io.Serializable;
+
/**
*
*/
-public interface GraphNode {
+public interface GraphNode extends Serializable {
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
similarity index 75%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
index f2f533a..abf4cdf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphToRelConverter.java
@@ -16,8 +16,14 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
+import org.apache.calcite.rel.RelNode;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+
/**
*
*/
-public interface GraphNode {
+public class GraphToRelConverter {
+ public RelNode convert(IgnitePlanner planner, Graph graph) {
+ return null; // NOTE(review): placeholder - no conversion is performed here and null is always returned.
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
index bc12e25..9a52eb3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
*
*/
public class InputRefExpression implements LogicalExpression {
- public final ExpressionType type;
+ public final ExpDataType type;
public final int index;
public InputRefExpression(RelDataType type, int index) {
- this.type = ExpressionType.fromType(type);
+ this.type = ExpDataType.fromType(type);
this.index = index;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
new file mode 100644
index 0000000..f08a928
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/JoinNode.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+
+/**
+ * Graph node for an {@link IgniteJoin}: keeps the traits, translated condition, correlation variable ids, join type and semi-join-done flag that {@code toRel} uses to rebuild the join from exactly two children.
+ */
+public class JoinNode extends RelGraphNode {
+ private final LogicalExpression condition;
+ private final int[] variables;
+ private final JoinRelType joinType;
+ private final boolean semiDone;
+
+ private JoinNode(RelTraitSet traits, LogicalExpression condition, int[]
variables, JoinRelType joinType, boolean semiDone) {
+ super(traits);
+ this.condition = condition;
+ this.variables = variables;
+ this.joinType = joinType;
+ this.semiDone = semiDone;
+ }
+
+ public static JoinNode create(IgniteJoin rel, RexToExpTranslator
expTranslator) {
+ return new JoinNode(rel.getTraitSet(),
+ expTranslator.translate(rel.getCondition()),
+
rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray(),
+ rel.getJoinType(),
+ rel.isSemiJoinDone());
+ }
+
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ assert children.size() == 2;
+
+ RelNode left = children.get(0);
+ RelNode right = children.get(1);
+
+ return new IgniteJoin(ctx.cluster(),
+ traitSet.toTraitSet(ctx.cluster()),
+ left,
+ right,
+ ctx.expressionTranslator().translate(condition),
+
Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()),
+ joinType,
+ semiDone);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
index 3c95b96..2dba7d1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
*
*/
public class LiteralExpression implements LogicalExpression {
- public final ExpressionType type;
+ public final ExpDataType type;
public final Comparable value;
public LiteralExpression(RelDataType type, Comparable value) {
- this.type = ExpressionType.fromType(type);
+ this.type = ExpDataType.fromType(type);
this.value = value;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
index 4f3ed54..bc5a77e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
@@ -22,11 +22,11 @@ import org.apache.calcite.rel.type.RelDataType;
*
*/
public class LocalRefExpression implements LogicalExpression {
- public final ExpressionType type;
+ public final ExpDataType type;
public final int index;
public LocalRefExpression(RelDataType type, int index) {
- this.type = ExpressionType.fromType(type);
+ this.type = ExpDataType.fromType(type);
this.index = index;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
new file mode 100644
index 0000000..07c2cf5
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ProjectNode.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Graph node for an {@link IgniteProject}: keeps the translated project expressions and the row type that {@code toRel} uses to rebuild the project over its first child.
+ */
+public class ProjectNode extends RelGraphNode {
+ private final List<LogicalExpression> projects;
+ private final ExpDataType dataType;
+
+ private ProjectNode(List<LogicalExpression> projects, ExpDataType
dataType) {
+ this.projects = projects;
+ this.dataType = dataType;
+ }
+
+ public static ProjectNode create(IgniteProject rel, RexToExpTranslator
rexTranslator) {
+ return new ProjectNode(rexTranslator.translate(rel.getProjects()),
+ ExpDataType.fromType(rel.getRowType()));
+ }
+
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ return IgniteProject.create(F.first(children),
+ ctx.expressionTranslator().translate(projects),
+ dataType.toRelDataType(ctx.typeFactory()));
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
new file mode 100644
index 0000000..93405cd
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverNode.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.List;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+
+
+/**
+ * Graph node for a {@link Receiver}: keeps the traits, row type and source nodes mapping; {@code toRel} rebuilds the receiver from these alone and does not use the children list.
+ */
+public class ReceiverNode extends RelGraphNode {
+ private final ExpDataType dataType;
+ private final NodesMapping sourceMapping;
+
+ private ReceiverNode(RelTraitSet traits, ExpDataType dataType,
NodesMapping sourceMapping) {
+ super(traits);
+ this.dataType = dataType;
+ this.sourceMapping = sourceMapping;
+ }
+
+ public static ReceiverNode create(Receiver rel) {
+ return new ReceiverNode(rel.getTraitSet(),
ExpDataType.fromType(rel.getRowType()), rel.sourceMapping());
+ }
+
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ return new Receiver(ctx.cluster(), traitSet.toTraitSet(ctx.cluster()),
dataType.toRelDataType(ctx.typeFactory()), sourceMapping);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
similarity index 93%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
index f2f533a..c00837a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraph.java
@@ -19,5 +19,5 @@ package
org.apache.ignite.internal.processors.query.calcite.serialize;
/**
*
*/
-public interface GraphNode {
+public class RelGraph extends Graph<RelGraphNode> {
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
index 14a2019..164274f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
@@ -16,17 +16,22 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
-import java.io.Serializable;
-import org.apache.calcite.plan.RelTrait;
+import java.util.List;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
/**
*
*/
-public class RelGraphNode implements GraphNode, Serializable {
- protected RelTrait[] traits;
+public abstract class RelGraphNode implements GraphNode {
+ protected SerializedTraitSet traitSet;
- public RelGraphNode(RelTraitSet traits) {
- this.traits = traits.toArray(new RelTrait[0]);
+ protected RelGraphNode() {
}
+
+ protected RelGraphNode(RelTraitSet traits) {
+ traitSet = new SerializedTraitSet(traits);
+ }
+
+ public abstract RelNode toRel(ConversionContext ctx, List<RelNode>
children);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
index 40a9158..7de26c7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.Deque;
import java.util.List;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
@@ -28,67 +28,80 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class RelToGraphConverter implements RelImplementor<Pair<Integer,
List<IgniteRel>>> {
- private Deque<Pair<Integer, List<IgniteRel>>> stack = new ArrayDeque<>();
- private Graph graph;
- private int parentId;
+public class RelToGraphConverter {
+ private final RexToExpTranslator rexTranslator = new RexToExpTranslator();
- public Graph convert(IgniteRel root) {
- stack = new ArrayDeque<>();
- graph = new Graph();
- parentId = -1;
+ private RelGraph graph;
+ private int curParent;
- stack.push(root.implement(this));
+ private static final class Item {
+ final int parentId;
+ final List<IgniteRel> children;
- while (!stack.isEmpty()) {
- Pair<Integer, List<IgniteRel>> pair = stack.pop();
+ private Item(int parentId, List<IgniteRel> children) {
+ this.parentId = parentId;
+ this.children = children;
+ }
+ }
- parentId = pair.left;
+ private final class Implementor implements RelImplementor<Item> {
+ @Override public Item implement(IgniteFilter rel) {
+ return new Item(graph.addNode(curParent, FilterNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
+ }
- for (IgniteRel child : pair.right) {
- stack.push(child.implement(this));
- }
+ @Override public Item implement(IgniteJoin rel) {
+ return new Item(graph.addNode(curParent, JoinNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
}
- return graph;
- }
+ @Override public Item implement(IgniteProject rel) {
+ return new Item(graph.addNode(curParent, ProjectNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteFilter
rel) {
- return null;
- }
+ @Override public Item implement(IgniteTableScan rel) {
+ return new Item(graph.addNode(curParent,
TableScanNode.create(rel)), Commons.cast(rel.getInputs()));
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteJoin rel) {
- return null;
- }
+ @Override public Item implement(Receiver rel) {
+ return new Item(graph.addNode(curParent,
ReceiverNode.create(rel)), Collections.emptyList());
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteProject
rel) {
- return null;
- }
+ @Override public Item implement(Sender rel) {
+ return new Item(graph.addNode(curParent, SenderNode.create(rel)),
Commons.cast(rel.getInputs()));
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteTableScan
rel) {
- return null;
- }
+ @Override public Item implement(IgniteExchange rel) {
+ throw new UnsupportedOperationException();
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(Receiver rel) {
- return null;
+ @Override public Item implement(IgniteRel other) {
+ throw new AssertionError();
+ }
}
- @Override public Pair<Integer, List<IgniteRel>> implement(Sender rel) {
- assert parentId == -1;
+ public RelGraph convert(IgniteRel root) {
+ graph = new RelGraph();
- return null;
- }
+ Implementor implementor = new Implementor();
+ Deque<Item> stack = new ArrayDeque<>();
+ stack.push(new Item(-1, F.asList(root)));
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteExchange
rel) {
- throw new UnsupportedOperationException();
- }
+ while (!stack.isEmpty()) {
+ Item item = stack.pop();
+
+ curParent = item.parentId;
+
+ for (IgniteRel child : item.children) {
+ stack.push(child.implement(implementor));
+ }
+ }
- @Override public Pair<Integer, List<IgniteRel>> implement(IgniteRel other)
{
- throw new AssertionError();
+ return graph;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
index dfb92d3..e2ff02a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
@@ -76,7 +76,7 @@ public class RexToExpTranslator implements
RexVisitor<LogicalExpression> {
}
@Override public LogicalExpression visitDynamicParam(RexDynamicParam
dynamicParam) {
- throw new UnsupportedOperationException();
+ return new DynamicParamExpression(dynamicParam.getType(),
dynamicParam.getIndex());
}
@Override public LogicalExpression visitRangeRef(RexRangeRef rangeRef) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
index 41820d6..e23b705 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java
@@ -16,21 +16,30 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
public class SenderNode extends RelGraphNode {
- private DistributionTrait targetDistr;
- private NodesMapping targetMapping;
+ private final DistributionTrait targetDistr;
+ private final NodesMapping targetMapping;
- public SenderNode(Sender sender) {
- super(sender.getTraitSet());
+ private SenderNode(DistributionTrait targetDistr, NodesMapping
targetMapping) {
+ this.targetDistr = targetDistr;
+ this.targetMapping = targetMapping;
+ }
+
+ public static SenderNode create(Sender rel) {
+ return new SenderNode(rel.targetDistribution(), rel.targetMapping());
+ }
- targetDistr = sender.targetDistribution();
- targetMapping = sender.targetMapping();
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ return Sender.create(F.first(children), targetDistr, targetMapping);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
similarity index 61%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
index 14a2019..cc2c378 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedCorrelationId.java
@@ -17,16 +17,30 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
import java.io.Serializable;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
/**
*
*/
-public class RelGraphNode implements GraphNode, Serializable {
- protected RelTrait[] traits;
+public class SerializedCorrelationId implements Serializable {
+ private final int id;
+ private final String name;
- public RelGraphNode(RelTraitSet traits) {
- this.traits = traits.toArray(new RelTrait[0]);
+ public SerializedCorrelationId(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ public SerializedCorrelationId(CorrelationId corrId) {
+ id = corrId.getId();
+ name = corrId.getName();
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public String name() {
+ return name;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
new file mode 100644
index 0000000..f35d933
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializedTraitSet.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+
+/**
+ * Serializable representation of a {@link RelTraitSet}.
+ *
+ * Traits that implement {@link Serializable} are stored as is. The only other trait supported is
+ * {@link IgniteRel#IGNITE_CONVENTION}, which is written as a marker value and mapped back on restore.
+ */
+public class SerializedTraitSet implements Serializable {
+    /** Marker written in place of {@link IgniteRel#IGNITE_CONVENTION}. */
+    private static final Byte IGNITE_CONVENTION = 0;
+
+    /** Serializable counterparts of the source traits, in the order of the source trait set. */
+    private final List<Serializable> traits;
+
+    /**
+     * @param traits Trait set to capture.
+     */
+    public SerializedTraitSet(RelTraitSet traits) {
+        this.traits = translate(traits);
+    }
+
+    /**
+     * Restores the captured traits on top of {@code cluster.traitSet()}.
+     *
+     * @param cluster Cluster providing the initial trait set.
+     * @return Simplified trait set containing every captured trait.
+     */
+    public RelTraitSet toTraitSet(RelOptCluster cluster) {
+        RelTraitSet res = cluster.traitSet();
+
+        // RelTraitSet.replace() returns a new trait set instead of modifying the receiver,
+        // so the result has to be reassigned; otherwise every restored trait is silently dropped.
+        for (Serializable trait : traits)
+            res = res.replace(fromSerializable(trait));
+
+        return res.simplify();
+    }
+
+    /**
+     * @param traits Traits to convert.
+     * @return Serializable counterparts of the given traits, in the same order.
+     */
+    private List<Serializable> translate(List<RelTrait> traits) {
+        List<Serializable> res = new ArrayList<>(traits.size());
+
+        for (RelTrait trait : traits)
+            res.add(toSerializable(trait));
+
+        return res;
+    }
+
+    /**
+     * @param trait Trait to convert.
+     * @return The trait itself if it is serializable, or the convention marker.
+     * @throws AssertionError If the trait is neither serializable nor the Ignite convention.
+     */
+    private Serializable toSerializable(RelTrait trait) {
+        if (trait instanceof Serializable)
+            return (Serializable) trait;
+
+        // Identity comparison: the convention is referenced as a shared constant.
+        if (trait == IgniteRel.IGNITE_CONVENTION)
+            return IGNITE_CONVENTION;
+
+        throw new AssertionError("Unexpected trait: " + trait);
+    }
+
+    /**
+     * @param trait Value produced by {@link #toSerializable(RelTrait)}.
+     * @return Corresponding trait.
+     * @throws AssertionError If the value is neither a trait nor the convention marker.
+     */
+    private RelTrait fromSerializable(Serializable trait) {
+        if (trait instanceof RelTrait)
+            return (RelTrait) trait;
+
+        if (IGNITE_CONVENTION.equals(trait))
+            return IgniteRel.IGNITE_CONVENTION;
+
+        throw new AssertionError("Unexpected serialized trait: " + trait);
+    }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
similarity index 81%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
index e1e1b3a..54b7fb5 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SimpleType.java
@@ -24,22 +24,22 @@ import org.apache.calcite.sql.type.SqlTypeName;
/**
*
*/
-public class FieldType implements ExpressionType {
+public class SimpleType implements ExpDataType {
private final Class clazz;
private final SqlTypeName typeName;
private final int precision;
private final int scale;
- public static FieldType fromType(RelDataType type) {
+ public static SimpleType fromType(RelDataType type) {
assert !type.isStruct();
if (type instanceof RelDataTypeFactoryImpl.JavaType)
- return new FieldType(((RelDataTypeFactoryImpl.JavaType)
type).getJavaClass(), null, 0, 0);
+ return new SimpleType(((RelDataTypeFactoryImpl.JavaType)
type).getJavaClass(), null, 0, 0);
- return new FieldType(null, type.getSqlTypeName(), type.getPrecision(),
type.getScale());
+ return new SimpleType(null, type.getSqlTypeName(),
type.getPrecision(), type.getScale());
}
- private FieldType(Class clazz, SqlTypeName typeName, int precision, int
scale) {
+ private SimpleType(Class clazz, SqlTypeName typeName, int precision, int
scale) {
this.clazz = clazz;
this.typeName = typeName;
this.precision = precision;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
index 45d5e44..e72b9dc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java
@@ -24,22 +24,22 @@ import org.apache.calcite.rel.type.RelDataTypeField;
/**
*
*/
-public class StructType implements ExpressionType {
- private final LinkedHashMap<String, FieldType> fields;
+public class StructType implements ExpDataType {
+ private final LinkedHashMap<String, ExpDataType> fields;
- public static StructType fromType(RelDataType type) {
+ static StructType fromType(RelDataType type) {
assert type.isStruct();
- LinkedHashMap<String, FieldType> fields = new LinkedHashMap<>();
+ LinkedHashMap<String, ExpDataType> fields = new LinkedHashMap<>();
for (RelDataTypeField field : type.getFieldList()) {
- fields.put(field.getName(), FieldType.fromType(field.getType()));
+ fields.put(field.getName(), ExpDataType.fromType(field.getType()));
}
return new StructType(fields);
}
- private StructType(LinkedHashMap<String, FieldType> fields) {
+ private StructType(LinkedHashMap<String, ExpDataType> fields) {
this.fields = fields;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
similarity index 56%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
index 3c95b96..3604f17 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableScanNode.java
@@ -16,21 +16,25 @@
package org.apache.ignite.internal.processors.query.calcite.serialize;
-import org.apache.calcite.rel.type.RelDataType;
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
/**
*
*/
-public class LiteralExpression implements LogicalExpression {
- public final ExpressionType type;
- public final Comparable value;
+public class TableScanNode extends RelGraphNode {
+ private final List<String> tableName;
- public LiteralExpression(RelDataType type, Comparable value) {
- this.type = ExpressionType.fromType(type);
- this.value = value;
+ private TableScanNode(List<String> tableName) {
+ this.tableName = tableName;
}
- @Override public <T> T implement(ExpImplementor<T> implementor) {
- return implementor.implement(this);
+ public static TableScanNode create(IgniteTableScan rel) {
+ return new TableScanNode(rel.getTable().getQualifiedName());
+ }
+
+ @Override public RelNode toRel(ConversionContext ctx, List<RelNode>
children) {
+ return ctx.schema().getTableForMember(tableName).toRel(ctx);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 8e9d9ed..55264d6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.typedef.F;
*
*/
public class Fragment {
- public final RelNode rel;
+ private final RelNode rel;
private NodesMapping mapping;
private ImmutableIntList localInputs;
@@ -49,14 +49,34 @@ public class Fragment {
init(null, ctx, mq);
}
- public void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
+ public RelNode root() {
+ return rel;
+ }
+
+ public NodesMapping mapping() {
+ return mapping;
+ }
+
+ public ImmutableIntList localInputs() {
+ return localInputs;
+ }
+
+ public ImmutableList<Fragment> remoteInputs() {
+ return remoteInputs;
+ }
+
+ public boolean isRemote() {
+ return rel instanceof Sender;
+ }
+
+ private void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(rel, mq);
remoteInputs = info.remoteInputs();
localInputs = info.localInputs();
if (info.mapping() == null)
- mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) :
registry(ctx).local();
+ mapping = isRemote() ? registry(ctx).random(topologyVersion(ctx))
: registry(ctx).local();
else {
try {
mapping = info.mapping().deduplicate();
@@ -67,7 +87,7 @@ public class Fragment {
}
if (parent != null) {
- assert remote();
+ assert isRemote();
((Sender)rel).init(parent.mapping);
}
@@ -78,10 +98,6 @@ public class Fragment {
}
}
- private boolean remote() {
- return rel instanceof Sender;
- }
-
private LocationRegistry registry(Context ctx) {
return ctx.unwrap(LocationRegistry.class);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 535c3bb..7e69adf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -16,20 +16,21 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.ObjectStreamException;
import java.util.List;
+import java.util.UUID;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
/**
*
*/
-class AllTargetsFactory extends AbstractDestinationFunctionFactory {
+final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- List<ClusterNode> nodes = m.nodes();
+ List<UUID> nodes = m.nodes();
return r -> nodes;
}
@@ -37,4 +38,8 @@ class AllTargetsFactory extends
AbstractDestinationFunctionFactory {
@Override public Object key() {
return "AllTargetsFactory";
}
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
index 7577767..3d4dc41 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
/**
*
*/
public interface DestinationFunction {
- List<ClusterNode> destination(Object row);
+ List<UUID> destination(Object row);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 756bf6b..0fe775a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,8 +16,11 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Objects;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
@@ -29,13 +32,13 @@ import org.apache.calcite.util.ImmutableIntList;
*/
public final class DistributionTrait implements RelTrait, Serializable {
private DistributionType type;
- private int[] keys;
+ private ImmutableIntList keys;
private DestinationFunctionFactory functionFactory;
public DistributionTrait() {
}
- public DistributionTrait(DistributionType type, int[] keys,
DestinationFunctionFactory functionFactory) {
+ public DistributionTrait(DistributionType type, ImmutableIntList keys,
DestinationFunctionFactory functionFactory) {
this.type = type;
this.keys = keys;
this.functionFactory = functionFactory;
@@ -50,7 +53,7 @@ public final class DistributionTrait implements RelTrait,
Serializable {
}
public ImmutableIntList keys() {
- return ImmutableIntList.of(keys);
+ return keys;
}
@Override public void register(RelOptPlanner planner) {}
@@ -62,18 +65,18 @@ public final class DistributionTrait implements RelTrait,
Serializable {
if (o instanceof DistributionTrait) {
DistributionTrait that = (DistributionTrait) o;
- return type == that.type() && Arrays.equals(keys, that.keys);
+ return type == that.type() && Objects.equals(keys, that.keys);
}
return false;
}
@Override public int hashCode() {
- return Objects.hash(type, Arrays.hashCode(keys));
+ return Objects.hash(type, keys);
}
@Override public String toString() {
- return type + (type == DistributionType.HASH ? Arrays.toString(keys) :
"");
+ return type + (type == DistributionType.HASH ? String.valueOf(keys) :
"");
}
@Override public RelTraitDef getTraitDef() {
@@ -94,9 +97,25 @@ public final class DistributionTrait implements RelTrait,
Serializable {
if (type() == other.type())
return type() != DistributionType.HASH
- || (Arrays.equals(keys, other.keys)
+ || (Objects.equals(keys, other.keys)
&& Objects.equals(functionFactory, other.functionFactory));
return other.type() == DistributionType.RANDOM && type() ==
DistributionType.HASH;
}
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeObject(type);
+ out.writeObject(keys.toIntArray());
+ out.writeObject(functionFactory);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ type = (DistributionType) in.readObject();
+ keys = ImmutableIntList.of((int[])in.readObject());
+ functionFactory = (DestinationFunctionFactory) in.readObject();
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return DistributionTraitDef.INSTANCE.canonize(this);
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index c6a3eb5..379a707 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -16,11 +16,12 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.ObjectStreamException;
import java.util.List;
+import java.util.UUID;
import java.util.function.ToIntFunction;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -28,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
*/
-class HashFunctionFactory extends AbstractDestinationFunctionFactory {
+final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
static final DestinationFunctionFactory INSTANCE = new
HashFunctionFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
@@ -50,10 +51,10 @@ class HashFunctionFactory extends
AbstractDestinationFunctionFactory {
return hash;
};
- List<List<ClusterNode>> assignments = m.assignments();
+ List<List<UUID>> assignments = m.assignments();
if (U.assertionsEnabled()) {
- for (List<ClusterNode> assignment : assignments) {
+ for (List<UUID> assignment : assignments) {
assert F.isEmpty(assignment) || assignment.size() == 1;
}
}
@@ -64,4 +65,8 @@ class HashFunctionFactory extends
AbstractDestinationFunctionFactory {
@Override public Object key() {
return "HashFunctionFactory";
}
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index a95734a..6a9e76d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableIntList;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
@@ -32,11 +32,10 @@ import static
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
*
*/
public class IgniteDistributions {
- private static final int[] EMPTY_KEYS = new int[0];
- private static final DistributionTrait BROADCAST = new
DistributionTrait(DistributionType.BROADCAST, EMPTY_KEYS,
AllTargetsFactory.INSTANCE);
- private static final DistributionTrait SINGLE = new
DistributionTrait(DistributionType.SINGLE, EMPTY_KEYS,
SingleTargetFactory.INSTANCE);
- private static final DistributionTrait RANDOM = new
DistributionTrait(DistributionType.RANDOM, EMPTY_KEYS,
RandomTargetFactory.INSTANCE);
- private static final DistributionTrait ANY = new
DistributionTrait(DistributionType.ANY, EMPTY_KEYS, NoOpFactory.INSTANCE);
+ private static final DistributionTrait BROADCAST = new
DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(),
AllTargetsFactory.INSTANCE);
+ private static final DistributionTrait SINGLE = new
DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(),
SingleTargetFactory.INSTANCE);
+ private static final DistributionTrait RANDOM = new
DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(),
RandomTargetFactory.INSTANCE);
+ private static final DistributionTrait ANY = new
DistributionTrait(DistributionType.ANY, ImmutableIntList.of(),
NoOpFactory.INSTANCE);
public static DistributionTrait any() {
return ANY;
@@ -55,11 +54,11 @@ public class IgniteDistributions {
}
public static DistributionTrait hash(List<Integer> keys) {
- return new DistributionTrait(HASH, U.toIntArray(keys),
HashFunctionFactory.INSTANCE);
+ return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys),
HashFunctionFactory.INSTANCE);
}
public static DistributionTrait hash(List<Integer> keys,
DestinationFunctionFactory factory) {
- return new DistributionTrait(HASH, U.toIntArray(keys), factory);
+ return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys),
factory);
}
public static List<DistributionTrait> deriveDistributions(RelNode rel,
RelMetadataQuery mq) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index 5dff5a9..1988671 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -16,6 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.ObjectStreamException;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -23,7 +24,7 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
/**
*
*/
-class NoOpFactory extends AbstractDestinationFunctionFactory {
+final class NoOpFactory extends AbstractDestinationFunctionFactory {
static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
@@ -33,4 +34,8 @@ class NoOpFactory extends AbstractDestinationFunctionFactory {
@Override public Object key() {
return "NoOpFactory";
}
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index a4b27b7..8de55e5 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -16,22 +16,23 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.ObjectStreamException;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
/**
*
*/
-class RandomTargetFactory extends AbstractDestinationFunctionFactory {
+final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
static final DestinationFunctionFactory INSTANCE = new
RandomTargetFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- List<ClusterNode> nodes = m.nodes();
+ List<UUID> nodes = m.nodes();
return r ->
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
}
@@ -40,4 +41,7 @@ class RandomTargetFactory extends
AbstractDestinationFunctionFactory {
return "RandomTargetFactory";
}
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 4d21a60..1de22fd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -16,20 +16,23 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.io.ObjectStreamException;
+import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-class SingleTargetFactory extends AbstractDestinationFunctionFactory {
+final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
static final DestinationFunctionFactory INSTANCE = new
SingleTargetFactory();
@Override public DestinationFunction create(Context ctx, NodesMapping m,
ImmutableIntList k) {
- List<ClusterNode> nodes = m.nodes().subList(0, 1);
+ List<UUID> nodes = Collections.singletonList(F.first(m.nodes()));
return r -> nodes;
}
@@ -37,4 +40,8 @@ class SingleTargetFactory extends
AbstractDestinationFunctionFactory {
@Override public Object key() {
return "SingleTargetFactory";
}
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index aab2302..22486e8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
@@ -44,7 +45,7 @@ import
org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -138,97 +139,56 @@ public final class Commons {
};
}
- public static int[] intersect(int[] left, int[] right) {
- if (F.isEmpty(left) || F.isEmpty(right))
- return EMPTY;
-
- int[] res = null;
-
- int i = 0, j = 0, k = 0, size = Math.min(left.length, right.length);
-
- while (i < left.length && j < right.length) {
- if (left[i] < right[j])
- i++;
- else if (right[j] < left[i])
- j++;
- else {
- if (res == null)
- res = new int[size];
-
- res[k++] = left[i];
-
- i++;
- j++;
- }
- }
-
- if (k == 0)
- return EMPTY;
-
- return res.length == k ? res : Arrays.copyOf(res, k);
- }
-
public static <T> List<T> intersect(List<T> left, List<T> right) {
if (F.isEmpty(left) || F.isEmpty(right))
return Collections.emptyList();
-
- HashSet<T> set = new HashSet<>(right);
-
- return
left.stream().filter(set::contains).collect(Collectors.toList());
+ else if (left.size() > right.size())
+ return intersect0(right, left);
+ else
+ return intersect0(left, right);
}
- public static <T> List<T> union(List<T> left, List<T> right) {
- Set<T> set = U.newHashSet(left.size() + right.size());
+ public static <T> List<T> intersect0(List<T> left, List<T> right) {
+ List<T> res = new ArrayList<>(Math.min(left.size(), right.size()));
+ HashSet<T> set = new HashSet<>(left);
- set.addAll(left);
- set.addAll(right);
+ for (T t : right) {
+ if (set.contains(t))
+ res.add(t);
+ }
- return new ArrayList<>(set);
+ return res;
}
- public static int[] union(int[] left, int[] right) {
- if (F.isEmpty(left) && F.isEmpty(right))
- return EMPTY;
-
- int min = Math.min(left.length, right.length);
- int max = left.length + right.length;
- int expected = Math.max(min, (int) (max * 1.5));
-
- int[] res = new int[U.ceilPow2(expected)];
-
- int i = 0, j = 0, k = 0;
+ public static <T> List<T> concat(List<T> col, T... elements) {
+ ArrayList<T> res = new ArrayList<>(col.size() + elements.length);
- while (i < left.length && j < right.length) {
- res = ensureSize(res, k + 1);
+ res.addAll(col);
+ res.addAll(Arrays.asList(elements));
- if (left[i] < right[j])
- res[k++] = left[i++];
- else if (right[j] < left[i])
- res[k++] = right[j++];
- else {
- res[k++] = left[i];
+ return res;
+ }
- i++;
- j++;
- }
- }
+ @SuppressWarnings("unchecked")
+ public static <T> List<T> cast(List<?> src) {
+ return (List)src;
+ }
- if (k == 0)
- return EMPTY;
+ public static <T,R> List<R> transform(@NotNull List<T> src, @NotNull
Function<T,R> mapFun) {
+ List<R> list = new ArrayList<>(src.size());
- return res.length == k ? res : Arrays.copyOf(res, k);
- }
+ for (T t : src)
+ list.add(mapFun.apply(t));
- private static int[] ensureSize(int[] array, int size) {
- return size < array.length ? array : Arrays.copyOf(array,
U.ceilPow2(size));
+ return list;
}
- public static <T> List<T> concat(List<T> col, T... elements) {
- ArrayList<T> res = new ArrayList<>(col.size() + elements.length);
+ public static <T,R> Set<R> transform(@NotNull Set<T> src, @NotNull
Function<T,R> mapFun) {
+ Set<R> set = new HashSet<>(src.size());
- res.addAll(col);
- Collections.addAll(res, elements);
+ for (T t : src)
+ set.add(mapFun.apply(t));
- return res;
+ return set;
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index bc7d179..c6463dd 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -35,7 +35,6 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
@@ -55,6 +54,8 @@ import
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.calcite.serialize.LogicalExpression;
+import org.apache.ignite.internal.processors.query.calcite.serialize.RelGraph;
+import
org.apache.ignite.internal.processors.query.calcite.serialize.RelToGraphConverter;
import
org.apache.ignite.internal.processors.query.calcite.serialize.RexToExpTranslator;
import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
@@ -66,10 +67,10 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi;
-import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.BeforeClass;
@@ -85,7 +86,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
private static SchemaPlus schema;
private static TestRegistry registry;
- private static List<ClusterNode> nodes;
+ private static List<UUID> nodes;
@BeforeClass
public static void setupClass() {
@@ -134,7 +135,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
nodes = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
- nodes.add(new GridTestNode(UUID.randomUUID()));
+ nodes.add(UUID.randomUUID());
}
registry = new TestRegistry();
@@ -362,6 +363,82 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
}
+ /**
+ * Checks that a fragment of a distributed query plan survives a round trip:
+ * the fragment root is converted to a {@link RelGraph}, marshalled to bytes,
+ * unmarshalled back and converted to a relational tree again.
+ *
+ * @throws Exception If failed.
+ */
@Test
+ public void testPlanSerializationDeserialization() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ // Holds the marshalled fragment between the serialization and deserialization phases.
+ byte[] convertedBytes;
+
+ // Phase 1: build the plan and serialize one of its fragments.
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ // The query is dereferenced right below, so it is the value that has to be checked here.
+ assertNotNull(query);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ RelNode rel = planner.convert(sqlNode);
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ assertNotNull(rel);
+
+ QueryPlan plan = planner.plan(rel);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 2);
+
+ plan.init(ctx);
+
+ // Convert the root of the fragment at index 1 to a graph and marshal it.
+ RelGraph graph = new RelToGraphConverter().convert((IgniteRel)
plan.fragments().get(1).root());
+
+ convertedBytes = new JdkMarshaller().marshal(graph);
+
+ assertNotNull(convertedBytes);
+ }
+
+ // Phase 2: restore the graph from bytes using a newly created planner.
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) {
+ assertNotNull(planner);
+
+ RelGraph graph = new JdkMarshaller().unmarshal(convertedBytes,
getClass().getClassLoader());
+
+ assertNotNull(graph);
+
+ // Convert the restored graph back to a relational tree.
+ RelNode rel = planner.convert(graph);
+
+ assertNotNull(rel);
+ }
+ }
+
+ @Test
public void testSplitterCollocatedPartitionedPartitioned() throws
Exception {
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +