This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new e797a6e pending
e797a6e is described below
commit e797a6eba85738b9c8b9d8b2fe500e590a776a2f
Author: Igor Seliverstov <[email protected]>
AuthorDate: Mon Oct 28 21:06:03 2019 +0300
pending
---
.../query/calcite/CalciteQueryProcessor.java | 5 +-
.../query/calcite/exchange/Receiver.java | 7 +-
.../processors/query/calcite/exchange/Sender.java | 7 +-
.../calcite/metadata/IgniteMdDistribution.java | 28 +--
.../metadata/IgniteMdSourceDistribution.java | 94 ++++------
.../query/calcite/metadata/IgniteMetadata.java | 20 +--
.../query/calcite/metadata/RelMetadataQueryEx.java | 88 +++++++++
.../processors/query/calcite/rel/IgniteRel.java | 4 +-
.../query/calcite/rel/IgniteVisitor.java | 24 ++-
.../calcite/rel/logical/IgniteLogicalExchange.java | 5 +
.../calcite/rel/logical/IgniteLogicalFilter.java | 5 +
.../calcite/rel/logical/IgniteLogicalJoin.java | 5 +
.../calcite/rel/logical/IgniteLogicalProject.java | 5 +
.../rel/logical/IgniteLogicalTableScan.java | 5 +
.../query/calcite/schema/IgniteTable.java | 13 +-
.../calcite/splitter/PartitionsDistribution.java | 196 +++++++++++++++++++++
.../PartitionsDistributionRegistry.java} | 8 +-
.../query/calcite/splitter/SourceDistribution.java | 4 +-
.../query/calcite/splitter/SplitTask.java | 9 +
.../query/calcite/splitter/TaskSplitter.java | 84 ++++++++-
.../query/calcite/util/IgniteMethod.java | 8 +-
.../query/calcite/CalciteQueryProcessorTest.java | 37 +++-
22 files changed, 543 insertions(+), 118 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index a024027..d1ad130 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -145,12 +145,11 @@ public class CalciteQueryProcessor implements QueryEngine {
* @return Query execution context.
*/
Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests.
- return Contexts.chain(ctx,
+ return Contexts.chain(ctx, config.getContext(),
Contexts.of(
new Query(query, params),
contextParameter(ctx, SchemaPlus.class, schemaHolder::schema),
- contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)),
- config.getContext());
+ contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)));
}
private QueryExecution prepare(Context ctx) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index 8d2ee1c..993f55b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
/**
*
@@ -32,7 +33,7 @@ public class Receiver extends SingleRel implements IgniteRel {
* @param traits Trait set.
* @param sender Corresponding sender.
*/
- protected Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) {
+ public Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) {
super(cluster, traits, sender);
}
@@ -45,4 +46,8 @@ public class Receiver extends SingleRel implements IgniteRel {
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitReceiver(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index 5049250..63cf8f7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
/**
*
@@ -33,7 +34,11 @@ public class Sender extends SingleRel implements IgniteRel {
* @param traits Trait set.
* @param input Input relational expression
*/
- protected Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
super(cluster, traits, input);
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitSender(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index ded1a8d..298e905 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -47,36 +47,36 @@ import static
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
/**
*
*/
-public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.TraitDistribution> {
+public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
public static final RelMetadataProvider SOURCE =
- ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION.method(), new IgniteMdDistribution());
+ ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION_TRAIT.method(), new IgniteMdDistribution());
- @Override public MetadataDef<IgniteMetadata.TraitDistribution> getDef() {
- return IgniteMetadata.TraitDistribution.DEF;
+ @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata>
getDef() {
+ return IgniteMetadata.DistributionTraitMetadata.DEF;
}
- public DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
+ public DistributionTrait getDistributionTrait(RelNode rel,
RelMetadataQuery mq) {
return DistributionTraitDef.INSTANCE.getDefault();
}
- public DistributionTrait distribution(Filter filter, RelMetadataQuery mq) {
+ public DistributionTrait getDistributionTrait(Filter filter,
RelMetadataQuery mq) {
return filter(mq, filter.getInput(), filter.getCondition());
}
- public DistributionTrait distribution(Project project, RelMetadataQuery
mq) {
+ public DistributionTrait getDistributionTrait(Project project,
RelMetadataQuery mq) {
return project(mq, project.getInput(), project.getProjects());
}
- public DistributionTrait distribution(Join join, RelMetadataQuery mq) {
+ public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery
mq) {
return join(mq, join.getLeft(), join.getRight(), join.getCondition());
}
- public DistributionTrait distribution(RelSubset rel, RelMetadataQuery mq) {
+ public DistributionTrait getDistributionTrait(RelSubset rel,
RelMetadataQuery mq) {
return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
}
public static DistributionTrait project(RelMetadataQuery mq, RelNode
input, List<RexNode> projects) {
- DistributionTrait trait = distribution_(input, mq);
+ DistributionTrait trait = distribution(input, mq);
if (trait.type() == HASH) {
ImmutableIntList keys = trait.keys();
@@ -115,14 +115,14 @@ public class IgniteMdDistribution implements
MetadataHandler<IgniteMetadata.Trai
}
public static DistributionTrait filter(RelMetadataQuery mq, RelNode input,
RexNode condition) {
- return distribution_(input, mq);
+ return distribution(input, mq);
}
public static DistributionTrait join(RelMetadataQuery mq, RelNode left,
RelNode right, RexNode condition) {
- return distribution_(left, mq);
+ return distribution(left, mq);
}
- public static DistributionTrait distribution_(RelNode rel, RelMetadataQuery mq) {
- return rel.metadata(IgniteMetadata.TraitDistribution.class, mq).distribution();
+ public static DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
+ return RelMetadataQueryEx.wrap(mq).getDistributionTrait(rel);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index 1c80b96..255e06c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -16,12 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
@@ -31,78 +26,80 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class IgniteMdSourceDistribution implements
MetadataHandler<TaskDistribution> {
+public class IgniteMdSourceDistribution implements
MetadataHandler<SourceDistributionMetadata> {
public static final RelMetadataProvider SOURCE =
-
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.TASK_DISTRIBUTION.method(),
new IgniteMdSourceDistribution());
+
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.SOURCE_DISTRIBUTION.method(),
new IgniteMdSourceDistribution());
- @Override public MetadataDef<TaskDistribution> getDef() {
- return TaskDistribution.DEF;
+ @Override public MetadataDef<SourceDistributionMetadata> getDef() {
+ return SourceDistributionMetadata.DEF;
}
- public SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) {
+ public SourceDistribution getSourceDistribution(RelNode rel,
RelMetadataQuery mq) {
throw new AssertionError();
}
- public SourceDistribution distribution(RelSubset rel, RelMetadataQuery mq)
{
+ public SourceDistribution getSourceDistribution(RelSubset rel,
RelMetadataQuery mq) {
throw new AssertionError();
}
- public SourceDistribution distribution(SingleRel rel, RelMetadataQuery mq)
{
- return distribution_(rel.getInput(), mq);
+ public SourceDistribution getSourceDistribution(SingleRel rel,
RelMetadataQuery mq) {
+ return distribution(rel.getInput(), mq);
}
- public SourceDistribution distribution(BiRel rel, RelMetadataQuery mq) {
- return merge(distribution_(rel.getLeft(), mq),
distribution_(rel.getRight(), mq));
+ public SourceDistribution getSourceDistribution(BiRel rel,
RelMetadataQuery mq) {
+ mq = RelMetadataQueryEx.wrap(mq);
+
+ return merge(distribution(rel.getLeft(), mq),
distribution(rel.getRight(), mq));
}
- public SourceDistribution distribution(Receiver rel, RelMetadataQuery mq) {
+ public SourceDistribution getSourceDistribution(Receiver rel,
RelMetadataQuery mq) {
SourceDistribution res = new SourceDistribution();
- res.remoteInputs.add(rel);
+ res.remoteInputs = F.asList(rel);
return res;
}
- public SourceDistribution distribution(IgniteLogicalTableScan rel,
RelMetadataQuery mq) {
+ public SourceDistribution getSourceDistribution(IgniteLogicalTableScan
rel, RelMetadataQuery mq) {
return rel.tableDistribution();
}
- public static SourceDistribution distribution_(RelNode rel, RelMetadataQuery mq) {
- return rel.metadata(TaskDistribution.class, mq).distribution();
+ public static SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) {
+ return RelMetadataQueryEx.wrap(mq).getSourceDistribution(rel);
}
private static SourceDistribution merge(SourceDistribution left,
SourceDistribution right) {
SourceDistribution res = new SourceDistribution();
- res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
res.partitionMapping = merge(left.partitionMapping,
right.partitionMapping);
+ res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
res.localInputs = merge(left.localInputs, right.localInputs);
return res;
}
- private static <T> List<T> merge(List<T> left, List<T> right) {
+ private static PartitionsDistribution merge(PartitionsDistribution left,
PartitionsDistribution right) {
if (left == null)
return right;
+ if (right == null)
+ return left;
- if (right != null)
- left.addAll(right);
-
- return left;
+ return left.mergeWith(right);
}
- private static GridIntList merge(GridIntList left, GridIntList right) {
+ private static <T> List<T> merge(List<T> left, List<T> right) {
if (left == null)
return right;
@@ -112,44 +109,13 @@ public class IgniteMdSourceDistribution implements
MetadataHandler<TaskDistribut
return left;
}
- private static Map<ClusterNode, int[]> merge(Map<ClusterNode, int[]> left,
Map<ClusterNode, int[]> right) {
+ private static GridIntList merge(GridIntList left, GridIntList right) {
if (left == null)
return right;
- if (right == null)
- return left;
-
- Map<ClusterNode, int[]> res = new HashMap<>(Math.min(left.size(),
right.size()));
-
- Set<ClusterNode> keys = new HashSet<>(left.keySet());
-
- keys.retainAll(right.keySet());
-
- for (ClusterNode node : keys) {
- int[] leftParts = left.get(node);
- int[] rightParts = right.get(node);
-
- int[] nodeParts = new int[Math.min(leftParts.length,
rightParts.length)];
-
- int i = 0, j = 0, k = 0;
-
- while (i < leftParts.length && j < rightParts.length) {
- if (leftParts[i] < rightParts[j])
- i++;
- else if (rightParts[j] < leftParts[i])
- j++;
- else {
- nodeParts[k++] = leftParts[i];
-
- i++;
- j++;
- }
- }
-
- if (k > 0)
- res.put(node, k < nodeParts.length ? Arrays.copyOf(nodeParts,
k) : nodeParts);
- }
+ if (right != null)
+ left.addAll(right);
- return res;
+ return left;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 3b5445e..090ef53 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -40,27 +40,27 @@ public class IgniteMetadata {
IgniteMdSourceDistribution.SOURCE,
DefaultRelMetadataProvider.INSTANCE));
- public interface TraitDistribution extends Metadata {
- MetadataDef<TraitDistribution> DEF =
MetadataDef.of(TraitDistribution.class, TraitDistribution.Handler.class,
IgniteMethod.DISTRIBUTION.method());
+ public interface DistributionTraitMetadata extends Metadata {
+ MetadataDef<DistributionTraitMetadata> DEF =
MetadataDef.of(DistributionTraitMetadata.class,
DistributionTraitMetadata.Handler.class,
IgniteMethod.DISTRIBUTION_TRAIT.method());
/** Determines how the rows are distributed. */
- DistributionTrait distribution();
+ DistributionTrait getDistributionTrait();
/** Handler API. */
- interface Handler extends MetadataHandler<TraitDistribution> {
- DistributionTrait distribution(RelNode r, RelMetadataQuery mq);
+ interface Handler extends MetadataHandler<DistributionTraitMetadata> {
+ DistributionTrait getDistributionTrait(RelNode r, RelMetadataQuery
mq);
}
}
- public interface TaskDistribution extends Metadata {
- MetadataDef<TaskDistribution> DEF =
MetadataDef.of(TaskDistribution.class, TaskDistribution.Handler.class,
IgniteMethod.TASK_DISTRIBUTION.method());
+ public interface SourceDistributionMetadata extends Metadata {
+ MetadataDef<SourceDistributionMetadata> DEF =
MetadataDef.of(SourceDistributionMetadata.class,
SourceDistributionMetadata.Handler.class,
IgniteMethod.SOURCE_DISTRIBUTION.method());
/** Determines how the rows are distributed. */
- SourceDistribution distribution();
+ SourceDistribution getSourceDistribution();
/** Handler API. */
- interface Handler extends MetadataHandler<TaskDistribution> {
- SourceDistribution distribution(RelNode r, RelMetadataQuery mq);
+ interface Handler extends MetadataHandler<SourceDistributionMetadata> {
+ SourceDistribution getSourceDistribution(RelNode r,
RelMetadataQuery mq);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
new file mode 100644
index 0000000..8c10a51
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class RelMetadataQueryEx extends RelMetadataQuery {
+ private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
+ private static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
+
+ private IgniteMetadata.DistributionTraitMetadata.Handler
distributionTraitHandler;
+ private IgniteMetadata.SourceDistributionMetadata.Handler
sourceDistributionHandler;
+
+ private RelMetadataQueryEx() {
+ super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
+
+ distributionTraitHandler =
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
+ sourceDistributionHandler =
initialHandler(IgniteMetadata.SourceDistributionMetadata.Handler.class);
+ }
+
+ protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider,
RelMetadataQueryEx prototype) {
+ super(metadataProvider, prototype);
+
+ distributionTraitHandler = prototype.distributionTraitHandler;
+ sourceDistributionHandler = prototype.sourceDistributionHandler;
+ }
+
+ protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider,
RelMetadataQuery parent) {
+ super(metadataProvider, parent);
+
+ distributionTraitHandler = PROTO.distributionTraitHandler;
+ sourceDistributionHandler = PROTO.sourceDistributionHandler;
+ }
+
+ @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
+ public static RelMetadataQueryEx instance() {
+ return new RelMetadataQueryEx(PROVIDER, PROTO);
+ }
+
+ public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
+ if (mq.getClass() == RelMetadataQueryEx.class)
+ return (RelMetadataQueryEx) mq;
+
+ return new RelMetadataQueryEx(PROVIDER, mq);
+ }
+
+ public SourceDistribution getSourceDistribution(RelNode rel) {
+ for (;;) {
+ try {
+ return sourceDistributionHandler.getSourceDistribution(rel, this);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ sourceDistributionHandler = revise(e.relClass, IgniteMetadata.SourceDistributionMetadata.DEF);
+ }
+ }
+ }
+
+ public DistributionTrait getDistributionTrait(RelNode rel) {
+ for (;;) {
+ try {
+ return distributionTraitHandler.getDistributionTrait(rel, this);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ distributionTraitHandler = revise(e.relClass, IgniteMetadata.DistributionTraitMetadata.DEF);
+ }
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index 995ad70..f15bf81 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -32,7 +32,5 @@ public interface IgniteRel extends RelNode {
}
};
- default void visit(IgniteVisitor visitor) {
- visitor.visit(this);
- }
+ <T> T accept(IgniteVisitor<T> visitor);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
index a89c034..542696d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java
@@ -17,9 +17,29 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+
/**
*
*/
-public interface IgniteVisitor {
- public void visit(IgniteRel rel);
+public interface IgniteVisitor<T> {
+ T visitExchange(IgniteLogicalExchange exchange);
+
+ T visitFilter(IgniteLogicalFilter filter);
+
+ T visitJoin(IgniteLogicalJoin join);
+
+ T visitProject(IgniteLogicalProject project);
+
+ T visitTableScan(IgniteLogicalTableScan tableScan);
+
+ T visitReceiver(Receiver receiver);
+
+ T visitSender(Sender sender);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index ed9bee6..3a577e9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
@@ -62,4 +63,8 @@ public final class IgniteLogicalExchange extends SingleRel
implements IgniteRel
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitExchange(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 0bc0fbf..34bbe3d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
public final class IgniteLogicalFilter extends Filter implements IgniteRel {
private final Set<CorrelationId> variablesSet;
@@ -59,4 +60,8 @@ public final class IgniteLogicalFilter extends Filter
implements IgniteRel {
return new IgniteLogicalFilter(filter.getCluster(), traits, input,
filter.getCondition(), filter.getVariablesSet());
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitFilter(this);
+ }
}
\ No newline at end of file
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
index 215b676..95699c7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
public final class IgniteLogicalJoin extends Join implements IgniteRel {
private final boolean semiJoinDone;
@@ -58,4 +59,8 @@ public final class IgniteLogicalJoin extends Join implements
IgniteRel {
@Override public boolean isSemiJoinDone() {
return semiJoinDone;
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitJoin(this);
+ }
}
\ No newline at end of file
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index 89a992b..56c0f50 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
public final class IgniteLogicalProject extends Project implements IgniteRel {
public IgniteLogicalProject(
@@ -49,4 +50,8 @@ public final class IgniteLogicalProject extends Project
implements IgniteRel {
return new IgniteLogicalProject(project.getCluster(), traits, input,
project.getProjects(), project.getRowType());
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitProject(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index 8698573..ca2ae90 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
@@ -40,4 +41,8 @@ public final class IgniteLogicalTableScan extends TableScan
implements IgniteRel
public SourceDistribution tableDistribution() {
return getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext());
}
+
+ @Override public <T> T accept(IgniteVisitor<T> visitor) {
+ return visitor.visitTableScan(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 3cbda12..bccf570 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -27,9 +27,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import
org.apache.ignite.internal.processors.query.calcite.exchange.DistributionRegistry;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -78,13 +78,16 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
public SourceDistribution tableDistribution(Context context) {
SourceDistribution res = new SourceDistribution();
- res.localInputs = new GridIntList();
- res.localInputs.add(CU.cacheId(cacheName));
+ GridIntList localInputs = new GridIntList();
- DistributionRegistry registry =
context.unwrap(DistributionRegistry.class);
+ localInputs.add(CU.cacheId(cacheName));
+
+ res.localInputs = localInputs;
+
+ PartitionsDistributionRegistry registry =
context.unwrap(PartitionsDistributionRegistry.class);
AffinityTopologyVersion topVer =
context.unwrap(AffinityTopologyVersion.class);
- res.partitionMapping =
registry.partitionMapping(CU.cacheId(cacheName), topVer);
+ res.partitionMapping =
registry.partitionsDistribution(CU.cacheId(cacheName), topVer);
return res;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
new file mode 100644
index 0000000..2a7707c
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class PartitionsDistribution {
+ public static final int[] ALL_PARTS = new int[0];
+ private static final int[] EMPTY = new int[0];
+
+ public boolean excessive;
+ public int parts;
+ public int[] nodes;
+ public int[][] nodeParts;
+
+ /**
+  * Intersects this distribution with {@code other}: the result keeps only the nodes present in both
+  * distributions, each mapped to the partitions that both sides allow on that node.
+  * The two-pointer walk assumes both {@code nodes} arrays are sorted ascending (TODO confirm with producers).
+  *
+  * @param other Distribution to merge with.
+  * @return New distribution; its partition arrays may be shared with the inputs.
+  * @throws IllegalStateException If both sides are partitioned but with a different number of partitions,
+  *      or if the merged mapping does not cover all partitions.
+  */
+ public PartitionsDistribution mergeWith(PartitionsDistribution other) {
+     if (parts != 0 && other.parts != 0 && parts != other.parts)
+         throw new IllegalStateException("Non-collocated query fragment.");
+
+     // Allocated eagerly: the result can never hold more nodes than the smaller input, and eager
+     // allocation keeps the arrays non-null even when the inputs share no node at all.
+     int len = Math.min(nodes.length, other.nodes.length);
+
+     int[] nodes0 = new int[len];
+     int[][] nodeParts0 = new int[len][];
+
+     int i = 0, j = 0, k = 0;
+
+     while (i < nodes.length && j < other.nodes.length) {
+         if (nodes[i] < other.nodes[j])
+             i++;
+         else if (other.nodes[j] < nodes[i])
+             j++;
+         else {
+             int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
+
+             // ALL_PARTS is a zero-length sentinel meaning "no restriction", so it must not be
+             // mistaken for an empty intersection and dropped.
+             if (mergedParts == ALL_PARTS || mergedParts.length > 0) {
+                 nodes0[k] = nodes[i];
+                 nodeParts0[k] = mergedParts;
+
+                 k++;
+             }
+
+             i++;
+             j++;
+         }
+     }
+
+     PartitionsDistribution res = new PartitionsDistribution();
+
+     // The result is still excessive only if both inputs were.
+     res.excessive = excessive && other.excessive;
+     // One side may be 0 (see check()), in which case the other side's count wins.
+     res.parts = Math.max(parts, other.parts);
+     res.nodes = len == k ? nodes0 : Arrays.copyOf(nodes0, k);
+     res.nodeParts = len == k ? nodeParts0 : Arrays.copyOf(nodeParts0, k);
+
+     check(res);
+
+     return res;
+ }
+
+ /**
+  * Verifies that the node-to-partitions mapping of {@code res} references at least {@code res.parts}
+  * distinct partitions.
+  *
+  * @param res Distribution to validate.
+  * @throws IllegalStateException If fewer than {@code res.parts} distinct partitions are mapped.
+  */
+ private void check(PartitionsDistribution res) {
+     // Zero partitions: only receivers and/or replicated caches in task subtree, nothing to verify.
+     if (res.parts != 0) {
+         BitSet seen = new BitSet(res.parts);
+
+         for (int[] nodePart : res.nodeParts) {
+             for (int p : nodePart)
+                 seen.set(p);
+         }
+
+         // Number of set bits == number of distinct partitions encountered.
+         if (seen.cardinality() < res.parts)
+             throw new IllegalStateException("Failed to collocate used caches.");
+     }
+ }
+
+ /**
+  * Intersects two partition arrays.
+  * If either side is the {@link #ALL_PARTS} sentinel (compared by identity), the other side is returned as is.
+  * Otherwise the common elements are collected with a two-pointer walk, which assumes both arrays are
+  * sorted ascending (TODO confirm with producers).
+  *
+  * @param left Partitions of the first side.
+  * @param right Partitions of the second side.
+  * @return Common partitions; the shared {@code EMPTY} array when there are none.
+  */
+ private int[] merge(int[] left, int[] right) {
+     if (left == ALL_PARTS)
+         return right;
+
+     if (right == ALL_PARTS)
+         return left;
+
+     // The intersection cannot be larger than the shorter input.
+     int[] common = new int[Math.min(left.length, right.length)];
+
+     int cnt = 0;
+
+     for (int l = 0, r = 0; l < left.length && r < right.length; ) {
+         if (left[l] < right[r])
+             l++;
+         else if (right[r] < left[l])
+             r++;
+         else {
+             common[cnt++] = left[l];
+
+             l++;
+             r++;
+         }
+     }
+
+     if (cnt == 0)
+         return EMPTY;
+
+     return common.length == cnt ? common : Arrays.copyOf(common, cnt);
+ }
+
+ /**
+  * Builds a distribution in which every partition is owned by exactly one node.
+  * Nodes are visited round-robin; on each visit a node claims the next partition from its own list that
+  * no node has claimed yet, until {@code parts} partitions are claimed.
+  *
+  * @return {@code this} when the distribution is not excessive, otherwise a new distribution that holds
+  *      only the claimed partitions (nodes that claimed nothing are left out).
+  * @throws IllegalStateException If the nodes together list fewer than {@code parts} distinct partitions.
+  */
+ public PartitionsDistribution deduplicate() {
+     if (!excessive)
+         return this;
+
+     // Partition -> node that claimed it.
+     Map<Integer, Integer> map = new HashMap<>();
+
+     // Per-node cursor: position in nodeParts[idx] to resume scanning from on the next visit.
+     int[] idxs = new int[nodeParts.length];
+
+     // Set when at least one partition was claimed during a full round over the nodes. A round without
+     // progress means every list is exhausted, so looping further could never finish.
+     boolean progress = true;
+
+     while (map.size() < parts && progress) {
+         progress = false;
+
+         for (int idx = 0; idx < nodes.length && map.size() < parts; idx++) {
+             int[] nodePart = nodeParts[idx];
+
+             int j = idxs[idx];
+
+             while (j < nodePart.length) {
+                 boolean claimed = map.putIfAbsent(nodePart[j], nodes[idx]) == null;
+
+                 j++;
+
+                 if (claimed) {
+                     progress = true;
+
+                     break;
+                 }
+             }
+
+             idxs[idx] = j;
+         }
+     }
+
+     if (map.size() < parts)
+         throw new IllegalStateException("Failed to assign all partitions to nodes.");
+
+     int[] nodes0 = new int[nodes.length];
+     int[][] nodeParts0 = new int[nodes.length][];
+
+     int k = 0;
+
+     for (int i = 0; i < nodes.length; i++) {
+         int j = 0;
+
+         int[] nodePart0 = null;
+
+         for (int p : nodeParts[i]) {
+             // Presumably every listed partition is in the map by now (i.e. the lists hold exactly
+             // 'parts' distinct ids) - verify against the code that fills nodeParts.
+             if (map.get(p) == nodes[i]) {
+                 if (nodePart0 == null)
+                     nodePart0 = new int[nodeParts[i].length];
+
+                 nodePart0[j++] = p;
+             }
+         }
+
+         if (nodePart0 != null) {
+             nodes0[k] = nodes[i];
+             nodeParts0[k] = nodePart0.length == j ? nodePart0 : Arrays.copyOf(nodePart0, j);
+
+             k++;
+         }
+     }
+
+     PartitionsDistribution res = new PartitionsDistribution();
+
+     // 'excessive' keeps its default (false): each partition now has a single owner.
+     res.parts = parts;
+     res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
+     res.nodeParts = nodeParts0.length == k ? nodeParts0 : Arrays.copyOf(nodeParts0, k);
+
+     return res;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
similarity index 74%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index a2b0942..f234ae7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -14,15 +14,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.exchange;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
*
*/
-public interface DistributionRegistry {
- Map<ClusterNode, int[]> partitionMapping(int cacheId,
AffinityTopologyVersion topVer);
+public interface PartitionsDistributionRegistry {
+ PartitionsDistribution partitionsDistribution(int cacheId,
AffinityTopologyVersion topVer);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
index eded34e..02a5514 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
import org.apache.ignite.internal.util.GridIntList;
@@ -26,7 +24,7 @@ import org.apache.ignite.internal.util.GridIntList;
*
*/
public class SourceDistribution {
- public Map<ClusterNode, int[]> partitionMapping; // partition filter for
unstable topology
+ public PartitionsDistribution partitionMapping; // partitions mapping.
public List<Receiver> remoteInputs; // remote inputs to notify particular
senders about final task distribution
public GridIntList localInputs; // involved caches, used for partitions
reservation
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
index f405939..1fbe086 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
@@ -28,5 +28,14 @@ public class SplitTask {
public SplitTask(RelNode root, SourceDistribution distribution) {
this.distribution = distribution;
this.root = root;
+
+ init();
+ }
+
+ /**
+  * Replaces an excessive partition mapping of this task's distribution with its deduplicated form.
+  * Note that the {@code distribution} object passed to the constructor is updated in place.
+  */
+ private void init() {
+     PartitionsDistribution current = distribution.partitionMapping;
+
+     // Nothing to do when there is no mapping or it is not excessive.
+     if (current == null || !current.excessive)
+         return;
+
+     distribution.partitionMapping = current.deduplicate();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
index 4ca6716..22527b3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
@@ -16,11 +16,91 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import org.apache.calcite.rel.RelShuttleImpl;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject;
+import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
/**
*
*/
-public class TaskSplitter extends RelShuttleImpl {
+public class TaskSplitter implements IgniteVisitor<IgniteRel> {
+ /** */
+ private List<IgniteRel> roots;
+ /**
+  * Splits the plan rooted at {@code root} into fragments, cutting it at every exchange node.
+  * The given tree is rewritten in place (inputs are replaced while visiting), and the working list is
+  * kept in the {@code roots} field, which is reset on every call.
+  *
+  * @param root Root of the plan to split.
+  * @return One task per fragment; sender fragments come first in the order they were created, the
+  *      fragment containing {@code root} comes last.
+  */
+ public List<SplitTask> go(IgniteRel root) {
+     roots = new ArrayList<>();
+
+     // Visiting adds sender fragments to 'roots' first; the top fragment is appended afterwards.
+     IgniteRel top = root.accept(this);
+
+     roots.add(top);
+
+     RelMetadataQuery mq = RelMetadataQueryEx.instance();
+
+     ArrayList<SplitTask> tasks = new ArrayList<>(roots.size());
+
+     for (IgniteRel fragmentRoot : roots) {
+         SourceDistribution fragmentDistribution = IgniteMdSourceDistribution.distribution(fragmentRoot, mq);
+
+         tasks.add(new SplitTask(fragmentRoot, fragmentDistribution));
+     }
+
+     return tasks;
+ }
+
+ /**
+  * Cuts the plan at an exchange: the exchange's input is wrapped into a {@link Sender}, which is recorded
+  * in {@code roots} as the root of a new fragment, and a {@link Receiver} reading from that sender is
+  * returned in place of the exchange.
+  */
+ @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
+ RelOptCluster cluster = exchange.getCluster();
+ RelNode input = exchange.getInput();
+
+ // NOTE(review): visitChildren(input) dispatches only the inputs of 'input', not 'input' itself, so an
+ // exchange sitting directly under this one would stay unsplit - confirm the planner never produces that.
+ Sender sender = new Sender(cluster, input.getTraitSet(),
visitChildren(input));
+
+ roots.add(sender);
+
+ return new Receiver(cluster, exchange.getTraitSet(), sender);
+ }
+
+ /** Keeps the filter node and continues splitting in its inputs. */
+ @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
+ return visitChildren(filter);
+ }
+
+ /** Keeps the join node and continues splitting in its inputs. */
+ @Override public IgniteRel visitJoin(IgniteLogicalJoin join) {
+ return visitChildren(join);
+ }
+
+ /** Keeps the project node and continues splitting in its inputs. */
+ @Override public IgniteRel visitProject(IgniteLogicalProject project) {
+ return visitChildren(project);
+ }
+
+ /** Returns the table scan unchanged; no inputs are visited below it. */
+ @Override public IgniteRel visitTableScan(IgniteLogicalTableScan
tableScan) {
+ return tableScan;
+ }
+
+ /** Always fails: a receiver in the input means the plan has been split before. */
+ @Override public IgniteRel visitReceiver(Receiver receiver) {
+ throw new AssertionError("An attempt to split an already split task.");
+ }
+
+ /** Always fails: a sender in the input means the plan has been split before. */
+ @Override public IgniteRel visitSender(Sender sender) {
+ throw new AssertionError("An attempt to split an already split task.");
+ }
+
+ /**
+  * Dispatches every input of {@code parent} through this visitor and, where the visitor returns a
+  * different node, replaces the input in place. {@code parent} itself is not dispatched.
+  *
+  * @param parent Node whose inputs are visited; expected to be an {@link IgniteRel}, as are its inputs.
+  * @return {@code parent}, cast to {@link IgniteRel}.
+  */
+ private IgniteRel visitChildren(RelNode parent) {
+     // Taken once up front; replacements below go through parent.replaceInput().
+     List<RelNode> children = parent.getInputs();
+
+     for (int pos = 0; pos < children.size(); pos++) {
+         IgniteRel before = (IgniteRel) children.get(pos);
+         IgniteRel after = before.accept(this);
+
+         // Same instance back: nothing changed at this position.
+         if (after == before)
+             continue;
+
+         parent.replaceInput(pos, after);
+     }
+
+     return (IgniteRel) parent;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index b6a6703..63a5c2b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -19,15 +19,15 @@ package
org.apache.ignite.internal.processors.query.calcite.util;
import java.lang.reflect.Method;
import org.apache.calcite.linq4j.tree.Types;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TraitDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
/**
*
*/
public enum IgniteMethod {
- DISTRIBUTION(TraitDistribution.class, "distribution"),
- TASK_DISTRIBUTION(TaskDistribution.class, "distribution");
+ DISTRIBUTION_TRAIT(DistributionTraitMetadata.class,
"getDistributionTrait"),
+ SOURCE_DISTRIBUTION(SourceDistributionMetadata.class,
"getSourceDistribution");
private final Method method;
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 028bb9c..884b9ca 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.List;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -37,8 +38,13 @@ import
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
+import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -54,6 +60,9 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
private static CalciteQueryProcessor proc;
private static SchemaPlus schema;
+ private static PartitionsDistribution developerDistribution;
+ private static PartitionsDistribution projectDistribution;
+
@BeforeClass
public static void setupClass() {
proc = new CalciteQueryProcessor();
@@ -71,6 +80,12 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
.field("cityId", Integer.class)
.build()));
+ developerDistribution = new PartitionsDistribution();
+
+ developerDistribution.parts = 5;
+ developerDistribution.nodes = new int[]{0,1,2};
+ developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
+
publicSchema.addTable("Project", new IgniteTable("Project", "Project",
RowType.builder()
.keyField("id", Integer.class, true)
@@ -78,6 +93,13 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
.field("ver", Integer.class)
.build()));
+ projectDistribution = new PartitionsDistribution();
+
+ projectDistribution.excessive = true;
+ projectDistribution.parts = 5;
+ projectDistribution.nodes = new int[]{0,1,2};
+ projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
+
publicSchema.addTable("Country", new IgniteTable("Country", "Country",
RowType.builder()
.keyField("id", Integer.class, true)
@@ -106,7 +128,16 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PartitionsDistributionRegistry registry = (id, top) -> {
+ if (id == CU.cacheId("Developer"))
+ return developerDistribution;
+ if (id == CU.cacheId("Project"))
+ return projectDistribution;
+
+ throw new AssertionError("Unexpected cache id:" + id);
+ };
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
assertNotNull(ctx);
@@ -149,5 +180,9 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
}
assertNotNull(relRoot);
+
+ List<SplitTask> fragments = new TaskSplitter().go((IgniteRel)
relRoot.rel);
+
+ assertNotNull(fragments);
}
}
\ No newline at end of file