This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 1824145 context refactoring 1824145 is described below commit 1824145fdfdd1e78fa8d78b1cc8cbe0984a46beb Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Mon Dec 2 21:42:56 2019 +0300 context refactoring --- .../query/calcite/CalciteQueryProcessor.java | 45 +++-- .../{RegistryImpl.java => MappingServiceImpl.java} | 59 +----- .../cluster/TableDistributionServiceImpl.java | 87 ++++++++ ...ExchangeService.java => ExchangeProcessor.java} | 2 +- .../processors/query/calcite/exchange/Outbox.java | 18 +- .../{LocationRegistry.java => MappingService.java} | 2 +- ...Registry.java => TableDistributionService.java} | 2 +- .../query/calcite/prepare/DataContextImpl.java | 14 +- .../calcite/prepare/DistributedExecution.java | 10 +- .../query/calcite/prepare/IgnitePlanner.java | 5 +- .../query/calcite/prepare/PlannerContext.java | 213 ++++++++++++++++++++ .../query/calcite/rel/IgniteTableScan.java | 3 +- .../processors/query/calcite/rule/IgniteRules.java | 4 +- .../query/calcite/rule/PlannerPhase.java | 8 +- .../query/calcite/schema/IgniteTable.java | 31 +-- .../query/calcite/splitter/Fragment.java | 20 +- .../query/calcite/splitter/QueryPlan.java | 4 +- .../processors/query/calcite/splitter/Source.java | 4 +- .../query/calcite/trait/AllTargetsFactory.java | 4 +- .../calcite/trait/DestinationFunctionFactory.java | 4 +- .../query/calcite/trait/HashFunctionFactory.java | 4 +- .../query/calcite/trait/NoOpFactory.java | 4 +- .../query/calcite/trait/RandomTargetFactory.java | 4 +- .../query/calcite/trait/SingleTargetFactory.java | 4 +- .../processors/query/calcite/util/Commons.java | 25 +-- .../query/calcite/CalciteQueryProcessorTest.java | 223 ++++++++++++++------- .../query/calcite/exchange/OutboxTest.java | 5 +- 27 files changed, 548 insertions(+), 260 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 760a71c..c244f4e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.query.calcite; import java.util.Collections; import java.util.List; +import java.util.function.BiFunction; import org.apache.calcite.config.Lex; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.fun.SqlLibrary; import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory; import org.apache.calcite.sql.parser.SqlParser; @@ -36,9 +36,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.QueryEngine; -import org.apache.ignite.internal.processors.query.calcite.cluster.RegistryImpl; +import org.apache.ignite.internal.processors.query.calcite.cluster.MappingServiceImpl; +import org.apache.ignite.internal.processors.query.calcite.cluster.TableDistributionServiceImpl; import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.prepare.Query; import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution; import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder; @@ -49,8 +51,6 @@ import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.calcite.util.Commons.provided; - /** * */ @@ -110,7 +110,7 @@ public class CalciteQueryProcessor implements QueryEngine { } @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException { - Context context = context(Commons.convert(ctx), query, params); + PlannerContext context = context(Commons.convert(ctx), query, params, this::buildContext); QueryExecution execution = prepare(context); FieldsQueryCursor<List<?>> cur = execution.execute(); return Collections.singletonList(cur); @@ -124,16 +124,12 @@ public class CalciteQueryProcessor implements QueryEngine { return log; } - public GridKernalContext context() { - return kernalContext; - } - /** */ - public IgnitePlanner planner(RelTraitDef[] traitDefs, Context ctx) { + public IgnitePlanner planner(RelTraitDef[] traitDefs, PlannerContext ctx0) { FrameworkConfig cfg = Frameworks.newConfigBuilder(config()) - .defaultSchema(ctx.unwrap(SchemaPlus.class)) + .defaultSchema(ctx0.schema()) .traitDefs(traitDefs) - .context(ctx) + .context(ctx0) .build(); return new IgnitePlanner(cfg); @@ -145,16 +141,25 @@ public class CalciteQueryProcessor implements QueryEngine { * @param params Query parameters. * @return Query execution context. */ - Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests. - return Contexts.chain(ctx, config.getContext(), - Contexts.of( - new Query(query, params), - new RegistryImpl(kernalContext), - provided(ctx, SchemaPlus.class, schemaHolder::schema), - provided(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion))); + PlannerContext context(@NotNull Context ctx, String query, Object[] params, BiFunction<Context, Query, PlannerContext> clo) { // Package private visibility for tests. + return clo.apply(Contexts.chain(ctx, config.getContext()), new Query(query, params)); + } + + private PlannerContext buildContext(@NotNull Context parent, @NotNull Query query) { + return PlannerContext.builder() + .logger(log) + .kernalContext(kernalContext) + .queryProcessor(this) + .parentContext(parent) + .query(query) + .schema(schemaHolder.schema()) + .topologyVersion(readyAffinityVersion()) + .distributionService(new TableDistributionServiceImpl(kernalContext)) + .mappingService(new MappingServiceImpl(kernalContext)) + .build(); } - private QueryExecution prepare(Context ctx) { + private QueryExecution prepare(PlannerContext ctx) { return new DistributedExecution(ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java similarity index 68% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java index 71f8077..66c7a74 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java @@ -20,52 +20,30 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.function.ToIntFunction; -import org.apache.calcite.plan.Context; -import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry; -import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; +import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; -import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory; -import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction; -import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; -import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED; /** * */ -public class RegistryImpl implements DistributionRegistry, LocationRegistry { +public class MappingServiceImpl implements MappingService { private final GridKernalContext ctx; - public RegistryImpl(GridKernalContext ctx) { + public MappingServiceImpl(GridKernalContext ctx) { this.ctx = ctx; } - @Override public DistributionTrait distribution(int cacheId, RowType rowType) { - CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group(); - - if (grp.isReplicated()) - return IgniteDistributions.broadcast(); - - Object key = grp.affinity().similarAffinityKey(); - - return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key)); - } - @Override public NodesMapping local() { return new NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, DEDUPLICATED); } @@ -153,35 +131,4 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry { } return true; } - - private final static class AffinityFactory extends AbstractDestinationFunctionFactory { - private final int cacheId; - private final Object key; - - AffinityFactory(int cacheId, Object key) { - this.cacheId = cacheId; - this.key = key; - } - - @Override public DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys) { - assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments()); - - List<List<UUID>> assignments = mapping.assignments(); - - if (U.assertionsEnabled()) { - for (List<UUID> assignment : assignments) { - assert F.isEmpty(assignment) || assignment.size() == 1; - } - } - - ToIntFunction<Object> rowToPart = ctx.unwrap(GridKernalContext.class) - .cache().context().cacheContext(cacheId).affinity()::partition; - - return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)])); - } - - @Override public Object key() { - return key; - } - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java new file mode 100644 index 0000000..20da342 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java @@ -0,0 +1,87 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.cluster; + +import java.util.List; +import java.util.UUID; +import java.util.function.ToIntFunction; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; +import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public class TableDistributionServiceImpl implements TableDistributionService { + private final GridKernalContext ctx; + + public TableDistributionServiceImpl(GridKernalContext ctx) { + this.ctx = ctx; + } + + @Override public DistributionTrait distribution(int cacheId, RowType rowType) { + CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group(); + + if (grp.isReplicated()) + return IgniteDistributions.broadcast(); + + Object key = grp.affinity().similarAffinityKey(); + + return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key)); + } + + private final static class AffinityFactory extends AbstractDestinationFunctionFactory { + private final int cacheId; + private final Object key; + + AffinityFactory(int cacheId, Object key) { + this.cacheId = cacheId; + this.key = key; + } + + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) { + assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments()); + + List<List<UUID>> assignments = mapping.assignments(); + + if (U.assertionsEnabled()) { + for (List<UUID> assignment : assignments) { + assert F.isEmpty(assignment) || assignment.size() == 1; + } + } + + ToIntFunction<Object> rowToPart = ctx.kernalContext() + .cache().context().cacheContext(cacheId).affinity()::partition; + + return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)])); + } + + @Override public Object key() { + return key; + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java similarity index 96% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java index 788011a..692b4fa 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** * */ -public interface ExchangeService { +public interface ExchangeProcessor { void register(Outbox outbox); void unregister(Outbox outbox); void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?> rows); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java index 47351e2..b1cf688 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java @@ -43,7 +43,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> private final Collection<UUID> targets; private final DestinationFunction function; - private ExchangeService srvc; + private ExchangeProcessor srvc; protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) { super(Sink.noOp()); @@ -54,6 +54,14 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> this.function = function; } + public void init(ExchangeProcessor srvc) { + this.srvc = srvc; + + srvc.register(this); + + signal(); + } + public void acknowledge(UUID nodeId, int batchId) { perNode.get(nodeId).acknowledge(batchId); } @@ -91,14 +99,6 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> return true; } - public void init(ExchangeService srvc) { - this.srvc = srvc; - - srvc.register(this); - - signal(); - } - @Override public void end() { for (UUID node : targets) perNode.computeIfAbsent(node, Destination::new).end(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java similarity index 96% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java index bf62302..9472cd1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java @@ -21,7 +21,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** * */ -public interface LocationRegistry { +public interface MappingService { NodesMapping local(); // returns local node with single partition NodesMapping random(AffinityTopologyVersion topVer); // returns random distribution, partitions count depends on nodes count NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer); // returns cache distribution diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java similarity index 95% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java index 3a20908..ad710aa 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java @@ -22,6 +22,6 @@ import org.apache.ignite.internal.processors.query.calcite.type.RowType; /** * */ -public interface DistributionRegistry { +public interface TableDistributionService { DistributionTrait distribution(int cacheId, RowType rowType); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java index 49b57ad..06aaa024 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java @@ -20,18 +20,15 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.Map; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.linq4j.QueryProvider; -import org.apache.calcite.plan.Context; import org.apache.calcite.schema.SchemaPlus; -import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; /** * */ class DataContextImpl implements DataContext { /** */ - private final JavaTypeFactoryImpl typeFactory; + private final JavaTypeFactory typeFactory; /** */ private final SchemaPlus schema; @@ -46,10 +43,11 @@ class DataContextImpl implements DataContext { * @param params Parameters. * @param ctx Query context. */ - DataContextImpl(Map<String, Object> params, Context ctx) { - typeFactory = new JavaTypeFactoryImpl(ctx.unwrap(CalciteQueryProcessor.class).config().getTypeSystem()); - schema = ctx.unwrap(SchemaPlus.class); - queryProvider = ctx.unwrap(QueryProvider.class); + DataContextImpl(Map<String, Object> params, PlannerContext ctx) { + typeFactory = ctx.typeFactory(); + schema = ctx.schema(); + queryProvider = ctx.queryProvider(); + this.params = params; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java index fdaee5a..192e686 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java @@ -19,9 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.Arrays; import java.util.List; -import java.util.Objects; import org.apache.calcite.linq4j.Linq4j; -import org.apache.calcite.plan.Context; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.plan.RelTraitSet; @@ -47,19 +45,19 @@ import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryC */ public class DistributedExecution implements QueryExecution { /** */ - private final Context ctx; + private final PlannerContext ctx; /** * @param ctx Query context. */ - public DistributedExecution(Context ctx) { + public DistributedExecution(PlannerContext ctx) { this.ctx = ctx; } /** {@inheritDoc} */ @Override public FieldsQueryCursor<List<?>> execute() { - CalciteQueryProcessor proc = Objects.requireNonNull(ctx.unwrap(CalciteQueryProcessor.class)); - Query query = Objects.requireNonNull(ctx.unwrap(Query.class)); + CalciteQueryProcessor proc = ctx.queryProcessor(); + Query query = ctx.query(); RelTraitDef[] traitDefs = { RelDistributionTraitDef.INSTANCE, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java index 2462785..2506de8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java @@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan; import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; /** * @@ -124,6 +125,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { .typeSystem(RelDataTypeSystem.class, IgniteTypeSystem.DEFAULT); typeFactory = new IgniteTypeFactory(typeSystem); + + Commons.plannerContext(context).planner(this); } private CalciteConnectionConfig connConfig() { @@ -302,7 +305,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { ready(); RelTraitSet toTraits = targetTraits.simplify(); - RuleSet rules = plannerPhase.getRules(context); + RuleSet rules = plannerPhase.getRules(Commons.plannerContext(context)); input.accept(new MetaDataProviderModifier(metadataProvider)); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java new file mode 100644 index 0000000..dc20880 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java @@ -0,0 +1,213 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.plan.Context; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exchange.ExchangeProcessor; +import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; +import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; + +/** + * + */ +public final class PlannerContext implements Context { + private final Context parentContext; + private final Query query; + private final AffinityTopologyVersion topologyVersion; + private final SchemaPlus schema; + private final IgniteLogger logger; + private final GridKernalContext kernalContext; + private final CalciteQueryProcessor queryProcessor; + private final MappingService mappingService; + private final TableDistributionService distributionService; + private final ExchangeProcessor exchangeProcessor; + + private IgnitePlanner planner; + + private PlannerContext(Context parentContext, Query query, AffinityTopologyVersion topologyVersion, + SchemaPlus schema, IgniteLogger logger, GridKernalContext kernalContext, CalciteQueryProcessor queryProcessor, MappingService mappingService, + TableDistributionService distributionService, ExchangeProcessor exchangeProcessor) { + this.parentContext = parentContext; + this.query = query; + this.topologyVersion = topologyVersion; + this.schema = schema; + this.logger = logger; + this.kernalContext = kernalContext; + this.queryProcessor = queryProcessor; + this.mappingService = mappingService; + this.distributionService = distributionService; + this.exchangeProcessor = exchangeProcessor; + } + + public Query query() { + return query; + } + + public AffinityTopologyVersion topologyVersion() { + return topologyVersion; + } + + public SchemaPlus schema() { + return schema; + } + + public IgniteLogger logger() { + return logger; + } + + public GridKernalContext kernalContext() { + return kernalContext; + } + + public CalciteQueryProcessor queryProcessor() { + return queryProcessor; + } + + void planner(IgnitePlanner planner) { + this.planner = planner; + } + + public IgnitePlanner planner() { + return planner; + } + + public MappingService mappingService() { + return mappingService; + } + + public TableDistributionService distributionService() { + return distributionService; + } + + public ExchangeProcessor exchangeProcessor() { + return exchangeProcessor; + } + + // Helper methods + + public JavaTypeFactory typeFactory() { + return planner.getTypeFactory(); + } + + public NodesMapping mapForLocal() { + return mappingService.local(); + } + + public NodesMapping mapForRandom(AffinityTopologyVersion topVer) { + return mappingService.random(topVer); + } + + public NodesMapping mapForCache(int cacheId, AffinityTopologyVersion topVer) { + return mappingService.distributed(cacheId, topVer); + } + + public DistributionTrait distributionTrait(int cacheId, RowType rowType) { + return distributionService.distribution(cacheId, rowType); + } + + public QueryProvider queryProvider() { + return null; // TODO + } + + @Override public <C> C unwrap(Class<C> aClass) { + if (aClass == getClass()) + return aClass.cast(this); + + return parentContext.unwrap(aClass); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Context parentContext; + private Query query; + private AffinityTopologyVersion topologyVersion; + private SchemaPlus schema; + private IgniteLogger logger; + private GridKernalContext kernalContext; + private CalciteQueryProcessor queryProcessor; + private MappingService mappingService; + private TableDistributionService distributionService; + private ExchangeProcessor exchangeProcessor; + + public Builder parentContext(Context parentContext) { + this.parentContext = parentContext; + return this; + } + + public Builder query(Query query) { + this.query = query; + return this; + } + + public Builder topologyVersion(AffinityTopologyVersion topologyVersion) { + this.topologyVersion = topologyVersion; + return this; + } + + public Builder schema(SchemaPlus schema) { + this.schema = schema; + return this; + } + + public Builder logger(IgniteLogger logger) { + this.logger = logger; + return this; + } + + public Builder kernalContext(GridKernalContext kernalContext) { + this.kernalContext = kernalContext; + return this; + } + + public Builder queryProcessor(CalciteQueryProcessor queryProcessor) { + this.queryProcessor = queryProcessor; + return this; + } + + public Builder mappingService(MappingService mappingService) { + this.mappingService = mappingService; + return this; + } + + public Builder distributionService(TableDistributionService distributionService) { + this.distributionService = distributionService; + return this; + } + + public Builder exchangeProcessor(ExchangeProcessor exchangeProcessor) { + this.exchangeProcessor = exchangeProcessor; + return this; + } + + public PlannerContext build() { + return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, distributionService, exchangeProcessor); + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java index 12b7b99..c529aee 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java @@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; public final class IgniteTableScan extends TableScan implements IgniteRel { @@ -44,6 +45,6 @@ public final class IgniteTableScan extends TableScan implements IgniteRel { public FragmentInfo fragmentInfo() { return getTable().unwrap(IgniteTable.class) - .fragmentInfo(getCluster().getPlanner().getContext()); + .fragmentInfo(Commons.plannerContext(getCluster().getPlanner().getContext())); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java index d089339..cf514d5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.calcite.rule; import com.google.common.collect.ImmutableList; import java.util.List; -import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.volcano.AbstractConverter; import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule; @@ -62,6 +61,7 @@ import org.apache.calcite.rel.rules.UnionMergeRule; import org.apache.calcite.rel.rules.UnionPullUpConstantsRule; import org.apache.calcite.rel.rules.UnionToDistinctRule; import org.apache.calcite.rel.rules.ValuesReduceRule; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * @@ -149,7 +149,7 @@ public class IgniteRules { IgniteProjectRule.INSTANCE, IgniteJoinRule.INSTANCE); - public static List<RelOptRule> logicalRules(Context ctx) { + public static List<RelOptRule> logicalRules(PlannerContext ctx) { return ImmutableList.<RelOptRule>builder() .addAll(BASE_RULES) .addAll(ABSTRACT_RULES) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java index aa82187..24938a1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.query.calcite.rule; -import org.apache.calcite.plan.Context; import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * @@ -27,14 +27,14 @@ import org.apache.calcite.tools.RuleSets; public enum PlannerPhase { /** */ SUBQUERY_REWRITE("Sub-queries rewrites") { - @Override public RuleSet getRules(Context ctx) { + @Override public RuleSet getRules(PlannerContext ctx) { return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES); } }, /** */ LOGICAL("Logical planning") { - @Override public RuleSet getRules(Context ctx) { + @Override public RuleSet getRules(PlannerContext ctx) { return RuleSets.ofList(IgniteRules.logicalRules(ctx)); } }; @@ -45,5 +45,5 @@ public enum PlannerPhase { this.description = description; } - public abstract RuleSet getRules(Context ctx); + public abstract RuleSet getRules(PlannerContext ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index b14ea3b..805c455 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.calcite.schema; -import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; @@ -26,15 +25,14 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo; -import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.type.RowType; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.internal.CU; /** */ @@ -71,30 +69,19 @@ public class IgniteTable extends AbstractTable implements TranslatableTable { /** {@inheritDoc} */ @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { RelOptCluster cluster = context.getCluster(); + PlannerContext ctx = Commons.plannerContext(cluster.getPlanner().getContext()); RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION) - .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(cluster.getPlanner().getContext())); + .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(ctx)); return new IgniteTableScan(cluster, traitSet, relOptTable); } - public DistributionTrait distributionTrait(Context context) { - return distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType); + public DistributionTrait distributionTrait(PlannerContext context) { + return Commons.plannerContext(context).distributionTrait(CU.cacheId(cacheName), rowType); } - public FragmentInfo fragmentInfo(Context ctx) { - int cacheId = CU.cacheId(cacheName); + public FragmentInfo fragmentInfo(PlannerContext ctx) { + PlannerContext ctx0 = Commons.plannerContext(ctx); - return new FragmentInfo(locationRegistry(ctx).distributed(cacheId, topologyVersion(ctx))); - } - - private LocationRegistry locationRegistry(Context ctx) { - return ctx.unwrap(LocationRegistry.class); - } - - public DistributionRegistry distributionRegistry(Context ctx) { - return ctx.unwrap(DistributionRegistry.class); - } - - private AffinityTopologyVersion topologyVersion(Context ctx) { - return ctx.unwrap(AffinityTopologyVersion.class); + return new FragmentInfo(ctx0.mapForCache(CU.cacheId(cacheName), ctx0.topologyVersion())); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java index d12c73a..45c4591 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java @@ -17,17 +17,14 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; import com.google.common.collect.ImmutableList; -import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import org.apache.calcite.plan.Context; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Pair; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo; -import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; import org.apache.ignite.internal.processors.query.calcite.rel.Sender; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; @@ -49,11 +46,12 @@ public class Fragment implements Source { this.root = root; } - public void init(Context ctx, RelMetadataQuery mq) { + public void init(PlannerContext ctx, RelMetadataQuery mq) { + FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq); if (info.mapping() == null) - mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local(); + mapping = remote() ? ctx.mapForRandom(ctx.topologyVersion()) : ctx.mapForLocal(); else mapping = info.mapping().deduplicate(); @@ -73,7 +71,7 @@ public class Fragment implements Source { return exchangeId; } - @Override public void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) { + @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) { assert remote(); ((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution)); @@ -92,12 +90,4 @@ public class Fragment implements Source { private boolean remote() { return root instanceof Sender; } - - private LocationRegistry registry(Context ctx) { - return Objects.requireNonNull(ctx.unwrap(LocationRegistry.class)); - } - - private AffinityTopologyVersion topologyVersion(Context ctx) { - return Objects.requireNonNull(ctx.unwrap(AffinityTopologyVersion.class)); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java index c03879b..b0369bd 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; import java.util.List; -import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException; import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; import org.apache.ignite.internal.processors.query.calcite.rel.Sender; import org.apache.ignite.internal.processors.query.calcite.util.Edge; @@ -39,7 +39,7 @@ public class QueryPlan { this.fragments = fragments; } - public void init(Context ctx) { + public void init(PlannerContext ctx) { int i = 0; RelMetadataQueryEx mq = RelMetadataQueryEx.instance(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java index 773dee1..e0108b7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java @@ -16,9 +16,9 @@ package org.apache.ignite.internal.processors.query.calcite.splitter; -import org.apache.calcite.plan.Context; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; /** @@ -41,7 +41,7 @@ public interface Source { * @param ctx Context. * @param mq Metadata query instance. */ - default void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) { + default void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) { // No-op. } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java index 2dcfe33..30ce8b8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java @@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.trait; import java.io.ObjectStreamException; import java.util.List; import java.util.UUID; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * @@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping public final class AllTargetsFactory extends AbstractDestinationFunctionFactory { public static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory(); - @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) { List<UUID> nodes = m.nodes(); return r -> nodes; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java index 587c172..d12ec15 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.processors.query.calcite.trait; import java.io.Serializable; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * */ public interface DestinationFunctionFactory extends Serializable { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); + DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys); Object key(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java index 3c2420b..863f93c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java @@ -20,9 +20,9 @@ import java.io.ObjectStreamException; import java.util.List; import java.util.UUID; import java.util.function.ToIntFunction; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; public final class HashFunctionFactory extends AbstractDestinationFunctionFactory { public static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory(); - @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) { assert m != null && !F.isEmpty(m.assignments()); int[] fields = k.toIntArray(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java index d8495b6..7086277 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.query.calcite.trait; import java.io.ObjectStreamException; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * @@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping public final class NoOpFactory extends AbstractDestinationFunctionFactory { public static final DestinationFunctionFactory INSTANCE = new NoOpFactory(); - @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) { return null; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java index 78013d3..8a643c3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java @@ -21,9 +21,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** * @@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping public final class RandomTargetFactory extends AbstractDestinationFunctionFactory { public static final DestinationFunctionFactory INSTANCE = new RandomTargetFactory(); - @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) { List<UUID> nodes = m.nodes(); return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()))); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java index 4d631fc..8a8b27c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java @@ -21,9 +21,9 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.UUID; -import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.util.typedef.F; /** @@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.F; public final class SingleTargetFactory extends AbstractDestinationFunctionFactory { public static final DestinationFunctionFactory INSTANCE = new SingleTargetFactory(); - @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) { List<UUID> nodes = Collections.singletonList(Objects.requireNonNull(F.first(m.nodes()))); return r -> nodes; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index c6d6a6c..3a461cf 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -28,7 +28,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; @@ -42,11 +41,11 @@ import org.apache.calcite.rel.RelNode; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryContext; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; /** * @@ -58,24 +57,6 @@ public final class Commons { return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class)); } - public static <T> @Nullable T provided(Context ctx, Class<T> paramType, Supplier<T> paramSrc) { - T param = ctx.unwrap(paramType); - - if (param != null) - return null; // Provided by parent context. - - return paramSrc.get(); - } - - public static <T> T contextParam(Context ctx, Class<T> paramType, Supplier<T> paramSrc) { - T param = ctx.unwrap(paramType); - - if (param != null) - return param; - - return paramSrc.get(); - } - /** */ public static RowType rowType(GridQueryTypeDescriptor desc) { RowType.Builder b = RowType.builder(); @@ -189,4 +170,8 @@ public final class Commons { return set; } + + public static PlannerContext plannerContext(Context ctx) { + return Objects.requireNonNull(ctx.unwrap(PlannerContext.class)); + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 6c04c1e..644633b 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.ConventionTraitDef; @@ -35,10 +34,11 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry; -import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; +import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.prepare.Query; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.type.RowType; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -68,18 +69,17 @@ import org.junit.Test; //@WithSystemProperty(key = "calcite.debug", value = "true") public class CalciteQueryProcessorTest extends GridCommonAbstractTest { + private static GridTestKernalContext kernalContext; private static CalciteQueryProcessor proc; private static SchemaPlus schema; - - private static TestRegistry registry; private static List<UUID> nodes; @BeforeClass public static void setupClass() { + kernalContext = new GridTestKernalContext(log); proc = new CalciteQueryProcessor(); - proc.setLogger(log); - proc.start(new GridTestKernalContext(log)); + proc.start(kernalContext); IgniteSchema publicSchema = new IgniteSchema("PUBLIC"); @@ -123,8 +123,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { for (int i = 0; i < 4; i++) { nodes.add(UUID.randomUUID()); } - - registry = new TestRegistry(); } @Test @@ -136,7 +134,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 + 1" + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -149,7 +147,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(query); @@ -175,7 +173,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -188,7 +186,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(query); @@ -212,7 +210,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " + "FROM Developer d"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -225,7 +223,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(query); @@ -251,7 +249,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -264,7 +262,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(query); @@ -297,7 +295,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -311,7 +309,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(query); @@ -357,7 +355,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.id = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -371,7 +369,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -433,7 +431,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.id = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context); assertNotNull(ctx); @@ -447,7 +445,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -497,10 +495,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.id = p.id0 " + "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService(){ @Override public DistributionTrait distribution(int cacheId, RowType rowType) { return IgniteDistributions.broadcast(); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -512,9 +520,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } }; - - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); - + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); RelTraitDef[] traitDefs = { @@ -527,7 +533,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -577,13 +583,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.id = p.id0 " + "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService(){ @Override public DistributionTrait distribution(int cacheId, RowType rowType) { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); return IgniteDistributions.hash(rowType.distributionKeys()); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -601,7 +617,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); @@ -615,7 +631,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -665,13 +681,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService(){ @Override public DistributionTrait distribution(int cacheId, RowType rowType) { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); return IgniteDistributions.hash(rowType.distributionKeys()); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -689,7 +715,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); @@ -703,7 +729,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -753,10 +779,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.ver0 " + "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService(){ @Override public DistributionTrait distribution(int cacheId, RowType rowType) { return IgniteDistributions.broadcast(); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -769,7 +805,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); @@ -783,7 +819,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -834,13 +870,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService(){ @Override public DistributionTrait distribution(int cacheId, RowType rowType) { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); return IgniteDistributions.hash(rowType.distributionKeys()); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -858,7 +904,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); @@ -872,7 +918,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -923,13 +969,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "WHERE (d.projectId + 1) > ?"; - TestRegistry registry = new TestRegistry(){ + TableDistributionService ds = new TableDistributionService() { @Override public DistributionTrait distribution(int cacheId, RowType rowType) { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); return IgniteDistributions.hash(rowType.distributionKeys()); } + }; + + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } + + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { if (cacheId == CU.cacheId("Developer")) @@ -946,7 +1002,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { throw new AssertionError("Unexpected cache id:" + cacheId); } }; - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); + + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds)); assertNotNull(ctx); @@ -960,7 +1017,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ assertNotNull(planner); - Query query = ctx.unwrap(Query.class); + Query query = Commons.plannerContext(ctx).query(); assertNotNull(planner); @@ -1011,40 +1068,58 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { return res; } - private static class TestRegistry implements LocationRegistry, DistributionRegistry { - private AtomicLong idGen = new AtomicLong(); + private PlannerContext context(Context c, Query q) { + MappingService ms = new MappingService() { + @Override public NodesMapping random(AffinityTopologyVersion topVer) { + return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); + } - @Override public NodesMapping random(AffinityTopologyVersion topVer) { - return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0); - } + @Override public NodesMapping local() { + return new NodesMapping(select(nodes, 0), null, (byte) 0); + } - @Override public NodesMapping local() { - return new NodesMapping(select(nodes, 0), null, (byte) 0); - } + @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { + if (cacheId == CU.cacheId("Developer")) + return new NodesMapping(null, Arrays.asList( + select(nodes, 0,1), + select(nodes, 1,2), + select(nodes, 2,0), + select(nodes, 0,1), + select(nodes, 1,2) + ), NodesMapping.HAS_PARTITIONED_CACHES); + if (cacheId == CU.cacheId("Project")) + return new NodesMapping(null, Arrays.asList( + select(nodes, 0,1), + select(nodes, 1,2), + select(nodes, 2,0), + select(nodes, 0,1), + select(nodes, 1,2) + ), NodesMapping.HAS_PARTITIONED_CACHES); - @Override public DistributionTrait distribution(int cacheId, RowType rowType) { - return IgniteDistributions.hash(rowType.distributionKeys()); - } + throw new AssertionError("Unexpected cache id:" + cacheId); + } + }; - @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { - if (cacheId == CU.cacheId("Developer")) - return new NodesMapping(null, Arrays.asList( - select(nodes, 0,1), - select(nodes, 1,2), - select(nodes, 2,0), - select(nodes, 0,1), - select(nodes, 1,2) - ), NodesMapping.HAS_PARTITIONED_CACHES); - if (cacheId == CU.cacheId("Project")) - return new NodesMapping(null, Arrays.asList( - select(nodes, 0,1), - select(nodes, 1,2), - select(nodes, 2,0), - select(nodes, 0,1), - select(nodes, 1,2) - ), NodesMapping.HAS_PARTITIONED_CACHES); - - throw new AssertionError("Unexpected cache id:" + cacheId); - } + TableDistributionService ds = new TableDistributionService() { + @Override public DistributionTrait distribution(int cacheId, RowType rowType) { + return IgniteDistributions.hash(rowType.distributionKeys()); + } + }; + + return context(c, q, ms, ds); + } + + private PlannerContext context(Context parent, Query query, MappingService ms, TableDistributionService ds) { + return PlannerContext.builder() + .parentContext(parent) + .logger(log) + .kernalContext(kernalContext) + .queryProcessor(proc) + .query(query) + .schema(schema) + .topologyVersion(AffinityTopologyVersion.NONE) + .distributionService(ds) + .mappingService(ms) + .build(); } } \ No newline at end of file diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java index 89efa7c..a08142c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java @@ -5,7 +5,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; -import org.apache.calcite.plan.Contexts; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.calcite.exec.Sink; @@ -38,7 +37,7 @@ public class OutboxTest extends GridCommonAbstractTest { NodesMapping mapping = new NodesMapping(Collections.singletonList(nodeId), null, NodesMapping.DEDUPLICATED); targets = mapping.nodes(); - func = SingleTargetFactory.INSTANCE.create(Contexts.empty(), mapping, ImmutableIntList.of()); + func = SingleTargetFactory.INSTANCE.create(null, mapping, ImmutableIntList.of()); } @@ -100,7 +99,7 @@ public class OutboxTest extends GridCommonAbstractTest { assertEquals(EndMarker.INSTANCE, F.last(exch.lastBatch)); } - private static class TestExchangeService implements ExchangeService { + private static class TestExchangeService implements ExchangeProcessor { private boolean registered; private boolean unregistered; private List<Integer> ids = new ArrayList<>();