This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 1995d3b pending
1995d3b is described below
commit 1995d3b417cdd350abc4e8020bb61b847bef9228
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 1 15:27:02 2019 +0300
pending
---
.../query/calcite/exchange/Receiver.java | 6 +-
.../processors/query/calcite/exchange/Sender.java | 10 +-
.../metadata/IgniteMdSourceDistribution.java | 6 +-
.../processors/query/calcite/prepare/Query.java | 20 ++
.../processors/query/calcite/rel/CloneContext.java | 50 +++
.../processors/query/calcite/rel/IgniteRel.java | 2 +-
.../query/calcite/rel/IgniteVisitor.java | 45 ---
.../calcite/rel/logical/IgniteLogicalExchange.java | 6 +-
.../calcite/rel/logical/IgniteLogicalFilter.java | 10 +-
.../calcite/rel/logical/IgniteLogicalJoin.java | 10 +-
.../calcite/rel/logical/IgniteLogicalProject.java | 10 +-
.../rel/logical/IgniteLogicalTableScan.java | 15 +-
.../query/calcite/schema/IgniteTable.java | 10 +-
.../query/calcite/splitter/Fragment.java | 22 +-
.../calcite/splitter/PartitionsDistribution.java | 2 +-
.../splitter/PartitionsDistributionRegistry.java | 6 +-
.../query/calcite/splitter/QueryPlan.java | 15 +
.../query/calcite/splitter/Splitter.java | 66 +---
.../processors/query/calcite/util/Commons.java | 9 +
.../query/calcite/CalciteQueryProcessorTest.java | 385 ++++++++++++++++++++-
20 files changed, 556 insertions(+), 149 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index fe35b52..dda7840 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -22,8 +22,8 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
/**
@@ -51,8 +51,8 @@ public class Receiver extends SingleRel implements IgniteRel {
return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
}
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitReceiver(this);
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new Receiver(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()));
}
public void init(SourceDistribution targetDistribution, RelMetadataQueryEx
mq) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index f9bd762..7c1cd83 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -23,8 +23,8 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -49,12 +49,12 @@ public class Sender extends SingleRel implements IgniteRel {
super(cluster, traits, input);
}
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new Sender(getCluster(), traitSet, sole(inputs));
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new Sender(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()));
}
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitSender(this);
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new Sender(getCluster(), traitSet, sole(inputs));
}
public void init(SourceDistribution targetDistribution) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index d44a57b..6cb5bcf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -16,6 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
+import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.BiRel;
@@ -34,7 +35,6 @@ import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDi
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
import org.apache.ignite.internal.util.GridIntList;
-import org.apache.ignite.internal.util.typedef.F;
/**
*
@@ -72,7 +72,9 @@ public class IgniteMdSourceDistribution implements
MetadataHandler<SourceDistrib
public SourceDistribution getSourceDistribution(Receiver rel,
RelMetadataQuery mq) {
SourceDistribution res = new SourceDistribution();
- res.remoteInputs = F.asList(rel);
+ ArrayList<Receiver> remoteInputs = new ArrayList<>();
+ remoteInputs.add(rel);
+ res.remoteInputs = remoteInputs;
return res;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
index 92b974e..d18c82c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.internal.util.typedef.F;
@@ -50,4 +51,23 @@ public class Query {
}
return res;
}
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof Query))
+ return false;
+
+ Query query = (Query) o;
+
+ if (!sql.equals(query.sql))
+ return false;
+ return Arrays.equals(params, query.params);
+ }
+
+ @Override public int hashCode() {
+ int result = sql.hashCode();
+ result = 31 * result + Arrays.hashCode(params);
+ return result;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
new file mode 100644
index 0000000..5c7046e
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.IdentityHashMap;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ *
+ */
+public final class CloneContext {
+ private final RelOptCluster cluster;
+ private final IdentityHashMap<IgniteRel, IgniteRel> mapping = new
IdentityHashMap<>();
+
+ public CloneContext(RelOptCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ public RelOptCluster getCluster() {
+ return cluster;
+ }
+
+ public <T extends IgniteRel> T clone(RelNode src) {
+ try {
+ return (T) mapping.computeIfAbsent((IgniteRel) src, this::clone0);
+ }
+ catch (ClassCastException e) {
+ throw new IllegalStateException("Unexpected node type: " +
src.getClass());
+ }
+ }
+
+ private IgniteRel clone0(IgniteRel src) {
+ return src.clone(this);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index f15bf81..d2a47da 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -32,5 +32,5 @@ public interface IgniteRel extends RelNode {
}
};
- <T> T accept(IgniteVisitor<T> visitor);
+ IgniteRel clone(CloneContext ctx);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
deleted file mode 100644
index 542696d..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
-
-/**
- *
- */
-public interface IgniteVisitor<T> {
- T visitExchange(IgniteLogicalExchange exchange);
-
- T visitFilter(IgniteLogicalFilter filter);
-
- T visitJoin(IgniteLogicalJoin join);
-
- T visitProject(IgniteLogicalProject project);
-
- T visitTableScan(IgniteLogicalTableScan tableScan);
-
- T visitReceiver(Receiver receiver);
-
- T visitSender(Sender sender);
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index 32b8b09..dd2ad05 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -26,8 +26,8 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
/**
@@ -56,8 +56,8 @@ public final class IgniteLogicalExchange extends SingleRel
implements IgniteRel
return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
}
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitExchange(this);
+ @Override public IgniteRel clone(CloneContext ctx) {
+ throw new UnsupportedOperationException();
}
@Override public RelWriter explainTerms(RelWriter pw) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 0da0d04..2bf7b6c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -27,8 +27,8 @@ import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteLogicalFilter extends Filter implements IgniteRel {
@@ -44,6 +44,10 @@ public final class IgniteLogicalFilter extends Filter
implements IgniteRel {
return variablesSet;
}
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new IgniteLogicalFilter(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()), getCondition(), variablesSet);
+ }
+
@Override public IgniteLogicalFilter copy(RelTraitSet traitSet, RelNode
input,
RexNode condition) {
return new IgniteLogicalFilter(getCluster(), traitSet, input, condition,
variablesSet);
@@ -61,8 +65,4 @@ public final class IgniteLogicalFilter extends Filter
implements IgniteRel {
return new IgniteLogicalFilter(filter.getCluster(), traits, input,
filter.getCondition(), filter.getVariablesSet());
}
-
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitFilter(this);
- }
}
\ No newline at end of file
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
index 95699c7..df5bb2e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
public final class IgniteLogicalJoin extends Join implements IgniteRel {
private final boolean semiJoinDone;
@@ -44,6 +44,10 @@ public final class IgniteLogicalJoin extends Join implements
IgniteRel {
this.semiJoinDone = semiJoinDone;
}
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new IgniteLogicalJoin(ctx.getCluster(), getTraitSet(),
ctx.clone(getLeft()), ctx.clone(getRight()), getCondition(), variablesSet,
getJoinType(), semiJoinDone);
+ }
+
@Override public IgniteLogicalJoin copy(RelTraitSet traitSet, RexNode
conditionExpr,
RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone)
{
return new IgniteLogicalJoin(getCluster(), traitSet, left, right,
conditionExpr, variablesSet, joinType, semiJoinDone);
@@ -59,8 +63,4 @@ public final class IgniteLogicalJoin extends Join implements
IgniteRel {
@Override public boolean isSemiJoinDone() {
return semiJoinDone;
}
-
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitJoin(this);
- }
}
\ No newline at end of file
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index e67d371..41a6f2a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteLogicalProject extends Project implements IgniteRel {
@@ -39,6 +39,10 @@ public final class IgniteLogicalProject extends Project
implements IgniteRel {
super(cluster, traitSet, input, projects, rowType);
}
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new IgniteLogicalProject(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()), getProjects(), getRowType());
+ }
+
@Override public IgniteLogicalProject copy(RelTraitSet traitSet, RelNode
input,
List<RexNode> projects, RelDataType rowType) {
return new IgniteLogicalProject(getCluster(), traitSet, input, projects,
rowType);
@@ -51,8 +55,4 @@ public final class IgniteLogicalProject extends Project
implements IgniteRel {
return new IgniteLogicalProject(project.getCluster(), traits, input,
project.getProjects(), project.getRowType());
}
-
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitProject(this);
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index fb4cf58..87ccb20 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -22,16 +22,21 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteLogicalTableScan extends TableScan implements
IgniteRel {
public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table) {
super(cluster, traitSet, table);
}
+ @Override public IgniteRel clone(CloneContext ctx) {
+ return new IgniteLogicalTableScan(ctx.getCluster(), getTraitSet(),
getTable());
+ }
+
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
@@ -39,11 +44,7 @@ public final class IgniteLogicalTableScan extends TableScan
implements IgniteRel
}
public SourceDistribution tableDistribution() {
- return getTable().unwrap(IgniteTable.class)
- .sourceDistribution(getCluster().getPlanner().getContext());
- }
-
- @Override public <T> T accept(IgniteVisitor<T> visitor) {
- return visitor.visitTableScan(this);
+ boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
+ return
getTable().unwrap(IgniteTable.class).sourceDistribution(getCluster().getPlanner().getContext(),
local);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index bf8b752..6d25e19 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -80,7 +80,7 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.noOpFunction()); // TODO
}
- public SourceDistribution sourceDistribution(Context context) {
+ public SourceDistribution sourceDistribution(Context context, boolean
local) {
int cacheId = CU.cacheId(cacheName);
SourceDistribution res = new SourceDistribution();
@@ -89,9 +89,11 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
localInputs.add(cacheId);
res.localInputs = localInputs;
- PartitionsDistributionRegistry registry =
context.unwrap(PartitionsDistributionRegistry.class);
- AffinityTopologyVersion topVer =
context.unwrap(AffinityTopologyVersion.class);
- res.partitionMapping = registry.get(cacheId, topVer);
+ if (!local) {
+ PartitionsDistributionRegistry registry =
context.unwrap(PartitionsDistributionRegistry.class);
+ AffinityTopologyVersion topVer =
context.unwrap(AffinityTopologyVersion.class);
+ res.partitionMapping = registry.distributed(cacheId, topVer);
+ }
return res;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index f9ee3d1..ffc6b48 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -28,34 +28,38 @@ import org.apache.ignite.internal.util.typedef.F;
*
*/
public class Fragment {
- public final RelNode root;
+ public final RelNode rel;
public SourceDistribution distribution;
- public Fragment(RelNode root) {
- this.root = root;
+ public Fragment(RelNode rel) {
+ this.rel = rel;
}
public void init(Context ctx) {
RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
- distribution = mq.getSourceDistribution(root);
+ distribution = mq.getSourceDistribution(rel);
PartitionsDistribution mapping = distribution.partitionMapping;
- if (mapping == null)
- distribution.partitionMapping = isRootFragment() ?
registry(ctx).single() : registry(ctx).random(topologyVersion(ctx));
+ if (mapping == null) {
+ if (!isRoot())
+ distribution.partitionMapping =
registry(ctx).random(topologyVersion(ctx));
+ else if (!F.isEmpty(distribution.remoteInputs))
+ distribution.partitionMapping = registry(ctx).single();
+ }
else if (mapping.excessive)
distribution.partitionMapping = mapping.deduplicate();
-
+
if (!F.isEmpty(distribution.remoteInputs)) {
for (Receiver input : distribution.remoteInputs)
input.init(distribution, mq);
}
}
- private boolean isRootFragment() {
- return !(root instanceof Sender);
+ private boolean isRoot() {
+ return !(rel instanceof Sender);
}
private PartitionsDistributionRegistry registry(Context ctx) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
index 72ed1fa..fb2728a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -50,7 +50,7 @@ public class PartitionsDistribution {
else {
int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
- if (mergedParts.length > 0) {
+ if (mergedParts == ALL_PARTS || mergedParts.length > 0) {
if (nodes0 == null) {
int len = Math.min(nodes.length, other.nodes.length);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index 568ce65..345957e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -22,7 +22,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
*
*/
public interface PartitionsDistributionRegistry {
- PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer);
- PartitionsDistribution random(AffinityTopologyVersion topVer);
- PartitionsDistribution single();
+ PartitionsDistribution single(); // returns local node with single
partition
+ PartitionsDistribution random(AffinityTopologyVersion topVer); // returns
random distribution, partitions count depends on nodes count
+ PartitionsDistribution distributed(int cacheId, AffinityTopologyVersion
topVer); // returns cache distribution
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index 84985e6..d23091d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -18,6 +18,7 @@ package
org.apache.ignite.internal.processors.query.calcite.splitter;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Context;
+import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
/**
*
@@ -34,4 +35,18 @@ public class QueryPlan {
fragment.init(ctx);
}
}
+
+ public ImmutableList<Fragment> fragments() {
+ return fragments;
+ }
+
+ public QueryPlan clone(CloneContext ctx) {
+ ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+
+ for (Fragment f : fragments) {
+ b.add(new Fragment(ctx.clone(f.rel)));
+ }
+
+ return new QueryPlan(b.build());
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 5220d74..fb1c730 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -17,24 +17,19 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import com.google.common.collect.ImmutableList;
-import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
-import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
/**
*
*/
-public class Splitter implements IgniteVisitor<IgniteRel> {
+public class Splitter extends RelShuttleImpl {
private ImmutableList.Builder<Fragment> b;
public QueryPlan go(IgniteRel root) {
@@ -43,53 +38,24 @@ public class Splitter implements IgniteVisitor<IgniteRel> {
return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
}
- @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
- RelOptCluster cluster = exchange.getCluster();
- RelTraitSet traitSet = exchange.getTraitSet();
+ @Override public RelNode visit(RelNode rel) {
+ if (!(rel instanceof IgniteRel))
+ throw new AssertionError("Unexpected node: " + rel);
+ else if (rel instanceof Sender || rel instanceof Receiver)
+ throw new AssertionError("An attempt to split an already split
task.");
+ else if (rel instanceof IgniteLogicalExchange) {
+ IgniteLogicalExchange exchange = (IgniteLogicalExchange) rel;
- IgniteRel input = visitChildren(exchange.getInput());
+ RelOptCluster cluster = exchange.getCluster();
+ RelTraitSet traitSet = exchange.getTraitSet();
- Sender sender = new Sender(cluster, traitSet, input);
+ Sender sender = new Sender(cluster, traitSet,
visit(exchange.getInput()));
- b.add(new Fragment(sender));
+ b.add(new Fragment(sender));
- return new Receiver(cluster, traitSet, sender);
- }
-
- @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
- return visitChildren(filter);
- }
-
- @Override public IgniteRel visitJoin(IgniteLogicalJoin join) {
- return visitChildren(join);
- }
-
- @Override public IgniteRel visitProject(IgniteLogicalProject project) {
- return visitChildren(project);
- }
-
- @Override public IgniteRel visitTableScan(IgniteLogicalTableScan
tableScan) {
- return tableScan;
- }
-
- @Override public IgniteRel visitReceiver(Receiver receiver) {
- throw new AssertionError("An attempt to split an already split task.");
- }
-
- @Override public IgniteRel visitSender(Sender sender) {
- throw new AssertionError("An attempt to split an already split task.");
- }
-
- private IgniteRel visitChildren(RelNode parent) {
- List<RelNode> inputs = parent.getInputs();
-
- for (int i = 0; i < inputs.size(); i++) {
- IgniteRel input = (IgniteRel) inputs.get(i);
- IgniteRel rel = input.accept(this);
-
- if (rel != input)
- parent.replaceInput(i, rel);
+ return new Receiver(cluster, traitSet, sender);
}
- return (IgniteRel) parent;
+
+ return super.visit(rel);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 739d411..fc4a6eb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -60,6 +60,15 @@ public final class Commons {
return paramSrc.get();
}
+ public static <T> T contextParam(Context ctx, Class<T> paramType,
Supplier<T> paramSrc) {
+ T param = ctx.unwrap(paramType);
+
+ if (param != null)
+ return param;
+
+ return paramSrc.get();
+ }
+
/** */
public static RowType rowType(GridQueryTypeDescriptor desc) {
RowType.Builder b = RowType.builder();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index d9ebd5b..8806868 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -62,6 +62,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
private static PartitionsDistribution projectDistribution;
private static PartitionsDistribution randomDistribution;
private static PartitionsDistribution singleDistribution;
+
private static PartitionsDistributionRegistry registry;
@BeforeClass
@@ -132,7 +133,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
singleDistribution.nodeParts = new int[][]{{1}};
registry = new PartitionsDistributionRegistry() {
- @Override public PartitionsDistribution get(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public PartitionsDistribution distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
return developerDistribution;
if (cacheId == CU.cacheId("Project"))
@@ -165,6 +166,322 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(ctx);
RelTraitDef[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(query);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+ }
+
+ assertNotNull(relRoot.rel);
+
+
+ }
+
+ @Test
+ public void testHepPlaner() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(query);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ @Test
+ public void testVolcanoPlanerDistributed() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(query);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ @Test
+ public void testVolcanoPlanerLocal() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ @Test
+ public void testSplitterLocal() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+ }
+
+ @Test
+ public void testSplitterCollocated() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 2);
+ }
+
+ @Test
+ public void testSplitterPartiallyCollocated() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
DistributionTraitDef.INSTANCE,
ConventionTraitDef.INSTANCE
};
@@ -211,5 +528,71 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
plan.init(ctx);
assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 3);
+ }
+
+ @Test
+ public void testSplitterNonCollocated() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.ver0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 4);
}
}
\ No newline at end of file