This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new c082b7b pending c082b7b is described below commit c082b7b14dae76455ca40c8a279598f373d7410a Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Tue Nov 19 20:04:59 2019 +0300 pending --- .../query/calcite/CalciteQueryProcessor.java | 4 +- .../query/calcite/cluster/RegistryImpl.java | 6 +- .../calcite/metadata/DistributionRegistry.java | 2 +- .../query/calcite/metadata/NodesMapping.java | 13 +-- .../query/calcite/prepare/IgnitePlanner.java | 21 +++-- .../query/calcite/rel/IgniteExchange.java | 4 +- .../processors/query/calcite/rel/IgniteFilter.java | 4 +- .../processors/query/calcite/rel/IgniteJoin.java | 4 +- .../query/calcite/rel/IgniteProject.java | 4 +- .../processors/query/calcite/rel/IgniteRel.java | 4 +- .../query/calcite/rel/IgniteTableScan.java | 4 +- .../processors/query/calcite/rel/Receiver.java | 4 +- .../processors/query/calcite/rel/Sender.java | 12 ++- .../query/calcite/rule/IgniteJoinRule.java | 4 +- .../query/calcite/schema/IgniteTable.java | 1 + .../query/calcite/serialize/CallExpression.java | 29 +++---- ...rializationContext.java => ExpImplementor.java} | 18 ++-- .../calcite/serialize/ExpToRexTranslator.java | 96 ++++++++++++++++++++++ .../{Expression.java => ExpressionType.java} | 13 ++- .../query/calcite/serialize/FieldType.java | 59 +++++++++++++ .../processors/query/calcite/serialize/Graph.java | 3 +- .../calcite/serialize/InputRefExpression.java | 14 ++-- .../query/calcite/serialize/LiteralExpression.java | 14 ++-- .../calcite/serialize/LocalRefExpression.java | 15 ++-- .../{Expression.java => LogicalExpression.java} | 7 +- .../{Expression.java => RelGraphNode.java} | 13 ++- .../calcite/serialize/RelToGraphConverter.java | 53 +++++++----- .../calcite/serialize/RexToExpTranslator.java | 52 ++++++------ .../SenderNode.java} | 18 ++-- .../query/calcite/serialize/StructType.java | 51 ++++++++++++ ...ava => AbstractDestinationFunctionFactory.java} | 32 +++----- ...FunctionFactory.java => AllTargetsFactory.java} | 16 +++- .../calcite/trait/DestinationFunctionFactory.java | 7 +- .../query/calcite/trait/DistributionTrait.java | 28 ++++--- .../query/calcite/trait/DistributionType.java | 4 +- .../query/calcite/trait/HashFunctionFactory.java | 67 +++++++++++++++ .../query/calcite/trait/IgniteDistributions.java | 82 ++---------------- ...nationFunctionFactory.java => NoOpFactory.java} | 12 ++- ...nctionFactory.java => RandomTargetFactory.java} | 19 ++++- ...nctionFactory.java => SingleTargetFactory.java} | 16 +++- .../IgniteTypeFactory.java} | 16 ++-- .../IgniteTypeSystem.java} | 11 +-- .../query/calcite/{schema => type}/RowType.java | 2 +- .../processors/query/calcite/util/Commons.java | 2 +- .../util/{Implementor.java => RelImplementor.java} | 2 +- .../query/calcite/CalciteQueryProcessorTest.java | 69 ++++++++++++++-- 46 files changed, 623 insertions(+), 308 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index f188665..760a71c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -23,7 +23,6 @@ import org.apache.calcite.config.Lex; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.fun.SqlLibrary; import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory; @@ -43,6 +42,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner import org.apache.ignite.internal.processors.query.calcite.prepare.Query; import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution; import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.resources.LoggerResource; @@ -83,7 +83,7 @@ public class CalciteQueryProcessor implements QueryEngine { .context(Contexts.of(this)) // Custom cost factory to use during optimization .costFactory(null) - .typeSystem(RelDataTypeSystem.DEFAULT) + .typeSystem(IgniteTypeSystem.DEFAULT) .build(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java index 48c9618..95ad49f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java @@ -32,11 +32,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; +import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory; import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction; -import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -141,7 +141,7 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry { return new NodesMapping(nodes, null, flags); } - private static class AffinityFactory implements DestinationFunctionFactory { + private static class AffinityFactory extends AbstractDestinationFunctionFactory { private final int cacheId; private final Object key; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java index 32cf357..3a20908 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java @@ -16,8 +16,8 @@ package org.apache.ignite.internal.processors.query.calcite.metadata; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; /** * diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java index d40af2a..fdbed1d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java @@ -16,6 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.metadata; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -30,12 +31,12 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** * */ -public class NodesMapping { - public static final byte HAS_MOVING_PARTITIONS = 0x1; - public static final byte HAS_REPLICATED_CACHES = 0x2; - public static final byte HAS_PARTITIONED_CACHES = 0x4; - public static final byte PARTIALLY_REPLICATED = 0x8; - public static final byte DEDUPLICATED = 0x16; +public class NodesMapping implements Serializable { + public static final byte HAS_MOVING_PARTITIONS = 1; + public static final byte HAS_REPLICATED_CACHES = 1 << 1; + public static final byte HAS_PARTITIONED_CACHES = 1 << 2; + public static final byte PARTIALLY_REPLICATED = 1 << 3; + public static final byte DEDUPLICATED = 1 << 4; private final List<ClusterNode> nodes; private final List<List<ClusterNode>> assignments; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java index 0f37a01..e7f4bb8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java @@ -26,7 +26,6 @@ import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.config.CalciteConnectionProperty; import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCostImpl; @@ -73,6 +72,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; import org.apache.ignite.internal.processors.query.calcite.serialize.Graph; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem; /** * @@ -114,9 +115,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { connectionConfig = connConfig(); RelDataTypeSystem typeSystem = connectionConfig - .typeSystem(RelDataTypeSystem.class, RelDataTypeSystem.DEFAULT); + .typeSystem(RelDataTypeSystem.class, IgniteTypeSystem.DEFAULT); - typeFactory = new JavaTypeFactoryImpl(typeSystem); + typeFactory = new IgniteTypeFactory(typeSystem); } private CalciteConnectionConfig connConfig() { @@ -157,7 +158,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { planner.setExecutor(executor); metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner); - validator = new IgniteSqlValidator(operatorTable, createCatalogReader(), typeFactory, conformance()); + validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance()); validator.setIdentifierExpansion(true); for (RelTraitDef def : traitDefs) { @@ -247,7 +248,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { SqlConformance conformance = conformance(); CalciteCatalogReader catalogReader = createCatalogReader().withSchemaPath(schemaPath); - SqlValidator validator = new IgniteSqlValidator(operatorTable, catalogReader, typeFactory, conformance); + SqlValidator validator = new IgniteSqlValidator(operatorTable(), catalogReader, typeFactory, conformance); validator.setIdentifierExpansion(true); RexBuilder rexBuilder = createRexBuilder(); @@ -327,15 +328,19 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { return typeFactory; } - private SqlConformance conformance() { + public SqlConformance conformance() { return connectionConfig.conformance(); } - private RexBuilder createRexBuilder() { + public SqlOperatorTable operatorTable() { + return operatorTable; + } + + public RexBuilder createRexBuilder() { return new RexBuilder(typeFactory); } - private CalciteCatalogReader createCatalogReader() { + public CalciteCatalogReader createCatalogReader() { SchemaPlus rootSchema = rootSchema(defaultSchema); return new CalciteCatalogReader( diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java index 13f9b92..51be890 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java @@ -27,7 +27,7 @@ import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Util; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; /** * @@ -56,7 +56,7 @@ public final class IgniteExchange extends SingleRel implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java index 9e261d4..5a96897 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java @@ -29,7 +29,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; public final class IgniteFilter extends Filter implements IgniteRel { private final Set<CorrelationId> variablesSet; @@ -50,7 +50,7 @@ public final class IgniteFilter extends Filter implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java index ad60afd..e576897 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java @@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; public final class IgniteJoin extends Join implements IgniteRel { private final boolean semiJoinDone; @@ -49,7 +49,7 @@ public final class IgniteJoin extends Join implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java index 85c7029..fe0d55e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java @@ -28,7 +28,7 @@ import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; public final class IgniteProject extends Project implements IgniteRel { public IgniteProject( @@ -46,7 +46,7 @@ public final class IgniteProject extends Project implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java index 211dc20..dbfbb3f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; /** * @@ -34,5 +34,5 @@ public interface IgniteRel extends RelNode { } }; - <T> T implement(Implementor<T> implementor); + <T> T implement(RelImplementor<T> implementor); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java index 536290d..12b7b99 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java @@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; public final class IgniteTableScan extends TableScan implements IgniteRel { public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { @@ -38,7 +38,7 @@ public final class IgniteTableScan extends TableScan implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java index 01a3dd7..47cd875 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java @@ -21,7 +21,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; /** * @@ -40,7 +40,7 @@ public final class Receiver extends AbstractRelNode implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java index 98a70d7..8e7c956 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java @@ -23,7 +23,7 @@ import org.apache.calcite.rel.SingleRel; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; import org.jetbrains.annotations.NotNull; /** @@ -48,7 +48,7 @@ public final class Sender extends SingleRel implements IgniteRel { } /** {@inheritDoc} */ - @Override public <T> T implement(Implementor<T> implementor) { + @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); } @@ -56,6 +56,14 @@ public final class Sender extends SingleRel implements IgniteRel { targetMapping = mapping; } + public DistributionTrait targetDistribution() { + return targetDistr; + } + + public NodesMapping targetMapping() { + return targetMapping; + } + public DestinationFunction targetFunction(org.apache.calcite.plan.Context ctx) { return targetDistr.destinationFunctionFactory().create(ctx, targetMapping, targetDistr.keys()); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java index 9bd8527ee..3f380d3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java @@ -74,10 +74,10 @@ public class IgniteJoinRule extends RelOptRule { } List<DistributionTrait> leftDists = Commons.concat(leftDerived, - IgniteDistributions.hash(join.analyzeCondition().leftKeys, IgniteDistributions.hashFunction())); + IgniteDistributions.hash(join.analyzeCondition().leftKeys)); List<DistributionTrait> rightDists = Commons.concat(rightDerived, - IgniteDistributions.hash(join.analyzeCondition().rightKeys, IgniteDistributions.hashFunction())); + IgniteDistributions.hash(join.analyzeCondition().rightKeys)); for (DistributionTrait leftDist0 : leftDists) { for (DistributionTrait rightDist0 : rightDists) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index d298e40..25d1931 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.util.typedef.internal.CU; /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java index f3e5bb9..384256a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java @@ -16,34 +16,25 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; -import java.util.ArrayList; import java.util.List; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSyntax; /** * */ -public class CallExpression implements Expression { - private final RelDataType type; - private final SqlOperator op; - private final List<Expression> operands; +public class CallExpression implements LogicalExpression { + public final String opName; + public final SqlSyntax opSyntax; + public final List<LogicalExpression> operands; - public CallExpression(RelDataType type, SqlOperator op, List<Expression> operands) { - this.type = type; - this.op = op; + public CallExpression(SqlOperator op, List<LogicalExpression> operands) { this.operands = operands; + opName = op.getName(); + opSyntax = op.getSyntax(); } - @Override public RexNode toRex(RexBuilder builder) { - ArrayList<RexNode> operands0 = new ArrayList<>(operands.size()); - - for (Expression operand : operands) { - operands0.add(operand.toRex(builder)); - } - - return builder.makeCall(type, op, operands0); + @Override public <T> T implement(ExpImplementor<T> implementor) { + return implementor.implement(this); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java similarity index 69% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java index d2fcb32..1c08bd9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpImplementor.java @@ -16,17 +16,15 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; -import org.apache.calcite.plan.Context; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptSchema; -import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; - /** * */ -class SerializationContext { - RelOptCluster cluster; - RelOptSchema schema; - IgnitePlanner planner; - Context ctx; +public interface ExpImplementor<T> { + T implement(CallExpression callExpression); + + T implement(InputRefExpression inputRefExpression); + + T implement(LiteralExpression literalExpression); + + T implement(LocalRefExpression localRefExpression); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java new file mode 100644 index 0000000..820043f --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpToRexTranslator.java @@ -0,0 +1,96 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.serialize; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.util.Pair; +import org.apache.ignite.internal.util.typedef.F; + +/** + * + */ +public class ExpToRexTranslator implements ExpImplementor<RexNode> { + private final RexBuilder builder; + private final RelDataTypeFactory typeFactory; + private final Map<Pair<String, SqlSyntax>, SqlOperator> ops; + + public ExpToRexTranslator(RexBuilder builder, RelDataTypeFactory typeFactory, SqlOperatorTable opTable) { + this.builder = builder; + this.typeFactory = typeFactory; + + List<SqlOperator> opList = opTable.getOperatorList(); + + HashMap<Pair<String, SqlSyntax>, SqlOperator> ops = new HashMap<>(opList.size()); + + for (SqlOperator op : opList) { + ops.put(Pair.of(op.getName(), op.getSyntax()), op); + } + + this.ops = ops; + } + + public List<RexNode> translate(List<LogicalExpression> exps) { + if (F.isEmpty(exps)) + return Collections.emptyList(); + + if (exps.size() == 1) + return F.asList(translate(F.first(exps))); + + List<RexNode> res = new ArrayList<>(exps.size()); + + for (LogicalExpression exp : exps) { + res.add(exp.implement(this)); + } + + return res; + } + + public RexNode translate(LogicalExpression exp) { + return exp.implement(this); + } + + @Override public RexNode implement(CallExpression exp) { + return builder.makeCall(op(exp.opName, exp.opSyntax), translate(exp.operands)); + } + + @Override public RexNode implement(InputRefExpression exp) { + return builder.makeInputRef(exp.type.toRelDataType(typeFactory), exp.index); + } + + @Override public RexNode implement(LiteralExpression exp) { + return builder.makeLiteral(exp.value, exp.type.toRelDataType(typeFactory), false); + } + + @Override public RexNode implement(LocalRefExpression exp) { + return new RexLocalRef(exp.index, exp.type.toRelDataType(typeFactory)); + } + + private SqlOperator op(String name, SqlSyntax syntax) { + return ops.get(Pair.of(name, syntax)); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java similarity index 66% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java index abe2b36..05762d8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ExpressionType.java @@ -16,12 +16,17 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; +import java.io.Serializable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; /** * */ -public interface Expression { - RexNode toRex(RexBuilder builder); +public interface ExpressionType extends Serializable { + static ExpressionType fromType(RelDataType type) { + return type.isStruct() ? StructType.fromType(type) : FieldType.fromType(type); + } + + RelDataType toRelDataType(RelDataTypeFactory factory); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java new file mode 100644 index 0000000..e1e1b3a --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/FieldType.java @@ -0,0 +1,59 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.serialize; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * + */ +public class FieldType implements ExpressionType { + private final Class clazz; + private final SqlTypeName typeName; + private final int precision; + private final int scale; + + public static FieldType fromType(RelDataType type) { + assert !type.isStruct(); + + if (type instanceof RelDataTypeFactoryImpl.JavaType) + return new FieldType(((RelDataTypeFactoryImpl.JavaType) type).getJavaClass(), null, 0, 0); + + return new FieldType(null, type.getSqlTypeName(), type.getPrecision(), type.getScale()); + } + + private FieldType(Class clazz, SqlTypeName typeName, int precision, int scale) { + this.clazz = clazz; + this.typeName = typeName; + this.precision = precision; + this.scale = scale; + } + + @Override public RelDataType toRelDataType(RelDataTypeFactory factory) { + if (clazz != null) + return factory.createJavaType(clazz); + if (typeName.allowsNoPrecNoScale()) + return factory.createSqlType(typeName); + if (typeName.allowsPrecNoScale()) + return factory.createSqlType(typeName, precision); + + return factory.createSqlType(typeName, precision, scale); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java index 54bc932..06b7b04 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java @@ -16,6 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.util.GridIntList; @@ -23,7 +24,7 @@ import org.apache.ignite.internal.util.GridIntList; /** * */ -public class Graph { +public class Graph implements Serializable { private final List<GraphNode> nodes = new ArrayList<>(); private final List<GridIntList> edges = new ArrayList<>(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java index 421c4bb..bc12e25 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java @@ -17,22 +17,20 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; /** * */ -public class InputRefExpression implements Expression { - private final RelDataType type; - private final int index; +public class InputRefExpression implements LogicalExpression { + public final ExpressionType type; + public final int index; public InputRefExpression(RelDataType type, int index) { - this.type = type; + this.type = ExpressionType.fromType(type); this.index = index; } - @Override public RexNode toRex(RexBuilder builder) { - return builder.makeInputRef(type, index); + @Override public <T> T implement(ExpImplementor<T> implementor) { + return implementor.implement(this); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java index 923fff4..3c95b96 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java @@ -17,22 +17,20 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; /** * */ -public class LiteralExpression implements Expression { - private final Comparable value; - private final RelDataType type; +public class LiteralExpression implements LogicalExpression { + public final ExpressionType type; + public final Comparable value; public LiteralExpression(RelDataType type, Comparable value) { + this.type = ExpressionType.fromType(type); this.value = value; - this.type = type; } - @Override public RexNode toRex(RexBuilder builder) { - return builder.makeLiteral(value, type, false); + @Override public <T> T implement(ExpImplementor<T> implementor) { + return implementor.implement(this); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java index b0947ac..4f3ed54 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java @@ -17,23 +17,20 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexLocalRef; -import org.apache.calcite.rex.RexNode; /** * */ -public class LocalRefExpression implements Expression { - private final RelDataType type; - private final int index; +public class LocalRefExpression implements LogicalExpression { + public final ExpressionType type; + public final int index; public LocalRefExpression(RelDataType type, int index) { - this.type = type; + this.type = ExpressionType.fromType(type); this.index = index; } - @Override public RexNode toRex(RexBuilder builder) { - return new RexLocalRef(index, type); + @Override public <T> T implement(ExpImplementor<T> implementor) { + return implementor.implement(this); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java similarity index 83% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java index abe2b36..bc8a5bc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LogicalExpression.java @@ -16,12 +16,11 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; +import java.io.Serializable; /** * */ -public interface Expression { - RexNode toRex(RexBuilder builder); +public interface LogicalExpression extends Serializable { + <T> T implement(ExpImplementor<T> implementor); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java similarity index 70% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java index abe2b36..14a2019 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelGraphNode.java @@ -16,12 +16,17 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; +import java.io.Serializable; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.plan.RelTraitSet; /** * */ -public interface Expression { - RexNode toRex(RexBuilder builder); +public class RelGraphNode implements GraphNode, Serializable { + protected RelTrait[] traits; + + public RelGraphNode(RelTraitSet traits) { + this.traits = traits.toArray(new RelTrait[0]); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java index c5eaae5..40a9158 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; -import org.apache.calcite.rel.RelNode; +import org.apache.calcite.util.Pair; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; @@ -28,54 +28,67 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; import org.apache.ignite.internal.processors.query.calcite.rel.Sender; -import org.apache.ignite.internal.processors.query.calcite.util.Implementor; +import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; /** * */ -public class RelToGraphConverter implements Implementor<List<RelNode>> { - private Deque<List<RelNode>> stack1 = new ArrayDeque<>(); - private Deque<Integer> stack2 = new ArrayDeque<>(); +public class RelToGraphConverter implements RelImplementor<Pair<Integer, List<IgniteRel>>> { + private Deque<Pair<Integer, List<IgniteRel>>> stack = new ArrayDeque<>(); private Graph graph; + private int parentId; - public Graph convert(RelNode root) { - stack1 = new ArrayDeque<>(); - stack2 = new ArrayDeque<>(); - + public Graph convert(IgniteRel root) { + stack = new ArrayDeque<>(); graph = new Graph(); + parentId = -1; - return null; + stack.push(root.implement(this)); + + while (!stack.isEmpty()) { + Pair<Integer, List<IgniteRel>> pair = stack.pop(); + + parentId = pair.left; + + for (IgniteRel child : pair.right) { + stack.push(child.implement(this)); + } + } + + return graph; } - @Override public List<RelNode> implement(IgniteExchange rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteFilter rel) { return null; } - @Override public List<RelNode> implement(IgniteFilter rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteJoin rel) { return null; } - @Override public List<RelNode> implement(IgniteJoin rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteProject rel) { return null; } - @Override public List<RelNode> implement(IgniteProject rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteTableScan rel) { return null; } - @Override public List<RelNode> implement(IgniteTableScan rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(Receiver rel) { return null; } - @Override public List<RelNode> implement(Receiver rel) { + @Override public Pair<Integer, List<IgniteRel>> implement(Sender rel) { + assert parentId == -1; + return null; } - @Override public List<RelNode> implement(Sender rel) { - return null; + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteExchange rel) { + throw new UnsupportedOperationException(); } - @Override public List<RelNode> implement(IgniteRel other) { - return null; + @Override public Pair<Integer, List<IgniteRel>> implement(IgniteRel other) { + throw new AssertionError(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java index a30edfa..dfb92d3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java @@ -36,62 +36,66 @@ import org.apache.calcite.rex.RexVisitor; /** * */ -public class RexToExpTranslator implements RexVisitor<Expression> { - @Override public Expression visitInputRef(RexInputRef inputRef) { +public class RexToExpTranslator implements RexVisitor<LogicalExpression> { + public List<LogicalExpression> translate(List<RexNode> operands) { + ArrayList<LogicalExpression> res = new ArrayList<>(operands.size()); + + for (RexNode operand : operands) { + res.add(translate(operand)); + } + + return res; + } + + public LogicalExpression translate(RexNode rex) { + return rex.accept(this); + } + + @Override public LogicalExpression visitInputRef(RexInputRef inputRef) { return new InputRefExpression(inputRef.getType(), inputRef.getIndex()); } - @Override public Expression visitLocalRef(RexLocalRef localRef) { + @Override public LogicalExpression visitLocalRef(RexLocalRef localRef) { return new LocalRefExpression(localRef.getType(), localRef.getIndex()); } - @Override public Expression visitLiteral(RexLiteral literal) { + @Override public LogicalExpression visitLiteral(RexLiteral literal) { return new LiteralExpression(literal.getType(), literal.getValue()); } - @Override public Expression visitCall(RexCall call) { - return new CallExpression(call.getType(), call.getOperator(), visitList(call.getOperands())); + @Override public LogicalExpression visitCall(RexCall call) { + return new CallExpression(call.getOperator(), translate(call.getOperands())); } - @Override public Expression visitOver(RexOver over) { + @Override public LogicalExpression visitOver(RexOver over) { throw new UnsupportedOperationException(); } - @Override public Expression visitCorrelVariable(RexCorrelVariable correlVariable) { + @Override public LogicalExpression visitCorrelVariable(RexCorrelVariable correlVariable) { throw new UnsupportedOperationException(); } - @Override public Expression visitDynamicParam(RexDynamicParam dynamicParam) { + @Override public LogicalExpression visitDynamicParam(RexDynamicParam dynamicParam) { throw new UnsupportedOperationException(); } - @Override public Expression visitRangeRef(RexRangeRef rangeRef) { + @Override public LogicalExpression visitRangeRef(RexRangeRef rangeRef) { throw new UnsupportedOperationException(); } - @Override public Expression visitFieldAccess(RexFieldAccess fieldAccess) { + @Override public LogicalExpression visitFieldAccess(RexFieldAccess fieldAccess) { throw new UnsupportedOperationException(); } - @Override public Expression visitSubQuery(RexSubQuery subQuery) { + @Override public LogicalExpression visitSubQuery(RexSubQuery subQuery) { throw new UnsupportedOperationException(); } - @Override public Expression visitTableInputRef(RexTableInputRef fieldRef) { + @Override public LogicalExpression visitTableInputRef(RexTableInputRef fieldRef) { throw new UnsupportedOperationException(); } - @Override public Expression visitPatternFieldRef(RexPatternFieldRef fieldRef) { + @Override public LogicalExpression visitPatternFieldRef(RexPatternFieldRef fieldRef) { throw new UnsupportedOperationException(); } - - public List<Expression> visitList(List<RexNode> operands) { - ArrayList<Expression> res = new ArrayList<>(operands.size()); - - for (RexNode operand : operands) { - res.add(operand.accept(this)); - } - - return res; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java similarity index 59% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java index d239c5c..41820d6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SenderNode.java @@ -14,19 +14,23 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.trait; +package org.apache.ignite.internal.processors.query.calcite.serialize; -import org.apache.calcite.plan.Context; -import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.rel.Sender; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; /** * */ -public interface DestinationFunctionFactory { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); +public class SenderNode extends RelGraphNode { + private DistributionTrait targetDistr; + private NodesMapping targetMapping; - default Object key() { - return getClass(); + public SenderNode(Sender sender) { + super(sender.getTraitSet()); + + targetDistr = sender.targetDistribution(); + targetMapping = sender.targetMapping(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java new file mode 100644 index 0000000..45d5e44 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/StructType.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.serialize; + +import java.util.LinkedHashMap; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; + +/** + * + */ +public class StructType implements ExpressionType { + private final LinkedHashMap<String, FieldType> fields; + + public static StructType fromType(RelDataType type) { + assert type.isStruct(); + + LinkedHashMap<String, FieldType> fields = new LinkedHashMap<>(); + + for (RelDataTypeField field : type.getFieldList()) { + fields.put(field.getName(), FieldType.fromType(field.getType())); + } + + return new StructType(fields); + } + + private StructType(LinkedHashMap<String, FieldType> fields) { + this.fields = fields; + } + + @Override public RelDataType toRelDataType(RelDataTypeFactory factory) { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(factory); + fields.forEach((n,f) -> builder.add(n,f.toRelDataType(factory))); + return builder.build(); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java similarity index 65% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java index 60e37ed..e6b6bb5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AbstractDestinationFunctionFactory.java @@ -16,32 +16,20 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.Objects; + /** * */ -public enum DistributionType { - HASH("hash"), - RANDOM("random"), - BROADCAST("broadcast"), - SINGLE("single"), - ANY("any"); - - /** - * - */ - private final String description; - - /** - * - */ - DistributionType(String description) { - this.description = description; +public abstract class AbstractDestinationFunctionFactory implements DestinationFunctionFactory { + @Override public int hashCode() { + return Objects.hashCode(key()); } - /** - * - */ - @Override public String toString() { - return description; + @Override public boolean equals(Object obj) { + if (obj instanceof DestinationFunctionFactory) + return Objects.equals(key(), ((DestinationFunctionFactory) obj).key()); + + return false; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java similarity index 66% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java index d239c5c..535c3bb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java @@ -16,17 +16,25 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.List; import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; /** * */ -public interface DestinationFunctionFactory { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); +class AllTargetsFactory extends AbstractDestinationFunctionFactory { + static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory(); - default Object key() { - return getClass(); + @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + List<ClusterNode> nodes = m.nodes(); + + return r -> nodes; + } + + @Override public Object key() { + return "AllTargetsFactory"; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java index d239c5c..587c172 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java @@ -16,6 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.io.Serializable; import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; @@ -23,10 +24,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping /** * */ -public interface DestinationFunctionFactory { +public interface DestinationFunctionFactory extends Serializable { DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); - default Object key() { - return getClass(); - } + Object key(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java index 34773f2..756bf6b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java @@ -16,6 +16,8 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.io.Serializable; +import java.util.Arrays; import java.util.Objects; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTrait; @@ -25,12 +27,15 @@ import org.apache.calcite.util.ImmutableIntList; /** * */ -public final class DistributionTrait implements RelTrait { - private final DistributionType type; - private final ImmutableIntList keys; - private final DestinationFunctionFactory functionFactory; +public final class DistributionTrait implements RelTrait, Serializable { + private DistributionType type; + private int[] keys; + private DestinationFunctionFactory functionFactory; - public DistributionTrait(DistributionType type, ImmutableIntList keys, DestinationFunctionFactory functionFactory) { + public DistributionTrait() { + } + + public DistributionTrait(DistributionType type, int[] keys, DestinationFunctionFactory functionFactory) { this.type = type; this.keys = keys; this.functionFactory = functionFactory; @@ -45,7 +50,7 @@ public final class DistributionTrait implements RelTrait { } public ImmutableIntList keys() { - return keys; + return ImmutableIntList.of(keys); } @Override public void register(RelOptPlanner planner) {} @@ -57,18 +62,18 @@ public final class DistributionTrait implements RelTrait { if (o instanceof DistributionTrait) { DistributionTrait that = (DistributionTrait) o; - return type == that.type() && keys.equals(that.keys()); + return type == that.type() && Arrays.equals(keys, that.keys); } return false; } @Override public int hashCode() { - return Objects.hash(type, keys); + return Objects.hash(type, Arrays.hashCode(keys)); } @Override public String toString() { - return type + (type == DistributionType.HASH ? keys.toString() : ""); + return type + (type == DistributionType.HASH ? Arrays.toString(keys) : ""); } @Override public RelTraitDef getTraitDef() { @@ -89,10 +94,9 @@ public final class DistributionTrait implements RelTrait { if (type() == other.type()) return type() != DistributionType.HASH - || (Objects.equals(keys(), other.keys()) - && Objects.equals(destinationFunctionFactory().key(), other.destinationFunctionFactory().key())); + || (Arrays.equals(keys, other.keys) + && Objects.equals(functionFactory, other.functionFactory)); return other.type() == DistributionType.RANDOM && type() == DistributionType.HASH; } - } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java index 60e37ed..c67964f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java @@ -26,9 +26,7 @@ public enum DistributionType { SINGLE("single"), ANY("any"); - /** - * - */ + /** */ private final String description; /** diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java new file mode 100644 index 0000000..c6a3eb5 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java @@ -0,0 +1,67 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.trait; + +import java.util.List; +import java.util.function.ToIntFunction; +import org.apache.calcite.plan.Context; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +class HashFunctionFactory extends AbstractDestinationFunctionFactory { + static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory(); + + @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + assert m != null && !F.isEmpty(m.assignments()); + + int[] fields = k.toIntArray(); + + ToIntFunction<Object> hashFun = r -> { + Object[] row = (Object[]) r; + + if (row == null) + return 0; + + int hash = 1; + + for (int i : fields) + hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode()); + + return hash; + }; + + List<List<ClusterNode>> assignments = m.assignments(); + + if (U.assertionsEnabled()) { + for (List<ClusterNode> assignment : assignments) { + assert F.isEmpty(assignment) || assignment.size() == 1; + } + } + + return r -> assignments.get(hashFun.applyAsInt(r) % assignments.size()); + } + + @Override public Object key() { + return "HashFunctionFactory"; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java index 8363bbd..a95734a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java @@ -20,15 +20,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.ToIntFunction; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.util.ImmutableIntList; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH; @@ -37,42 +32,11 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.Distribu * */ public class IgniteDistributions { - private static final DestinationFunctionFactory NO_OP_FACTORY = (ctx, m, k) -> null; - private static final DestinationFunctionFactory HASH_FACTORY = (ctx, m, k) -> { - assert m != null && !F.isEmpty(m.assignments()); - - int[] fields = k.toIntArray(); - - ToIntFunction<Object> hashFun = r -> { - Object[] row = (Object[]) r; - - if (row == null) - return 0; - - int hash = 1; - - for (int i : fields) - hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode()); - - return hash; - }; - - List<List<ClusterNode>> assignments = m.assignments(); - - if (U.assertionsEnabled()) { - for (List<ClusterNode> assignment : assignments) { - assert F.isEmpty(assignment) || assignment.size() == 1; - } - } - - return r -> assignments.get(hashFun.applyAsInt(r) % assignments.size()); - }; - - - private static final DistributionTrait BROADCAST = new DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), allTargetsFunction()); - private static final DistributionTrait SINGLE = new DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), singleTargetFunction()); - private static final DistributionTrait RANDOM = new DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), randomTargetFunction()); - private static final DistributionTrait ANY = new DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), noOpFunction()); + private static final int[] EMPTY_KEYS = new int[0]; + private static final DistributionTrait BROADCAST = new DistributionTrait(DistributionType.BROADCAST, EMPTY_KEYS, AllTargetsFactory.INSTANCE); + private static final DistributionTrait SINGLE = new DistributionTrait(DistributionType.SINGLE, EMPTY_KEYS, SingleTargetFactory.INSTANCE); + private static final DistributionTrait RANDOM = new DistributionTrait(DistributionType.RANDOM, EMPTY_KEYS, RandomTargetFactory.INSTANCE); + private static final DistributionTrait ANY = new DistributionTrait(DistributionType.ANY, EMPTY_KEYS, NoOpFactory.INSTANCE); public static DistributionTrait any() { return ANY; @@ -90,40 +54,12 @@ public class IgniteDistributions { return BROADCAST; } - public static DistributionTrait hash(List<Integer> keys, DestinationFunctionFactory factory) { - return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), factory); + public static DistributionTrait hash(List<Integer> keys) { + return new DistributionTrait(HASH, U.toIntArray(keys), HashFunctionFactory.INSTANCE); } - public static DestinationFunctionFactory noOpFunction() { - return NO_OP_FACTORY; - } - - public static DestinationFunctionFactory singleTargetFunction() { - return (ctx, m, k) -> { - List<ClusterNode> nodes = m.nodes().subList(0, 1); - - return r -> nodes; - }; - } - - public static DestinationFunctionFactory allTargetsFunction() { - return (ctx, m, k) -> { - List<ClusterNode> nodes = m.nodes(); - - return r -> nodes; - }; - } - - public static DestinationFunctionFactory randomTargetFunction() { - return (ctx, m, k) -> { - List<ClusterNode> nodes = m.nodes(); - - return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()))); - }; - } - - public static DestinationFunctionFactory hashFunction() { - return HASH_FACTORY; + public static DistributionTrait hash(List<Integer> keys, DestinationFunctionFactory factory) { + return new DistributionTrait(HASH, U.toIntArray(keys), factory); } public static List<DistributionTrait> deriveDistributions(RelNode rel, RelMetadataQuery mq) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java similarity index 74% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java index d239c5c..5dff5a9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java @@ -23,10 +23,14 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping /** * */ -public interface DestinationFunctionFactory { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); +class NoOpFactory extends AbstractDestinationFunctionFactory { + static final DestinationFunctionFactory INSTANCE = new NoOpFactory(); - default Object key() { - return getClass(); + @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + return null; + } + + @Override public Object key() { + return "NoOpFactory"; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java similarity index 59% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java index d239c5c..a4b27b7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java @@ -16,17 +16,28 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; /** * */ -public interface DestinationFunctionFactory { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); +class RandomTargetFactory extends AbstractDestinationFunctionFactory { + static final DestinationFunctionFactory INSTANCE = new RandomTargetFactory(); - default Object key() { - return getClass(); + @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + List<ClusterNode> nodes = m.nodes(); + + return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()))); + } + + @Override public Object key() { + return "RandomTargetFactory"; } + } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java similarity index 65% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java index d239c5c..4d21a60 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java @@ -16,17 +16,25 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.List; import org.apache.calcite.plan.Context; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; /** * */ -public interface DestinationFunctionFactory { - DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys); +class SingleTargetFactory extends AbstractDestinationFunctionFactory { + static final DestinationFunctionFactory INSTANCE = new SingleTargetFactory(); - default Object key() { - return getClass(); + @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) { + List<ClusterNode> nodes = m.nodes().subList(0, 1); + + return r -> nodes; + } + + @Override public Object key() { + return "SingleTargetFactory"; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java similarity index 63% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java index 32cf357..83a2a19 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java @@ -14,14 +14,20 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.metadata; +package org.apache.ignite.internal.processors.query.calcite.type; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; -import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; /** * */ -public interface DistributionRegistry { - DistributionTrait distribution(int cacheId, RowType rowType); +public class IgniteTypeFactory extends JavaTypeFactoryImpl { + public IgniteTypeFactory() { + super(IgniteTypeSystem.DEFAULT); + } + + public IgniteTypeFactory(RelDataTypeSystem typeSystem) { + super(typeSystem); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java similarity index 65% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java index 32cf357..6dbfd02 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeSystem.java @@ -14,14 +14,15 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.metadata; +package org.apache.ignite.internal.processors.query.calcite.type; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; -import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import java.io.Serializable; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; /** * */ -public interface DistributionRegistry { - DistributionTrait distribution(int cacheId, RowType rowType); +public class IgniteTypeSystem extends RelDataTypeSystemImpl implements Serializable { + public static final RelDataTypeSystem DEFAULT = new IgniteTypeSystem(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java similarity index 98% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java index 0da2c70..2133c56 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/RowType.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.schema; +package org.apache.ignite.internal.processors.query.calcite.type; import java.util.ArrayList; import java.util.BitSet; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index d594bb4..aab2302 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -42,7 +42,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java similarity index 97% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java index c45af43..58ba064 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.Sender; /** * */ -public interface Implementor<T> { +public interface RelImplementor<T> { T implement(IgniteExchange rel); T implement(IgniteFilter rel); T implement(IgniteJoin rel); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 1891f44..bc7d179 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -33,7 +33,16 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler; +import org.apache.ignite.internal.binary.BinaryContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.systemview.GridSystemViewManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry; @@ -45,15 +54,21 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; -import org.apache.ignite.internal.processors.query.calcite.schema.RowType; -import org.apache.ignite.internal.processors.query.calcite.serialize.Expression; +import org.apache.ignite.internal.processors.query.calcite.serialize.LogicalExpression; import org.apache.ignite.internal.processors.query.calcite.serialize.RexToExpTranslator; import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan; import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.type.RowType; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.marshaller.MarshallerContextTestImpl; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.systemview.jmx.JmxSystemViewExporterSpi; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -341,7 +356,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { Project proj = (Project) relRoot.rel.getInput(0); - List<Expression> expressions = translator.visitList(proj.getProjects()); + List<LogicalExpression> expressions = translator.translate(proj.getProjects()); assertNotNull(expressions); } @@ -412,7 +427,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { @Test public void testSplitterCollocatedReplicatedReplicated() throws Exception { - String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + ") p " + @@ -504,7 +519,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); - return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + return IgniteDistributions.hash(rowType.distributionKeys()); } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { @@ -592,7 +607,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); - return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + return IgniteDistributions.hash(rowType.distributionKeys()); } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { @@ -761,7 +776,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); - return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + return IgniteDistributions.hash(rowType.distributionKeys()); } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { @@ -850,7 +865,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { if (cacheId == CU.cacheId("Project")) return IgniteDistributions.broadcast(); - return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + return IgniteDistributions.hash(rowType.distributionKeys()); } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { @@ -943,7 +958,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Override public DistributionTrait distribution(int cacheId, RowType rowType) { - return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + return IgniteDistributions.hash(rowType.distributionKeys()); } @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) { @@ -967,4 +982,40 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { throw new AssertionError("Unexpected cache id:" + cacheId); } } + + /** + * @return Binary marshaller. + */ + private BinaryMarshaller binaryMarshaller() throws IgniteCheckedException { + IgniteConfiguration iCfg = new IgniteConfiguration(); + + BinaryConfiguration bCfg = new BinaryConfiguration(); + iCfg.setBinaryConfiguration(bCfg); + iCfg.setClientMode(false); + iCfg.setDiscoverySpi(new TcpDiscoverySpi() { + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + //No-op. + } + }); + iCfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi()); + + BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger()); + + BinaryMarshaller marsh = new BinaryMarshaller(); + + MarshallerContextTestImpl marshCtx = new MarshallerContextTestImpl(null, null); + + GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg); + + kernCtx.add(new GridSystemViewManager(kernCtx)); + kernCtx.add(new GridDiscoveryManager(kernCtx)); + + marshCtx.onMarshallerProcessorStarted(kernCtx, null); + + marsh.setContext(marshCtx); + + IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", ctx, iCfg); + + return marsh; + } } \ No newline at end of file