This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new e797a6e pending e797a6e is described below commit e797a6eba85738b9c8b9d8b2fe500e590a776a2f Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Mon Oct 28 21:06:03 2019 +0300 pending --- .../query/calcite/CalciteQueryProcessor.java | 5 +- .../query/calcite/exchange/Receiver.java | 7 +- .../processors/query/calcite/exchange/Sender.java | 7 +- .../calcite/metadata/IgniteMdDistribution.java | 28 +-- .../metadata/IgniteMdSourceDistribution.java | 94 ++++------ .../query/calcite/metadata/IgniteMetadata.java | 20 +-- .../query/calcite/metadata/RelMetadataQueryEx.java | 88 +++++++++ .../processors/query/calcite/rel/IgniteRel.java | 4 +- .../query/calcite/rel/IgniteVisitor.java | 24 ++- .../calcite/rel/logical/IgniteLogicalExchange.java | 5 + .../calcite/rel/logical/IgniteLogicalFilter.java | 5 + .../calcite/rel/logical/IgniteLogicalJoin.java | 5 + .../calcite/rel/logical/IgniteLogicalProject.java | 5 + .../rel/logical/IgniteLogicalTableScan.java | 5 + .../query/calcite/schema/IgniteTable.java | 13 +- .../calcite/splitter/PartitionsDistribution.java | 196 +++++++++++++++++++++ .../PartitionsDistributionRegistry.java} | 8 +- .../query/calcite/splitter/SourceDistribution.java | 4 +- .../query/calcite/splitter/SplitTask.java | 9 + .../query/calcite/splitter/TaskSplitter.java | 84 ++++++++- .../query/calcite/util/IgniteMethod.java | 8 +- .../query/calcite/CalciteQueryProcessorTest.java | 37 +++- 22 files changed, 543 insertions(+), 118 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index a024027..d1ad130 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -145,12 +145,11 @@ public class CalciteQueryProcessor implements QueryEngine { * @return Query execution context. */ Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests. - return Contexts.chain(ctx, + return Contexts.chain(ctx, config.getContext(), Contexts.of( new Query(query, params), contextParameter(ctx, SchemaPlus.class, schemaHolder::schema), - contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)), - config.getContext()); + contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion))); } private QueryExecution prepare(Context ctx) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java index 8d2ee1c..993f55b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java @@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; /** * @@ -32,7 +33,7 @@ public class Receiver extends SingleRel implements IgniteRel { * @param traits Trait set. * @param sender Corresponding sender. */ - protected Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) { + public Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) { super(cluster, traits, sender); } @@ -45,4 +46,8 @@ public class Receiver extends SingleRel implements IgniteRel { @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { return new Receiver(getCluster(), traitSet, (Sender) sole(inputs)); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitReceiver(this); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java index 5049250..63cf8f7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java @@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; /** * @@ -33,7 +34,11 @@ public class Sender extends SingleRel implements IgniteRel { * @param traits Trait set. * @param input Input relational expression */ - protected Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { + public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { super(cluster, traits, input); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitSender(this); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java index ded1a8d..298e905 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java @@ -47,36 +47,36 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.Distribu /** * */ -public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.TraitDistribution> { +public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.DistributionTraitMetadata> { public static final RelMetadataProvider SOURCE = - ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION.method(), new IgniteMdDistribution()); + ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION_TRAIT.method(), new IgniteMdDistribution()); - @Override public MetadataDef<IgniteMetadata.TraitDistribution> getDef() { - return IgniteMetadata.TraitDistribution.DEF; + @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata> getDef() { + return IgniteMetadata.DistributionTraitMetadata.DEF; } - public DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) { + public DistributionTrait getDistributionTrait(RelNode rel, RelMetadataQuery mq) { return DistributionTraitDef.INSTANCE.getDefault(); } - public DistributionTrait distribution(Filter filter, RelMetadataQuery mq) { + public DistributionTrait getDistributionTrait(Filter filter, RelMetadataQuery mq) { return filter(mq, filter.getInput(), filter.getCondition()); } - public DistributionTrait distribution(Project project, RelMetadataQuery mq) { + public DistributionTrait getDistributionTrait(Project project, RelMetadataQuery mq) { return project(mq, project.getInput(), project.getProjects()); } - public DistributionTrait distribution(Join join, RelMetadataQuery mq) { + public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery mq) { return join(mq, join.getLeft(), join.getRight(), join.getCondition()); } - public DistributionTrait distribution(RelSubset rel, RelMetadataQuery mq) { + public DistributionTrait getDistributionTrait(RelSubset rel, RelMetadataQuery mq) { return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE); } public static DistributionTrait project(RelMetadataQuery mq, RelNode input, List<RexNode> projects) { - DistributionTrait trait = distribution_(input, mq); + DistributionTrait trait = distribution(input, mq); if (trait.type() == HASH) { ImmutableIntList keys = trait.keys(); @@ -115,14 +115,14 @@ public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Trai } public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, RexNode condition) { - return distribution_(input, mq); + return distribution(input, mq); } public static DistributionTrait join(RelMetadataQuery mq, RelNode left, RelNode right, RexNode condition) { - return distribution_(left, mq); + return distribution(left, mq); } - public static DistributionTrait distribution_(RelNode rel, RelMetadataQuery mq) { - return rel.metadata(IgniteMetadata.TraitDistribution.class, mq).distribution(); + public static DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) { + return RelMetadataQueryEx.wrap(mq).getDistributionTrait(rel); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java index 1c80b96..255e06c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java @@ -16,12 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.metadata; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.BiRel; import org.apache.calcite.rel.RelNode; @@ -31,78 +26,80 @@ import org.apache.calcite.rel.metadata.MetadataHandler; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; -import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; +import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution; import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.F; /** * */ -public class IgniteMdSourceDistribution implements MetadataHandler<TaskDistribution> { +public class IgniteMdSourceDistribution implements MetadataHandler<SourceDistributionMetadata> { public static final RelMetadataProvider SOURCE = - ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.TASK_DISTRIBUTION.method(), new IgniteMdSourceDistribution()); + ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.SOURCE_DISTRIBUTION.method(), new IgniteMdSourceDistribution()); - @Override public MetadataDef<TaskDistribution> getDef() { - return TaskDistribution.DEF; + @Override public MetadataDef<SourceDistributionMetadata> getDef() { + return SourceDistributionMetadata.DEF; } - public SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) { + public SourceDistribution getSourceDistribution(RelNode rel, RelMetadataQuery mq) { throw new AssertionError(); } - public SourceDistribution distribution(RelSubset rel, RelMetadataQuery mq) { + public SourceDistribution getSourceDistribution(RelSubset rel, RelMetadataQuery mq) { throw new AssertionError(); } - public SourceDistribution distribution(SingleRel rel, RelMetadataQuery mq) { - return distribution_(rel.getInput(), mq); + public SourceDistribution getSourceDistribution(SingleRel rel, RelMetadataQuery mq) { + return distribution(rel.getInput(), mq); } - public SourceDistribution distribution(BiRel rel, RelMetadataQuery mq) { - return merge(distribution_(rel.getLeft(), mq), distribution_(rel.getRight(), mq)); + public SourceDistribution getSourceDistribution(BiRel rel, RelMetadataQuery mq) { + mq = RelMetadataQueryEx.wrap(mq); + + return merge(distribution(rel.getLeft(), mq), distribution(rel.getRight(), mq)); } - public SourceDistribution distribution(Receiver rel, RelMetadataQuery mq) { + public SourceDistribution getSourceDistribution(Receiver rel, RelMetadataQuery mq) { SourceDistribution res = new SourceDistribution(); - res.remoteInputs.add(rel); + res.remoteInputs = F.asList(rel); return res; } - public SourceDistribution distribution(IgniteLogicalTableScan rel, RelMetadataQuery mq) { + public SourceDistribution getSourceDistribution(IgniteLogicalTableScan rel, RelMetadataQuery mq) { return rel.tableDistribution(); } - public static SourceDistribution distribution_(RelNode rel, RelMetadataQuery mq) { - return rel.metadata(TaskDistribution.class, mq).distribution(); + public static SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) { + return RelMetadataQueryEx.wrap(mq).getSourceDistribution(rel); } private static SourceDistribution merge(SourceDistribution left, SourceDistribution right) { SourceDistribution res = new SourceDistribution(); - res.remoteInputs = merge(left.remoteInputs, right.remoteInputs); res.partitionMapping = merge(left.partitionMapping, right.partitionMapping); + res.remoteInputs = merge(left.remoteInputs, right.remoteInputs); res.localInputs = merge(left.localInputs, right.localInputs); return res; } - private static <T> List<T> merge(List<T> left, List<T> right) { + private static PartitionsDistribution merge(PartitionsDistribution left, PartitionsDistribution right) { if (left == null) return right; + if (right == null) + return left; - if (right != null) - left.addAll(right); - - return left; + return left.mergeWith(right); } - private static GridIntList merge(GridIntList left, GridIntList right) { + private static <T> List<T> merge(List<T> left, List<T> right) { if (left == null) return right; @@ -112,44 +109,13 @@ public class IgniteMdSourceDistribution implements MetadataHandler<TaskDistribut return left; } - private static Map<ClusterNode, int[]> merge(Map<ClusterNode, int[]> left, Map<ClusterNode, int[]> right) { + private static GridIntList merge(GridIntList left, GridIntList right) { if (left == null) return right; - if (right == null) - return left; - - Map<ClusterNode, int[]> res = new HashMap<>(Math.min(left.size(), right.size())); - - Set<ClusterNode> keys = new HashSet<>(left.keySet()); - - keys.retainAll(right.keySet()); - - for (ClusterNode node : keys) { - int[] leftParts = left.get(node); - int[] rightParts = right.get(node); - - int[] nodeParts = new int[Math.min(leftParts.length, rightParts.length)]; - - int i = 0, j = 0, k = 0; - - while (i < leftParts.length && j < rightParts.length) { - if (leftParts[i] < rightParts[j]) - i++; - else if (rightParts[j] < leftParts[i]) - j++; - else { - nodeParts[k++] = leftParts[i]; - - i++; - j++; - } - } - - if (k > 0) - res.put(node, k < nodeParts.length ? Arrays.copyOf(nodeParts, k) : nodeParts); - } + if (right != null) + left.addAll(right); - return res; + return left; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java index 3b5445e..090ef53 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java @@ -40,27 +40,27 @@ public class IgniteMetadata { IgniteMdSourceDistribution.SOURCE, DefaultRelMetadataProvider.INSTANCE)); - public interface TraitDistribution extends Metadata { - MetadataDef<TraitDistribution> DEF = MetadataDef.of(TraitDistribution.class, TraitDistribution.Handler.class, IgniteMethod.DISTRIBUTION.method()); + public interface DistributionTraitMetadata extends Metadata { + MetadataDef<DistributionTraitMetadata> DEF = MetadataDef.of(DistributionTraitMetadata.class, DistributionTraitMetadata.Handler.class, IgniteMethod.DISTRIBUTION_TRAIT.method()); /** Determines how the rows are distributed. */ - DistributionTrait distribution(); + DistributionTrait getDistributionTrait(); /** Handler API. */ - interface Handler extends MetadataHandler<TraitDistribution> { - DistributionTrait distribution(RelNode r, RelMetadataQuery mq); + interface Handler extends MetadataHandler<DistributionTraitMetadata> { + DistributionTrait getDistributionTrait(RelNode r, RelMetadataQuery mq); } } - public interface TaskDistribution extends Metadata { - MetadataDef<TaskDistribution> DEF = MetadataDef.of(TaskDistribution.class, TaskDistribution.Handler.class, IgniteMethod.TASK_DISTRIBUTION.method()); + public interface SourceDistributionMetadata extends Metadata { + MetadataDef<SourceDistributionMetadata> DEF = MetadataDef.of(SourceDistributionMetadata.class, SourceDistributionMetadata.Handler.class, IgniteMethod.SOURCE_DISTRIBUTION.method()); /** Determines how the rows are distributed. */ - SourceDistribution distribution(); + SourceDistribution getSourceDistribution(); /** Handler API. */ - interface Handler extends MetadataHandler<TaskDistribution> { - SourceDistribution distribution(RelNode r, RelMetadataQuery mq); + interface Handler extends MetadataHandler<SourceDistributionMetadata> { + SourceDistribution getSourceDistribution(RelNode r, RelMetadataQuery mq); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java new file mode 100644 index 0000000..8c10a51 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java @@ -0,0 +1,88 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.metadata; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class RelMetadataQueryEx extends RelMetadataQuery { + private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx(); + private static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER); + + private IgniteMetadata.DistributionTraitMetadata.Handler distributionTraitHandler; + private IgniteMetadata.SourceDistributionMetadata.Handler sourceDistributionHandler; + + private RelMetadataQueryEx() { + super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY); + + distributionTraitHandler = initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class); + sourceDistributionHandler = initialHandler(IgniteMetadata.SourceDistributionMetadata.Handler.class); + } + + protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, RelMetadataQueryEx prototype) { + super(metadataProvider, prototype); + + distributionTraitHandler = prototype.distributionTraitHandler; + sourceDistributionHandler = prototype.sourceDistributionHandler; + } + + protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider, RelMetadataQuery parent) { + super(metadataProvider, parent); + + distributionTraitHandler = PROTO.distributionTraitHandler; + sourceDistributionHandler = PROTO.sourceDistributionHandler; + } + + @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass") + public static RelMetadataQueryEx instance() { + return new RelMetadataQueryEx(PROVIDER, PROTO); + } + + public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) { + if (mq.getClass() == RelMetadataQueryEx.class) + return (RelMetadataQueryEx) mq; + + return new RelMetadataQueryEx(PROVIDER, mq); + } + + public SourceDistribution getSourceDistribution(RelNode rel) { + for (;;) { + try { + return sourceDistributionHandler.getSourceDistribution(rel, this); + } catch (JaninoRelMetadataProvider.NoHandler e) { + sourceDistributionHandler = revise(e.relClass, IgniteMetadata.SourceDistributionMetadata.DEF); + } + } + } + + public DistributionTrait getDistributionTrait(RelNode rel) { + for (;;) { + try { + return distributionTraitHandler.getDistributionTrait(rel, this); + } catch (JaninoRelMetadataProvider.NoHandler e) { + distributionTraitHandler = revise(e.relClass, IgniteMetadata.DistributionTraitMetadata.DEF); + } + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java index 995ad70..f15bf81 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java @@ -32,7 +32,5 @@ public interface IgniteRel extends RelNode { } }; - default void visit(IgniteVisitor visitor) { - visitor.visit(this); - } + <T> T accept(IgniteVisitor<T> visitor); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java index a89c034..542696d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java @@ -17,9 +17,29 @@ package org.apache.ignite.internal.processors.query.calcite.rel; +import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.processors.query.calcite.exchange.Sender; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; + /** * */ -public interface IgniteVisitor { - public void visit(IgniteRel rel); +public interface IgniteVisitor<T> { + T visitExchange(IgniteLogicalExchange exchange); + + T visitFilter(IgniteLogicalFilter filter); + + T visitJoin(IgniteLogicalJoin join); + + T visitProject(IgniteLogicalProject project); + + T visitTableScan(IgniteLogicalTableScan tableScan); + + T visitReceiver(Receiver receiver); + + T visitSender(Sender sender); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java index ed9bee6..3a577e9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Util; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; @@ -62,4 +63,8 @@ public final class IgniteLogicalExchange extends SingleRel implements IgniteRel @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs)); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitExchange(this); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java index 0bc0fbf..34bbe3d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java @@ -28,6 +28,7 @@ import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; public final class IgniteLogicalFilter extends Filter implements IgniteRel { private final Set<CorrelationId> variablesSet; @@ -59,4 +60,8 @@ public final class IgniteLogicalFilter extends Filter implements IgniteRel { return new IgniteLogicalFilter(filter.getCluster(), traits, input, filter.getCondition(), filter.getVariablesSet()); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitFilter(this); + } } \ No newline at end of file diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java index 215b676..95699c7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; public final class IgniteLogicalJoin extends Join implements IgniteRel { private final boolean semiJoinDone; @@ -58,4 +59,8 @@ public final class IgniteLogicalJoin extends Join implements IgniteRel { @Override public boolean isSemiJoinDone() { return semiJoinDone; } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitJoin(this); + } } \ No newline at end of file diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java index 89a992b..56c0f50 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; public final class IgniteLogicalProject extends Project implements IgniteRel { public IgniteLogicalProject( @@ -49,4 +50,8 @@ public final class IgniteLogicalProject extends Project implements IgniteRel { return new IgniteLogicalProject(project.getCluster(), traits, input, project.getProjects(), project.getRowType()); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitProject(this); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java index 8698573..ca2ae90 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java @@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; @@ -40,4 +41,8 @@ public final class IgniteLogicalTableScan extends TableScan implements IgniteRel public SourceDistribution tableDistribution() { return getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext()); } + + @Override public <T> T accept(IgniteVisitor<T> visitor) { + return visitor.visitTableScan(this); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index 3cbda12..bccf570 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -27,9 +27,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.query.calcite.exchange.DistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; +import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; @@ -78,13 +78,16 @@ public class IgniteTable extends AbstractTable implements TranslatableTable { public SourceDistribution tableDistribution(Context context) { SourceDistribution res = new SourceDistribution(); - res.localInputs = new GridIntList(); - res.localInputs.add(CU.cacheId(cacheName)); + GridIntList localInputs = new GridIntList(); - DistributionRegistry registry = context.unwrap(DistributionRegistry.class); + localInputs.add(CU.cacheId(cacheName)); + + res.localInputs = localInputs; + + PartitionsDistributionRegistry registry = context.unwrap(PartitionsDistributionRegistry.class); AffinityTopologyVersion topVer = context.unwrap(AffinityTopologyVersion.class); - res.partitionMapping = registry.partitionMapping(CU.cacheId(cacheName), topVer); + res.partitionMapping = registry.partitionsDistribution(CU.cacheId(cacheName), topVer); return res; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java new file mode 100644 index 0000000..2a7707c --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java @@ -0,0 +1,196 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.splitter; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class PartitionsDistribution { + public static final int[] ALL_PARTS = new int[0]; + private static final int[] EMPTY = new int[0]; + + public boolean excessive; + public int parts; + public int[] nodes; + public int[][] nodeParts; + + public PartitionsDistribution mergeWith(PartitionsDistribution other) { + if (parts != 0 && other.parts != 0 && parts != other.parts) + throw new IllegalStateException("Non-collocated query fragment."); + + int[] nodes0 = null; + int[][] nodeParts0 = null; + + int i = 0, j = 0, k = 0; + + while (i < nodes0.length && j < other.nodes.length) { + if (nodes[i] < other.nodes[j]) + i++; + else if (other.nodes[j] < nodes[i]) + j++; + else { + int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]); + + if (mergedParts.length > 0) { + if (nodes0 == null) { + int len = Math.min(nodes.length, other.nodes.length); + + nodes0 = new int[len]; + nodeParts0 = new int[len][]; + } + + nodes0[k] = nodes[i]; + nodeParts0[k] = mergedParts; + + k++; + } + + i++; + j++; + } + } + + PartitionsDistribution res = new PartitionsDistribution(); + + res.excessive = excessive && other.excessive; + res.parts = Math.max(parts, other.parts); + res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k); + res.nodeParts = nodeParts0.length == k ? nodeParts0 : Arrays.copyOf(nodeParts0, k); + + check(res); + + return res; + } + + private void check(PartitionsDistribution res) { + if (res.parts == 0) + return; // Only receivers and/or replicated caches in task subtree. + + BitSet check = new BitSet(res.parts); + + int checkedParts = 0; + + for (int[] nodePart : res.nodeParts) { + for (int p : nodePart) { + if (!check.get(p)) { + check.set(p); + checkedParts++; + } + } + } + + if (checkedParts < res.parts) + throw new IllegalStateException("Failed to collocate used caches."); + } + + private int[] merge(int[] left, int[] right) { + if (left == ALL_PARTS) + return right; + if (right == ALL_PARTS) + return left; + + int[] nodeParts = null; + + int i = 0, j = 0, k = 0; + + while (i < left.length && j < right.length) { + if (left[i] < right[j]) + i++; + else if (right[j] < left[i]) + j++; + else { + if (nodeParts == null) + nodeParts = new int[Math.min(left.length, right.length)]; + + nodeParts[k++] = left[i]; + + i++; + j++; + } + } + + if (k == 0) + return EMPTY; + + return nodeParts.length == k ? nodeParts : Arrays.copyOf(nodeParts, k); + } + + public PartitionsDistribution deduplicate() { + if (!excessive) + return this; + + Map<Integer, Integer> map = new HashMap<>(); + + int idx = 0; int[] idxs = new int[nodeParts.length]; + while (map.size() < parts) { + int[] nodePart = nodeParts[idx]; + + int j = idxs[idx]; + + while (j < nodePart.length) { + if (map.putIfAbsent(nodePart[j], nodes[idx]) == null || j + 1 == nodePart.length) { + idxs[idx] = j + 1; + + break; + } + + j++; + } + + idx = (idx + 1) % nodes.length; + } + + int[] nodes0 = new int[nodes.length]; int[][] nodeParts0 = new int[nodes.length][]; + + int k = 0; + + for (int i = 0; i < nodes.length; i++) { + int j = 0; + + int[] nodePart0 = null; + + for (int p : nodeParts[i]) { + if (map.get(p) == nodes[i]) { + if (nodePart0 == null) + nodePart0 = new int[nodeParts[i].length]; + + nodePart0[j++] = p; + } + } + + if (nodePart0 != null) { + nodes0[k] = nodes[i]; + nodeParts0[k] = nodePart0.length == j ? nodePart0 : Arrays.copyOf(nodePart0, j); + + k++; + } + } + + PartitionsDistribution res = new PartitionsDistribution(); + + res.parts = parts; + res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k); + res.nodeParts = nodeParts0.length == k ? nodeParts0 : Arrays.copyOf(nodeParts0, k); + + return res; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java similarity index 74% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java index a2b0942..f234ae7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java @@ -14,15 +14,13 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.exchange; +package org.apache.ignite.internal.processors.query.calcite.splitter; -import java.util.Map; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** * */ -public interface DistributionRegistry { - Map<ClusterNode, int[]> partitionMapping(int cacheId, AffinityTopologyVersion topVer); +public interface PartitionsDistributionRegistry { + PartitionsDistribution partitionsDistribution(int cacheId, AffinityTopologyVersion topVer); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java index eded34e..02a5514 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; import java.util.List; -import java.util.Map; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; import org.apache.ignite.internal.util.GridIntList; @@ -26,7 +24,7 @@ import org.apache.ignite.internal.util.GridIntList; * */ public class SourceDistribution { - public Map<ClusterNode, int[]> partitionMapping; // partition filter for unstable topology + public PartitionsDistribution partitionMapping; // partitions mapping. public List<Receiver> remoteInputs; // remote inputs to notify particular senders about final task distribution public GridIntList localInputs; // involved caches, used for partitions reservation } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java index f405939..1fbe086 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java @@ -28,5 +28,14 @@ public class SplitTask { public SplitTask(RelNode root, SourceDistribution distribution) { this.distribution = distribution; this.root = root; + + init(); + } + + private void init() { + PartitionsDistribution mapping = distribution.partitionMapping; + + if (mapping != null && mapping.excessive) + distribution.partitionMapping = mapping.deduplicate(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java index 4ca6716..22527b3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java @@ -16,11 +16,91 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; -import org.apache.calcite.rel.RelShuttleImpl; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.processors.query.calcite.exchange.Sender; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; /** * */ -public class TaskSplitter extends RelShuttleImpl { +public class TaskSplitter implements IgniteVisitor<IgniteRel> { + /** */ + private List<IgniteRel> roots; + public List<SplitTask> go(IgniteRel root) { + roots = new ArrayList<>(); + + roots.add(root.accept(this)); + + ArrayList<SplitTask> splitTasks = new ArrayList<>(roots.size()); + + RelMetadataQuery mq = RelMetadataQueryEx.instance(); + + for (IgniteRel igniteRel : roots) { + splitTasks.add(new SplitTask(igniteRel, IgniteMdSourceDistribution.distribution(igniteRel, mq))); + } + + return splitTasks; + } + + @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) { + RelOptCluster cluster = exchange.getCluster(); + RelNode input = exchange.getInput(); + + Sender sender = new Sender(cluster, input.getTraitSet(), visitChildren(input)); + + roots.add(sender); + + return new Receiver(cluster, exchange.getTraitSet(), sender); + } + + @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) { + return visitChildren(filter); + } + + @Override public IgniteRel visitJoin(IgniteLogicalJoin join) { + return visitChildren(join); + } + + @Override public IgniteRel visitProject(IgniteLogicalProject project) { + return visitChildren(project); + } + + @Override public IgniteRel visitTableScan(IgniteLogicalTableScan tableScan) { + return tableScan; + } + + @Override public IgniteRel visitReceiver(Receiver receiver) { + throw new AssertionError("An attempt to split an already split task."); + } + + @Override public IgniteRel visitSender(Sender sender) { + throw new AssertionError("An attempt to split an already split task."); + } + + private IgniteRel visitChildren(RelNode parent) { + List<RelNode> inputs = parent.getInputs(); + + for (int i = 0; i < inputs.size(); i++) { + IgniteRel input = (IgniteRel) inputs.get(i); + IgniteRel rel = input.accept(this); + + if (rel != input) + parent.replaceInput(i, rel); + } + return (IgniteRel) parent; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java index b6a6703..63a5c2b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java @@ -19,15 +19,15 @@ package org.apache.ignite.internal.processors.query.calcite.util; import java.lang.reflect.Method; import org.apache.calcite.linq4j.tree.Types; -import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution; -import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TraitDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata; /** * */ public enum IgniteMethod { - DISTRIBUTION(TraitDistribution.class, "distribution"), - TASK_DISTRIBUTION(TaskDistribution.class, "distribution"); + DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, "getDistributionTrait"), + SOURCE_DISTRIBUTION(SourceDistributionMetadata.class, "getSourceDistribution"); private final Method method; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 028bb9c..884b9ca 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite; +import java.util.List; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.ConventionTraitDef; @@ -37,8 +38,13 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; import org.apache.ignite.internal.processors.query.calcite.schema.RowType; +import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution; +import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry; +import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask; +import org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -54,6 +60,9 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { private static CalciteQueryProcessor proc; private static SchemaPlus schema; + private static PartitionsDistribution developerDistribution; + private static PartitionsDistribution projectDistribution; + @BeforeClass public static void setupClass() { proc = new CalciteQueryProcessor(); @@ -71,6 +80,12 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { .field("cityId", Integer.class) .build())); + developerDistribution = new PartitionsDistribution(); + + developerDistribution.parts = 5; + developerDistribution.nodes = new int[]{0,1,2}; + developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}}; + publicSchema.addTable("Project", new IgniteTable("Project", "Project", RowType.builder() .keyField("id", Integer.class, true) @@ -78,6 +93,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { .field("ver", Integer.class) .build())); + projectDistribution = new PartitionsDistribution(); + + projectDistribution.excessive = true; + projectDistribution.parts = 5; + projectDistribution.nodes = new int[]{0,1,2}; + projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}}; + publicSchema.addTable("Country", new IgniteTable("Country", "Country", RowType.builder() .keyField("id", Integer.class, true) @@ -106,7 +128,16 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PartitionsDistributionRegistry registry = (id, top) -> { + if (id == CU.cacheId("Developer")) + return developerDistribution; + if (id == CU.cacheId("Project")) + return projectDistribution; + + throw new AssertionError("Unexpected cache id:" + id); + }; + + Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); assertNotNull(ctx); @@ -149,5 +180,9 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } assertNotNull(relRoot); + + List<SplitTask> fragments = new TaskSplitter().go((IgniteRel) relRoot.rel); + + assertNotNull(fragments); } } \ No newline at end of file