This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new e18d8a1 pending e18d8a1 is described below commit e18d8a17a211b61b15213ddedcc8ac637b4f5888 Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Wed Oct 30 19:46:03 2019 +0300 pending --- modules/calcite/pom.xml | 7 ++ .../query/calcite/CalciteQueryProcessor.java | 6 +- .../query/calcite/exchange/Receiver.java | 14 ++++ .../processors/query/calcite/exchange/Sender.java | 38 ++++++++++ .../calcite/metadata/IgniteMdDistribution.java | 2 +- .../metadata/IgniteMdSourceDistribution.java | 5 ++ .../calcite/rel/logical/IgniteLogicalExchange.java | 15 ++-- .../calcite/rel/logical/IgniteLogicalFilter.java | 7 +- .../calcite/rel/logical/IgniteLogicalProject.java | 7 +- .../rel/logical/IgniteLogicalTableScan.java | 3 +- .../processors/query/calcite/rule/IgniteRules.java | 8 +-- .../query/calcite/rule/logical/IgniteJoinRule.java | 7 +- .../query/calcite/schema/IgniteTable.java | 18 +++-- .../query/calcite/splitter/Fragment.java | 68 ++++++++++++++++++ .../calcite/splitter/PartitionsDistribution.java | 2 +- .../splitter/PartitionsDistributionRegistry.java | 4 +- .../splitter/{SplitTask.java => QueryPlan.java} | 24 +++---- .../splitter/{TaskSplitter.java => Splitter.java} | 37 ++++------ .../DistributionFunction.java} | 9 +-- .../DistributionFunctionFactory.java} | 9 +-- .../query/calcite/trait/DistributionTrait.java | 8 ++- .../query/calcite/trait/DistributionTraitDef.java | 2 +- .../query/calcite/trait/DistributionTraitImpl.java | 8 ++- .../query/calcite/trait/IgniteDistributions.java | 35 ++++++++-- .../processors/query/calcite/util/Commons.java | 21 +----- .../query/calcite/CalciteQueryProcessorTest.java | 81 ++++++++++++++-------- 26 files changed, 304 insertions(+), 141 deletions(-) diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml index 1fa26fa..d654edb 100644 --- a/modules/calcite/pom.xml +++ b/modules/calcite/pom.xml @@ -79,6 +79,13 @@ </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index d1ad130..b036397 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -48,7 +48,7 @@ import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.calcite.util.Commons.contextParameter; +import static org.apache.ignite.internal.processors.query.calcite.util.Commons.provided; /** * @@ -148,8 +148,8 @@ public class CalciteQueryProcessor implements QueryEngine { return Contexts.chain(ctx, config.getContext(), Contexts.of( new Query(query, params), - contextParameter(ctx, SchemaPlus.class, schemaHolder::schema), - contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion))); + provided(ctx, SchemaPlus.class, schemaHolder::schema), + provided(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion))); } private QueryExecution prepare(Context ctx) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java index 993f55b..fe35b52 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java @@ -21,13 +21,17 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; /** * */ public class Receiver extends SingleRel implements IgniteRel { + private SourceDistribution sourceDistribution; + /** * @param cluster Cluster this relational expression belongs to * @param traits Trait set. @@ -50,4 +54,14 @@ public class Receiver extends SingleRel implements IgniteRel { @Override public <T> T accept(IgniteVisitor<T> visitor) { return visitor.visitReceiver(this); } + + public void init(SourceDistribution targetDistribution, RelMetadataQueryEx mq) { + getInput().init(targetDistribution); + + sourceDistribution = getInput().sourceDistribution(mq); + } + + public SourceDistribution sourceDistribution() { + return sourceDistribution; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java index 63cf8f7..f9bd762 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java @@ -16,17 +16,28 @@ package org.apache.ignite.internal.processors.query.calcite.exchange; +import java.util.List; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; /** * */ public class Sender extends SingleRel implements IgniteRel { + private SourceDistribution sourceDistribution; + private SourceDistribution targetDistribution; + private DistributionFunction targetFunction; + /** * Creates a <code>SingleRel</code>. * @@ -38,7 +49,34 @@ public class Sender extends SingleRel implements IgniteRel { super(cluster, traits, input); } + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new Sender(getCluster(), traitSet, sole(inputs)); + } + @Override public <T> T accept(IgniteVisitor<T> visitor) { return visitor.visitSender(this); } + + public void init(SourceDistribution targetDistribution) { + this.targetDistribution = targetDistribution; + } + + public DistributionFunction targetFunction() { + if (targetFunction == null) { + assert targetDistribution != null && targetDistribution.partitionMapping != null; + + DistributionTrait distribution = getTraitSet().getTrait(DistributionTraitDef.INSTANCE); + + targetFunction = distribution.functionFactory().create(targetDistribution, distribution.keys()); + } + + return targetFunction; + } + + public SourceDistribution sourceDistribution(RelMetadataQuery mq) { + if (sourceDistribution == null) + sourceDistribution = RelMetadataQueryEx.wrap(mq).getSourceDistribution(getInput()); + + return sourceDistribution; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java index 298e905..f644d0e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java @@ -108,7 +108,7 @@ public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Dist newKeys.add(mapped); } - return IgniteDistributions.hash(newKeys); + return IgniteDistributions.hash(newKeys, trait.functionFactory()); } return trait; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java index 255e06c..d44a57b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java @@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.processors.query.calcite.exchange.Sender; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution; @@ -58,6 +59,10 @@ public class IgniteMdSourceDistribution implements MetadataHandler<SourceDistrib return distribution(rel.getInput(), mq); } + public SourceDistribution getSourceDistribution(Sender rel, RelMetadataQuery mq) { + return rel.sourceDistribution(mq); + } + public SourceDistribution getSourceDistribution(BiRel rel, RelMetadataQuery mq) { mq = RelMetadataQueryEx.wrap(mq); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java index 3a577e9..32b8b09 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java @@ -22,12 +22,12 @@ import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Util; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; -import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; /** @@ -52,14 +52,6 @@ public final class IgniteLogicalExchange extends SingleRel implements IgniteRel Util.nLogN(rowCount) * bytesPerRow, rowCount, 0); } - public DistributionTrait sourceDistribution() { - return getInput().getTraitSet().getTrait(DistributionTraitDef.INSTANCE); - } - - public DistributionTrait targetDistribution() { - return getTraitSet().getTrait(DistributionTraitDef.INSTANCE); - } - @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs)); } @@ -67,4 +59,9 @@ public final class IgniteLogicalExchange extends SingleRel implements IgniteRel @Override public <T> T accept(IgniteVisitor<T> visitor) { return visitor.visitExchange(this); } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("distribution", getTraitSet().getTrait(DistributionTraitDef.INSTANCE)); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java index 34bbe3d..0da0d04 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java @@ -24,11 +24,12 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; public final class IgniteLogicalFilter extends Filter implements IgniteRel { private final Set<CorrelationId> variablesSet; @@ -53,10 +54,10 @@ public final class IgniteLogicalFilter extends Filter implements IgniteRel { .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty()); } - public static IgniteLogicalFilter create(LogicalFilter filter, RelNode input) { + public static IgniteLogicalFilter create(Filter filter, RelNode input) { RelTraitSet traits = filter.getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteMdDistribution.filter(filter.getCluster().getMetadataQuery(), input, filter.getCondition())); + .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.filter(RelMetadataQueryEx.instance(), input, filter.getCondition())); return new IgniteLogicalFilter(filter.getCluster(), traits, input, filter.getCondition(), filter.getVariablesSet()); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java index 56c0f50..e67d371 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java @@ -21,12 +21,13 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; public final class IgniteLogicalProject extends Project implements IgniteRel { public IgniteLogicalProject( @@ -43,10 +44,10 @@ public final class IgniteLogicalProject extends Project implements IgniteRel { return new IgniteLogicalProject(getCluster(), traitSet, input, projects, rowType); } - public static IgniteLogicalProject create(LogicalProject project, RelNode input) { + public static IgniteLogicalProject create(Project project, RelNode input) { RelTraitSet traits = project.getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteMdDistribution.project(project.getCluster().getMetadataQuery(), input, project.getProjects())); + .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.project(RelMetadataQueryEx.instance(), input, project.getProjects())); return new IgniteLogicalProject(project.getCluster(), traits, input, project.getProjects(), project.getRowType()); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java index ca2ae90..fb4cf58 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java @@ -39,7 +39,8 @@ public final class IgniteLogicalTableScan extends TableScan implements IgniteRel } public SourceDistribution tableDistribution() { - return getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext()); + return getTable().unwrap(IgniteTable.class) + .sourceDistribution(getCluster().getPlanner().getContext()); } @Override public <T> T accept(IgniteVisitor<T> visitor) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java index 7873c6c..2d155f5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java @@ -158,17 +158,15 @@ public class IgniteRules { SubQueryRemoveRule.PROJECT, SubQueryRemoveRule.JOIN); - public static final List<RelOptRule> IGNITE_BASE_RULES = ImmutableList.of( + public static final List<RelOptRule> IGNITE_LOGICAL_RULES = ImmutableList.of( IgniteFilterRule.INSTANCE, IgniteProjectRule.INSTANCE, IgniteJoinRule.INSTANCE); public static List<RelOptRule> logicalRules(Context ctx) { return ImmutableList.<RelOptRule>builder() -// .addAll(BASE_RULES) -// .addAll(ABSTRACT_RULES) - .addAll(ABSTRACT_RELATIONAL_RULES) - .addAll(IGNITE_BASE_RULES) + .addAll(IGNITE_LOGICAL_RULES) + .add(AbstractConverter.ExpandConversionRule.INSTANCE) .build(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java index a5abdea..3913cb4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java @@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.util.Commons; @@ -45,11 +46,11 @@ public class IgniteJoinRule extends RelOptRule { RelTraitSet leftTraits = join.getLeft().getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys)); + .replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, IgniteDistributions.noOpFunction())); RelTraitSet rightTraits = join.getRight().getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys)); + .replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, IgniteDistributions.noOpFunction())); RelNode left = convert(join.getLeft(), leftTraits); RelNode right = convert(join.getRight(), rightTraits); @@ -58,7 +59,7 @@ public class IgniteJoinRule extends RelOptRule { RelTraitSet traitSet = join.getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteMdDistribution.join(mq, left, right, join.getCondition())); + .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.join(mq, left, right, join.getCondition())); call.transformTo(new IgniteLogicalJoin(join.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone())); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index bccf570..bf8b752 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.util.GridIntList; @@ -71,23 +72,26 @@ public class IgniteTable extends AbstractTable implements TranslatableTable { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { RelOptCluster cluster = context.getCluster(); RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.LOGICAL_CONVENTION) - .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteDistributions.hash(rowType.distributionKeys())); + .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(cluster.getPlanner().getContext())); return new IgniteLogicalTableScan(cluster, traitSet, relOptTable); } - public SourceDistribution tableDistribution(Context context) { - SourceDistribution res = new SourceDistribution(); + public DistributionTrait distributionTrait(Context context) { + return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.noOpFunction()); // TODO + } - GridIntList localInputs = new GridIntList(); + public SourceDistribution sourceDistribution(Context context) { + int cacheId = CU.cacheId(cacheName); - localInputs.add(CU.cacheId(cacheName)); + SourceDistribution res = new SourceDistribution(); + GridIntList localInputs = new GridIntList(); + localInputs.add(cacheId); res.localInputs = localInputs; PartitionsDistributionRegistry registry = context.unwrap(PartitionsDistributionRegistry.class); AffinityTopologyVersion topVer = context.unwrap(AffinityTopologyVersion.class); - - res.partitionMapping = registry.partitionsDistribution(CU.cacheId(cacheName), topVer); + res.partitionMapping = registry.get(cacheId, topVer); return res; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java new file mode 100644 index 0000000..f9ee3d1 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.splitter; + +import org.apache.calcite.plan.Context; +import org.apache.calcite.rel.RelNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.processors.query.calcite.exchange.Sender; +import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; +import org.apache.ignite.internal.util.typedef.F; + +/** + * + */ +public class Fragment { + public final RelNode root; + + public SourceDistribution distribution; + + public Fragment(RelNode root) { + this.root = root; + } + + public void init(Context ctx) { + RelMetadataQueryEx mq = RelMetadataQueryEx.instance(); + + distribution = mq.getSourceDistribution(root); + + PartitionsDistribution mapping = distribution.partitionMapping; + + if (mapping == null) + distribution.partitionMapping = isRootFragment() ? registry(ctx).single() : registry(ctx).random(topologyVersion(ctx)); + else if (mapping.excessive) + distribution.partitionMapping = mapping.deduplicate(); + + if (!F.isEmpty(distribution.remoteInputs)) { + for (Receiver input : distribution.remoteInputs) + input.init(distribution, mq); + } + } + + private boolean isRootFragment() { + return !(root instanceof Sender); + } + + private PartitionsDistributionRegistry registry(Context ctx) { + return ctx.unwrap(PartitionsDistributionRegistry.class); + } + + private AffinityTopologyVersion topologyVersion(Context ctx) { + return ctx.unwrap(AffinityTopologyVersion.class); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java index 2a7707c..72ed1fa 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java @@ -42,7 +42,7 @@ public class PartitionsDistribution { int i = 0, j = 0, k = 0; - while (i < nodes0.length && j < other.nodes.length) { + while (i < nodes.length && j < other.nodes.length) { if (nodes[i] < other.nodes[j]) i++; else if (other.nodes[j] < nodes[i]) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java index f234ae7..568ce65 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java @@ -22,5 +22,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; * */ public interface PartitionsDistributionRegistry { - PartitionsDistribution partitionsDistribution(int cacheId, AffinityTopologyVersion topVer); + PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer); + PartitionsDistribution random(AffinityTopologyVersion topVer); + PartitionsDistribution single(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java similarity index 60% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java index 1fbe086..84985e6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java @@ -16,26 +16,22 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; -import org.apache.calcite.rel.RelNode; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Context; /** * */ -public class SplitTask { - public final SourceDistribution distribution; - public final RelNode root; +public class QueryPlan { + private final ImmutableList<Fragment> fragments; - public SplitTask(RelNode root, SourceDistribution distribution) { - this.distribution = distribution; - this.root = root; - - init(); + public QueryPlan(ImmutableList<Fragment> fragments) { + this.fragments = fragments; } - private void init() { - PartitionsDistribution mapping = distribution.partitionMapping; - - if (mapping != null && mapping.excessive) - distribution.partitionMapping = mapping.deduplicate(); + public void init(Context ctx) { + for (Fragment fragment : fragments) { + fragment.init(ctx); + } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java similarity index 73% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java index 22527b3..5220d74 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java @@ -16,15 +16,13 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; -import java.util.ArrayList; +import com.google.common.collect.ImmutableList; import java.util.List; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; import org.apache.ignite.internal.processors.query.calcite.exchange.Sender; -import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution; -import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange; @@ -36,35 +34,26 @@ import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog /** * */ -public class TaskSplitter implements IgniteVisitor<IgniteRel> { - /** */ - private List<IgniteRel> roots; +public class Splitter implements IgniteVisitor<IgniteRel> { + private ImmutableList.Builder<Fragment> b; - public List<SplitTask> go(IgniteRel root) { - roots = new ArrayList<>(); + public QueryPlan go(IgniteRel root) { + b = ImmutableList.builder(); - roots.add(root.accept(this)); - - ArrayList<SplitTask> splitTasks = new ArrayList<>(roots.size()); - - RelMetadataQuery mq = RelMetadataQueryEx.instance(); - - for (IgniteRel igniteRel : roots) { - splitTasks.add(new SplitTask(igniteRel, IgniteMdSourceDistribution.distribution(igniteRel, mq))); - } - - return splitTasks; + return new QueryPlan(b.add(new Fragment(root.accept(this))).build()); } @Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) { RelOptCluster cluster = exchange.getCluster(); - RelNode input = exchange.getInput(); + RelTraitSet traitSet = exchange.getTraitSet(); + + IgniteRel input = visitChildren(exchange.getInput()); - Sender sender = new Sender(cluster, input.getTraitSet(), visitChildren(input)); + Sender sender = new Sender(cluster, traitSet, input); - roots.add(sender); + b.add(new Fragment(sender)); - return new Receiver(cluster, exchange.getTraitSet(), sender); + return new Receiver(cluster, traitSet, sender); } @Override public IgniteRel visitFilter(IgniteLogicalFilter filter) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java similarity index 70% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java index f234ae7..76af490 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java @@ -14,13 +14,14 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.splitter; +package org.apache.ignite.internal.processors.query.calcite.trait; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; /** * */ -public interface PartitionsDistributionRegistry { - PartitionsDistribution partitionsDistribution(int cacheId, AffinityTopologyVersion topVer); +public interface DistributionFunction { + List<ClusterNode> destination(Object row); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java similarity index 67% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java index f234ae7..8bc129e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java @@ -14,13 +14,14 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.splitter; +package org.apache.ignite.internal.processors.query.calcite.trait; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; /** * */ -public interface PartitionsDistributionRegistry { - PartitionsDistribution partitionsDistribution(int cacheId, AffinityTopologyVersion topVer); +public interface DistributionFunctionFactory { + DistributionFunction create(SourceDistribution targetDistr, ImmutableIntList keys); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java index 3dabee1..2510c2c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java @@ -16,6 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.Objects; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.util.ImmutableIntList; @@ -45,10 +46,10 @@ public interface DistributionTrait extends RelTrait { } } - DistributionTrait ANY = IgniteDistributions.single(); - DistributionType type(); + DistributionFunctionFactory functionFactory(); + @Override default RelTraitDef getTraitDef() { return DistributionTraitDef.INSTANCE; } @@ -66,7 +67,8 @@ public interface DistributionTrait extends RelTrait { return true; if (type() == other.type()) - return type() != DistributionType.HASH || keys().equals(other.keys()); + return type() != DistributionType.HASH + || (Objects.equals(keys(), other.keys()) && Objects.equals(functionFactory(), other.functionFactory())); return other.type() == DistributionType.RANDOM && type() == DistributionType.HASH; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java index facc258..ef614bc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java @@ -64,6 +64,6 @@ public class DistributionTraitDef extends RelTraitDef<DistributionTrait> { } @Override public DistributionTrait getDefault() { - return DistributionTrait.ANY; + return IgniteDistributions.any(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java index 5b7b748..099c96b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java @@ -26,16 +26,22 @@ import org.apache.calcite.util.ImmutableIntList; public class DistributionTraitImpl implements DistributionTrait { private final DistributionType type; private final ImmutableIntList keys; + private final DistributionFunctionFactory functionFactory; - public DistributionTraitImpl(DistributionType type, ImmutableIntList keys) { + public DistributionTraitImpl(DistributionType type, ImmutableIntList keys, DistributionFunctionFactory functionFactory) { this.type = type; this.keys = keys; + this.functionFactory = functionFactory; } @Override public DistributionType type() { return type; } + @Override public DistributionFunctionFactory functionFactory() { + return functionFactory; + } + @Override public ImmutableIntList keys() { return keys; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java index 7537d63..59f69fe 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java @@ -25,11 +25,12 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.Distribu * */ public class IgniteDistributions { - /** */ - private static final DistributionTraitDef traitDef = DistributionTraitDef.INSTANCE; - private static final DistributionTrait SINGLE = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, ImmutableIntList.of())); - private static final DistributionTrait RANDOM = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, ImmutableIntList.of())); - private static final DistributionTrait ANY = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.ANY, ImmutableIntList.of())); + private static final DistributionFunctionFactory NO_OP_FACTORY = (t,k) -> null; + + private static final DistributionTrait BROADCAST = new DistributionTraitImpl(DistributionTrait.DistributionType.BROADCAST, ImmutableIntList.of(), allTargetsFunction()); + private static final DistributionTrait SINGLE = new DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, ImmutableIntList.of(), singleTargetFunction()); + private static final DistributionTrait RANDOM = new DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, ImmutableIntList.of(), randomTargetFunction()); + private static final DistributionTrait ANY = new DistributionTraitImpl(DistributionTrait.DistributionType.ANY, ImmutableIntList.of(), noOpFunction()); public static DistributionTrait any() { return ANY; @@ -43,7 +44,27 @@ public class IgniteDistributions { return SINGLE; } - public static DistributionTrait hash(List<Integer> keys) { - return traitDef.canonize(new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys))); + public static DistributionTrait broadcast() { + return BROADCAST; + } + + public static DistributionTrait hash(List<Integer> keys, DistributionFunctionFactory factory) { + return new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys), factory); + } + + public static DistributionFunctionFactory noOpFunction() { + return NO_OP_FACTORY; + } + + public static DistributionFunctionFactory singleTargetFunction() { + return noOpFunction(); // TODO + } + + public static DistributionFunctionFactory allTargetsFunction() { + return noOpFunction(); // TODO + } + + public static DistributionFunctionFactory randomTargetFunction() { + return noOpFunction(); // TODO } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index b638815..739d411 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -31,7 +31,6 @@ import org.apache.calcite.plan.RelOptNode; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; -import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; @@ -52,11 +51,11 @@ public final class Commons { return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class)); } - public static <T> @Nullable T contextParameter(Context ctx, Class<T> paramType, Supplier<T> paramSrc) { + public static <T> @Nullable T provided(Context ctx, Class<T> paramType, Supplier<T> paramSrc) { T param = ctx.unwrap(paramType); if (param != null) - return param; + return null; // Provided by parent context. return paramSrc.get(); } @@ -81,26 +80,10 @@ public final class Commons { return b.build(); } - public static RelOptRuleOperand any(Class<? extends RelNode> first, RelTrait trait){ - return RelOptRule.operand(first, trait, RelOptRule.any()); - } - - public static RelOptRuleOperand any(Class<? extends RelNode> first){ - return RelOptRule.operand(first, RelOptRule.any()); - } - public static RelOptRuleOperand any(Class<? extends RelNode> first, Class<? extends RelNode> second) { return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any())); } - public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){ - return RelOptRule.operand(rel, RelOptRule.some(first, rest)); - } - - public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){ - return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest)); - } - public static <T extends RelNode> RelOp<T, Boolean> transformSubset(RelOptRuleCall call, RelNode input, BiFunction<T, RelNode, RelNode> transformFun) { return rel -> { if (!(input instanceof RelSubset)) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 884b9ca..d9ebd5b 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.calcite; -import java.util.List; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.ConventionTraitDef; @@ -40,13 +39,12 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; import org.apache.ignite.internal.processors.query.calcite.schema.RowType; import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution; import org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry; -import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask; -import org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter; +import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan; +import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.junits.GridTestKernalContext; -import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.BeforeClass; import org.junit.Test; @@ -54,7 +52,7 @@ import org.junit.Test; /** * */ -@WithSystemProperty(key = "calcite.debug", value = "true") +//@WithSystemProperty(key = "calcite.debug", value = "true") public class CalciteQueryProcessorTest extends GridCommonAbstractTest { private static CalciteQueryProcessor proc; @@ -62,6 +60,9 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { private static PartitionsDistribution developerDistribution; private static PartitionsDistribution projectDistribution; + private static PartitionsDistribution randomDistribution; + private static PartitionsDistribution singleDistribution; + private static PartitionsDistributionRegistry registry; @BeforeClass public static void setupClass() { @@ -80,12 +81,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { .field("cityId", Integer.class) .build())); - developerDistribution = new PartitionsDistribution(); - - developerDistribution.parts = 5; - developerDistribution.nodes = new int[]{0,1,2}; - developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}}; - publicSchema.addTable("Project", new IgniteTable("Project", "Project", RowType.builder() .keyField("id", Integer.class, true) @@ -93,12 +88,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { .field("ver", Integer.class) .build())); - projectDistribution = new PartitionsDistribution(); - projectDistribution.excessive = true; - projectDistribution.parts = 5; - projectDistribution.nodes = new int[]{0,1,2}; - projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}}; publicSchema.addTable("Country", new IgniteTable("Country", "Country", RowType.builder() @@ -117,6 +107,48 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { schema = Frameworks.createRootSchema(false); schema.add("PUBLIC", publicSchema); + + developerDistribution = new PartitionsDistribution(); + + developerDistribution.parts = 5; + developerDistribution.nodes = new int[]{0,1,2}; + developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}}; + + projectDistribution = new PartitionsDistribution(); + + projectDistribution.excessive = true; + projectDistribution.parts = 5; + projectDistribution.nodes = new int[]{0,1,2}; + projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}}; + + randomDistribution = new PartitionsDistribution(); + randomDistribution.parts = 3; + randomDistribution.nodes = new int[]{0,1,2}; + randomDistribution.nodeParts = new int[][]{{1},{2},{3}}; + + singleDistribution = new PartitionsDistribution(); + singleDistribution.parts = 1; + singleDistribution.nodes = new int[]{0}; + singleDistribution.nodeParts = new int[][]{{1}}; + + registry = new PartitionsDistributionRegistry() { + @Override public PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer) { + if (cacheId == CU.cacheId("Developer")) + return developerDistribution; + if (cacheId == CU.cacheId("Project")) + return projectDistribution; + + throw new AssertionError("Unexpected cache id:" + cacheId); + } + + @Override public PartitionsDistribution random(AffinityTopologyVersion topVer) { + return randomDistribution; + } + + @Override public PartitionsDistribution single() { + return singleDistribution; + } + }; } @Test @@ -128,15 +160,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - PartitionsDistributionRegistry registry = (id, top) -> { - if (id == CU.cacheId("Developer")) - return developerDistribution; - if (id == CU.cacheId("Project")) - return projectDistribution; - - throw new AssertionError("Unexpected cache id:" + id); - }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); assertNotNull(ctx); @@ -181,8 +204,12 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { assertNotNull(relRoot); - List<SplitTask> fragments = new TaskSplitter().go((IgniteRel) relRoot.rel); + QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel); + + assertNotNull(plan); + + plan.init(ctx); - assertNotNull(fragments); + assertNotNull(plan); } } \ No newline at end of file