This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 9125a61 pending
9125a61 is described below
commit 9125a610a1e560ee89db2d655f7ec726261cf975
Author: Igor Seliverstov <[email protected]>
AuthorDate: Thu Nov 7 19:04:59 2019 +0300
pending
---
.../query/calcite/CalciteQueryProcessor.java | 2 +
.../query/calcite/cluster/RegistryImpl.java | 173 ++++++++
.../DistributionRegistry.java} | 10 +-
.../FragmentLocation.java} | 12 +-
.../calcite/metadata/IgniteMdDistribution.java | 7 +-
.../calcite/metadata/IgniteMdFragmentLocation.java | 161 +++++++
.../metadata/IgniteMdSourceDistribution.java | 128 ------
.../query/calcite/metadata/IgniteMetadata.java | 17 +-
.../query/calcite/metadata/Location.java | 165 ++++++++
.../LocationMappingException.java} | 11 +-
.../LocationRegistry.java} | 10 +-
.../OptimisticPlanningException.java} | 18 +-
.../query/calcite/metadata/RelMetadataQueryEx.java | 47 +--
.../processors/query/calcite/rel/CloneContext.java | 48 ---
.../query/calcite/rel/IgniteExchange.java | 4 -
.../processors/query/calcite/rel/IgniteFilter.java | 4 -
.../query/calcite/rel/IgniteHashJoin.java | 4 -
.../query/calcite/rel/IgniteProject.java | 6 +-
.../processors/query/calcite/rel/IgniteRel.java | 2 -
.../query/calcite/rel/IgniteTableScan.java | 10 +-
.../processors/query/calcite/rel/Receiver.java | 23 +-
.../processors/query/calcite/rel/Sender.java | 48 ++-
.../query/calcite/schema/IgniteTable.java | 33 +-
.../query/calcite/splitter/Fragment.java | 51 ++-
.../calcite/splitter/PartitionsDistribution.java | 196 ---------
.../query/calcite/splitter/QueryPlan.java | 56 ++-
.../query/calcite/splitter/Splitter.java | 48 ++-
...utionFunction.java => DestinationFunction.java} | 2 +-
...actory.java => DestinationFunctionFactory.java} | 6 +-
.../query/calcite/trait/DistributionTrait.java | 72 ++--
.../query/calcite/trait/DistributionTraitImpl.java | 71 ----
...nFunctionFactory.java => DistributionType.java} | 30 +-
.../query/calcite/trait/IgniteDistributions.java | 70 +++-
.../processors/query/calcite/util/Commons.java | 93 +++++
.../Edge.java} | 29 +-
.../query/calcite/util/IgniteMethod.java | 4 +-
.../query/calcite/util/IgniteRelShuttle.java | 83 ++++
.../query/calcite/CalciteQueryProcessorTest.java | 461 ++++++++++++++++++---
38 files changed, 1483 insertions(+), 732 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index b036397..f188665 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -37,6 +37,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import
org.apache.ignite.internal.processors.query.calcite.cluster.RegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
@@ -148,6 +149,7 @@ public class CalciteQueryProcessor implements QueryEngine {
return Contexts.chain(ctx, config.getContext(),
Contexts.of(
new Query(query, params),
+ new RegistryImpl(kernalContext),
provided(ctx, SchemaPlus.class, schemaHolder::schema),
provided(ctx, AffinityTopologyVersion.class,
this::readyAffinityVersion)));
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
new file mode 100644
index 0000000..4ad7d48
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class RegistryImpl implements DistributionRegistry, LocationRegistry {
+ private final GridKernalContext ctx;
+
+ public RegistryImpl(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override public DistributionTrait distribution(int cacheId, RowType
rowType) {
+ if (ctx.cache().context().cacheContext(cacheId).isReplicated())
+ return IgniteDistributions.broadcast();
+
+ Object key =
ctx.cache().context().affinity().affinity(cacheId).similarAffinityKey();
+ ToIntFunction<Object> partFun =
ctx.cache().context().cacheContext(cacheId).affinity()::partition;
+
+ return IgniteDistributions.hash(rowType.distributionKeys(), new
AffinityFactory(partFun, key));
+ }
+
+ @Override public Location single(AffinityTopologyVersion topVer) {
+ return new
Location(Collections.singletonList(ctx.discovery().localNode()), null, (byte)
0);
+ }
+
+ @Override public Location random(AffinityTopologyVersion topVer) {
+ return new Location(ctx.discovery().discoCache(topVer).serverNodes(),
null, (byte) 0);
+ }
+
+ @Override public Location distributed(int cacheId, AffinityTopologyVersion
topVer) {
+ GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+ return cctx.isReplicated() ? replicatedLocation(cctx, topVer) :
partitionedLocation(cctx, topVer);
+ }
+
+ private Location partitionedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ byte flags = Location.HAS_PARTITIONED_CACHES;
+
+ List<List<ClusterNode>> assignments =
cctx.affinity().assignments(topVer);
+
+ if (cctx.config().getWriteSynchronizationMode() ==
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
+ List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+
+ for (List<ClusterNode> partNodes : assignments)
+ assignments0.add(F.isEmpty(partNodes) ?
Collections.emptyList() : Collections.singletonList(F.first(partNodes)));
+
+ assignments = assignments0;
+ }
+ else if (!cctx.topology().rebalanceFinished(topVer)) {
+ flags |= Location.HAS_MOVING_PARTITIONS;
+
+ List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+
+ for (int part = 0; part < assignments.size(); part++) {
+ List<ClusterNode> partNodes = assignments0.get(part),
partNodes0 = new ArrayList<>(partNodes.size());
+
+ for (ClusterNode partNode : partNodes) {
+ if (cctx.topology().partitionState(partNode.id(), part) ==
GridDhtPartitionState.OWNING)
+ partNodes0.add(partNode);
+ }
+
+ assignments0.add(partNodes0);
+ }
+
+ assignments = assignments0;
+ }
+
+ return new Location(null, assignments, flags);
+ }
+
+ private Location replicatedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ byte flags = Location.HAS_REPLICATED_CACHES;
+
+ if (cctx.config().getNodeFilter() != null)
+ flags |= Location.PARTIALLY_REPLICATED;
+
+ List<ClusterNode> nodes =
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
+
+ if (!cctx.topology().rebalanceFinished(topVer)) {
+ flags |= Location.PARTIALLY_REPLICATED;
+
+ List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
+
+ int parts = cctx.topology().partitions();
+
+ parent:
+ for (ClusterNode node : nodes) {
+ for (int part = 0; part < parts; part++) {
+ if (cctx.topology().partitionState(node.id(), part) !=
GridDhtPartitionState.OWNING)
+ continue parent;
+ }
+
+ nodes0.add(node);
+ }
+
+ nodes = nodes0;
+ }
+
+ return new Location(nodes, null, flags);
+ }
+
+ private static class AffinityFactory implements DestinationFunctionFactory
{
+ private final ToIntFunction<Object> partFun;
+ private final Object key;
+
+ AffinityFactory(ToIntFunction<Object> partFun, Object key) {
+ this.partFun = partFun;
+ this.key = key;
+ }
+
+ @Override public DestinationFunction create(FragmentLocation
targetLocation, ImmutableIntList keys) {
+ assert keys.size() == 1 && targetLocation.location != null;
+
+ return create(targetLocation.location, partFun, keys.getInt(0));
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ return key.equals(((AffinityFactory) o).key);
+ }
+
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ private static DestinationFunction create(Location location,
ToIntFunction<Object> partFun, int affField) {
+ return row -> location.nodes(partFun.applyAsInt(((Object[])
row)[affField]));
+ }
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
similarity index 67%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
index 76af490..32cf357 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
@@ -14,14 +14,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
/**
*
*/
-public interface DistributionFunction {
- List<ClusterNode> destination(Object row);
+public interface DistributionRegistry {
+ DistributionTrait distribution(int cacheId, RowType rowType);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
similarity index 68%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
index 618b3d8..7106e81 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
@@ -14,17 +14,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
import java.util.List;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.util.GridIntList;
/**
*
*/
-public class SourceDistribution {
- public PartitionsDistribution partitionMapping; // partitions mapping.
- public List<Receiver> remoteInputs; // remote inputs to notify particular
senders about final task distribution
- public GridIntList localInputs; // involved caches, used for partitions
reservation
+public class FragmentLocation {
+ public Location location;
+ public List<Receiver> remoteInputs;
+ public GridIntList localInputs;
+ public AffinityTopologyVersion topVer;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index f644d0e..b2f91a9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -42,14 +42,15 @@ import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
+import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
/**
*
*/
public class IgniteMdDistribution implements
MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
public static final RelMetadataProvider SOURCE =
-
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION_TRAIT.method(),
new IgniteMdDistribution());
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ IgniteMethod.DISTRIBUTION_TRAIT.method(), new
IgniteMdDistribution());
@Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata>
getDef() {
return IgniteMetadata.DistributionTraitMetadata.DEF;
@@ -108,7 +109,7 @@ public class IgniteMdDistribution implements
MetadataHandler<IgniteMetadata.Dist
newKeys.add(mapped);
}
- return IgniteDistributions.hash(newKeys, trait.functionFactory());
+ return IgniteDistributions.hash(newKeys,
trait.destinationFunctionFactory());
}
return trait;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
new file mode 100644
index 0000000..0c20e59
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class IgniteMdFragmentLocation implements
MetadataHandler<FragmentLocationMetadata> {
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ IgniteMethod.FRAGMENT_LOCATION.method(), new
IgniteMdFragmentLocation());
+
+ @Override public MetadataDef<FragmentLocationMetadata> getDef() {
+ return FragmentLocationMetadata.DEF;
+ }
+
+ public FragmentLocation getLocation(RelNode rel, RelMetadataQuery mq) {
+ throw new AssertionError();
+ }
+
+ public FragmentLocation getLocation(RelSubset rel, RelMetadataQuery mq) {
+ throw new AssertionError();
+ }
+
+ public FragmentLocation getLocation(SingleRel rel, RelMetadataQuery mq) {
+ return location(rel.getInput(), mq);
+ }
+
+ public FragmentLocation getLocation(Sender rel, RelMetadataQuery mq) {
+ return rel.location(mq);
+ }
+
+ public FragmentLocation getLocation(BiRel rel, RelMetadataQuery mq) {
+ mq = RelMetadataQueryEx.wrap(mq);
+
+ FragmentLocation leftLoc = location(rel.getLeft(), mq);
+ FragmentLocation rightLoc = location(rel.getRight(), mq);
+
+ try {
+ return merge(leftLoc, rightLoc);
+ }
+ catch (LocationMappingException e) {
+ // a replicated cache is cheaper to redistribute
+ if (!leftLoc.location.hasPartitionedCaches())
+ throw planningException(rel, e, true);
+ else if (!rightLoc.location.hasPartitionedCaches())
+ throw planningException(rel, e, false);
+
+ // both sub-trees have partitioned sources, less cost is better
+ RelOptCluster cluster = rel.getCluster();
+
+ RelOptCost leftCost =
rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
+ RelOptCost rightCost =
rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
+
+ throw planningException(rel, e, leftCost.isLe(rightCost));
+ }
+ }
+
+ private OptimisticPlanningException planningException(BiRel rel, Exception
cause, boolean splitLeft) {
+ String msg = "Failed to calculate physical distribution";
+
+ if (splitLeft)
+ return new OptimisticPlanningException(msg, new Edge(rel,
rel.getLeft(), 0), cause);
+
+ return new OptimisticPlanningException(msg, new Edge(rel,
rel.getRight(), 1), cause);
+ }
+
+ public FragmentLocation getLocation(Receiver rel, RelMetadataQuery mq) {
+ FragmentLocation res = new FragmentLocation();
+
+ res.remoteInputs = Collections.singletonList(rel);
+ res.topVer =
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class);
+
+ return res;
+ }
+
+ public FragmentLocation getLocation(IgniteTableScan rel, RelMetadataQuery
mq) {
+ return rel.location();
+ }
+
+ public static FragmentLocation location(RelNode rel, RelMetadataQuery mq) {
+ return RelMetadataQueryEx.wrap(mq).getFragmentLocation(rel);
+ }
+
+ private static FragmentLocation merge(FragmentLocation left,
FragmentLocation right) throws LocationMappingException {
+ FragmentLocation res = new FragmentLocation();
+
+ res.location = merge(left.location, right.location);
+ res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
+ res.localInputs = merge(left.localInputs, right.localInputs);
+ res.topVer = U.firstNotNull(left.topVer, right.topVer);
+
+ return res;
+ }
+
+ private static Location merge(Location left, Location right) throws
LocationMappingException {
+ if (left == null)
+ return right;
+ if (right == null)
+ return left;
+
+ return left.mergeWith(right);
+ }
+
+ private static <T> List<T> merge(List<T> left, List<T> right) {
+ if (left == null)
+ return right;
+ if (right == null)
+ return left;
+
+ return Commons.union(left, right);
+ }
+
+ private static GridIntList merge(GridIntList left, GridIntList right) {
+ if (left == null)
+ return right;
+
+ if (right != null)
+ left.addAll(right);
+
+ return left;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
deleted file mode 100644
index a944db1..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.metadata;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.MetadataDef;
-import org.apache.calcite.rel.metadata.MetadataHandler;
-import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
-import org.apache.calcite.rel.metadata.RelMetadataProvider;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import org.apache.ignite.internal.util.GridIntList;
-
-/**
- *
- */
-public class IgniteMdSourceDistribution implements
MetadataHandler<SourceDistributionMetadata> {
- public static final RelMetadataProvider SOURCE =
-
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.SOURCE_DISTRIBUTION.method(),
new IgniteMdSourceDistribution());
-
- @Override public MetadataDef<SourceDistributionMetadata> getDef() {
- return SourceDistributionMetadata.DEF;
- }
-
- public SourceDistribution getSourceDistribution(RelNode rel,
RelMetadataQuery mq) {
- throw new AssertionError();
- }
-
- public SourceDistribution getSourceDistribution(RelSubset rel,
RelMetadataQuery mq) {
- throw new AssertionError();
- }
-
- public SourceDistribution getSourceDistribution(SingleRel rel,
RelMetadataQuery mq) {
- return distribution(rel.getInput(), mq);
- }
-
- public SourceDistribution getSourceDistribution(Sender rel,
RelMetadataQuery mq) {
- return rel.sourceDistribution(mq);
- }
-
- public SourceDistribution getSourceDistribution(BiRel rel,
RelMetadataQuery mq) {
- mq = RelMetadataQueryEx.wrap(mq);
-
- return merge(distribution(rel.getLeft(), mq),
distribution(rel.getRight(), mq));
- }
-
- public SourceDistribution getSourceDistribution(Receiver rel,
RelMetadataQuery mq) {
- SourceDistribution res = new SourceDistribution();
-
- ArrayList<Receiver> remoteInputs = new ArrayList<>();
- remoteInputs.add(rel);
- res.remoteInputs = remoteInputs;
-
- return res;
- }
-
- public SourceDistribution getSourceDistribution(IgniteTableScan rel,
RelMetadataQuery mq) {
- return rel.tableDistribution();
- }
-
- public static SourceDistribution distribution(RelNode rel,
RelMetadataQuery mq) {
- return RelMetadataQueryEx.wrap(mq).getSourceDistribution(rel);
- }
-
- private static SourceDistribution merge(SourceDistribution left,
SourceDistribution right) {
- SourceDistribution res = new SourceDistribution();
-
- res.partitionMapping = merge(left.partitionMapping,
right.partitionMapping);
- res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
- res.localInputs = merge(left.localInputs, right.localInputs);
-
- return res;
- }
-
- private static PartitionsDistribution merge(PartitionsDistribution left,
PartitionsDistribution right) {
- if (left == null)
- return right;
- if (right == null)
- return left;
-
- return left.mergeWith(right);
- }
-
- private static <T> List<T> merge(List<T> left, List<T> right) {
- if (left == null)
- return right;
-
- if (right != null)
- left.addAll(right);
-
- return left;
- }
-
- private static GridIntList merge(GridIntList left, GridIntList right) {
- if (left == null)
- return right;
-
- if (right != null)
- left.addAll(right);
-
- return left;
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 090ef53..d12c19c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -25,7 +25,6 @@ import org.apache.calcite.rel.metadata.MetadataDef;
import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
@@ -37,11 +36,12 @@ public class IgniteMetadata {
ChainedRelMetadataProvider.of(
ImmutableList.of(
IgniteMdDistribution.SOURCE,
- IgniteMdSourceDistribution.SOURCE,
+ IgniteMdFragmentLocation.SOURCE,
DefaultRelMetadataProvider.INSTANCE));
public interface DistributionTraitMetadata extends Metadata {
- MetadataDef<DistributionTraitMetadata> DEF =
MetadataDef.of(DistributionTraitMetadata.class,
DistributionTraitMetadata.Handler.class,
IgniteMethod.DISTRIBUTION_TRAIT.method());
+ MetadataDef<DistributionTraitMetadata> DEF =
MetadataDef.of(DistributionTraitMetadata.class,
+ DistributionTraitMetadata.Handler.class,
IgniteMethod.DISTRIBUTION_TRAIT.method());
/** Determines how the rows are distributed. */
DistributionTrait getDistributionTrait();
@@ -52,15 +52,16 @@ public class IgniteMetadata {
}
}
- public interface SourceDistributionMetadata extends Metadata {
- MetadataDef<SourceDistributionMetadata> DEF =
MetadataDef.of(SourceDistributionMetadata.class,
SourceDistributionMetadata.Handler.class,
IgniteMethod.SOURCE_DISTRIBUTION.method());
+ public interface FragmentLocationMetadata extends Metadata {
+ MetadataDef<FragmentLocationMetadata> DEF =
MetadataDef.of(FragmentLocationMetadata.class,
+ FragmentLocationMetadata.Handler.class,
IgniteMethod.FRAGMENT_LOCATION.method());
/** Determines how the rows are distributed. */
- SourceDistribution getSourceDistribution();
+ FragmentLocation getLocation();
/** Handler API. */
- interface Handler extends MetadataHandler<SourceDistributionMetadata> {
- SourceDistribution getSourceDistribution(RelNode r,
RelMetadataQuery mq);
+ interface Handler extends MetadataHandler<FragmentLocationMetadata> {
+ FragmentLocation getLocation(RelNode r, RelMetadataQuery mq);
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
new file mode 100644
index 0000000..c83d16b
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class Location {
+ public static byte HAS_MOVING_PARTITIONS = 0x1;
+ public static byte HAS_REPLICATED_CACHES = 0x2;
+ public static byte HAS_PARTITIONED_CACHES = 0x4;
+ public static byte PARTIALLY_REPLICATED = 0x8;
+ public static byte DEDUPLICATED = 0x16;
+
+ private final List<ClusterNode> nodes;
+ private final List<List<ClusterNode>> assignments;
+ private final byte flags;
+
+ public Location(List<ClusterNode> nodes, List<List<ClusterNode>>
assignments, byte flags) {
+ this.nodes = nodes;
+ this.assignments = assignments;
+ this.flags = flags;
+ }
+
+ public List<ClusterNode> nodes() {
+ return nodes;
+ }
+
+ public List<ClusterNode> nodes(int part) {
+ return assignments.get(part % assignments.size());
+ }
+
+ public Location mergeWith(Location other) throws LocationMappingException {
+ byte flags = (byte) (this.flags | other.flags);
+
+ if ((flags & PARTIALLY_REPLICATED) == 0)
+ return new Location(U.firstNotNull(nodes, other.nodes),
mergeAssignments(other, null), flags);
+
+ List<ClusterNode> nodes;
+
+ if (this.nodes == null)
+ nodes = other.nodes;
+ else if (other.nodes == null)
+ nodes = this.nodes;
+ else
+ nodes = Commons.intersect(this.nodes, other.nodes);
+
+ if (nodes != null && nodes.isEmpty())
+ throw new LocationMappingException("Failed to map fragment to
location.");
+
+ return new Location(nodes, mergeAssignments(other, nodes), flags);
+ }
+
+ private List<List<ClusterNode>> mergeAssignments(Location other,
List<ClusterNode> nodes) throws LocationMappingException {
+ byte flags = (byte) (this.flags | other.flags);
List<List<ClusterNode>> left = assignments, right = other.assignments;
+
+ if (left == null && right == null)
+ return null; // nothing to intersect;
+
+ if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS)
== 0) {
+ List<List<ClusterNode>> assignments = U.firstNotNull(left, right);
+
+ if (nodes == null || (flags & PARTIALLY_REPLICATED) == 0)
+ return assignments;
+
+ List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+ HashSet<ClusterNode> nodesSet = new HashSet<>(nodes);
+
+ for (List<ClusterNode> partNodes : assignments) {
+ List<ClusterNode> partNodes0 = new
ArrayList<>(partNodes.size());
+
+ for (ClusterNode partNode : partNodes) {
+ if (nodesSet.contains(partNode))
+ partNodes0.add(partNode);
+ }
+
+ if (partNodes0.isEmpty())
+ throw new LocationMappingException("Failed to map fragment
to location.");
+
+ assignments0.add(partNodes0);
+ }
+
+ return assignments0;
+ }
+
+ List<List<ClusterNode>> assignments = new ArrayList<>(left.size());
+ HashSet<ClusterNode> nodesSet = nodes != null ? new HashSet<>(nodes) :
null;
+
+ for (int i = 0; i < left.size(); i++) {
+ List<ClusterNode> leftNodes = left.get(i), partNodes = new
ArrayList<>(leftNodes.size());
+ HashSet<ClusterNode> rightNodesSet = new HashSet<>(right.get(i));
+
+ for (ClusterNode partNode : leftNodes) {
+ if (rightNodesSet.contains(partNode) && (nodesSet == null ||
nodesSet.contains(partNode)))
+ partNodes.add(partNode);
+ }
+
+ if (partNodes.isEmpty())
+ throw new LocationMappingException("Failed to map fragment to
location.");
+
+ assignments.add(partNodes);
+ }
+
+ return assignments;
+ }
+
+ public Location deduplicate() throws LocationMappingException {
+ if (assignments == null || (flags & DEDUPLICATED) == DEDUPLICATED)
+ return this;
+
+ HashSet<ClusterNode> nodes0 = new HashSet<>();
+ List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+
+ for (List<ClusterNode> partNodes : assignments) {
+ ClusterNode node = F.first(partNodes);
+
+ if (node == null)
+ throw new LocationMappingException("Failed to map fragment to
location.");
+
+ assignments0.add(Collections.singletonList(node));
+ nodes0.add(node);
+ }
+
+ return new Location(new ArrayList<>(nodes0), assignments0,
(byte)(flags | DEDUPLICATED));
+ }
+
+ public boolean hasMovingPartitions() {
+ return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
+ }
+
+ public boolean hasReplicatedCaches() {
+ return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
+ }
+
+ public boolean hasPartitionedCaches() {
+ return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
+ }
+
+ public boolean partiallyReplicated() {
+ return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
similarity index 75%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
index 76af490..e1d884a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationMappingException.java
@@ -14,14 +14,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
/**
*
*/
-public interface DistributionFunction {
- List<ClusterNode> destination(Object row);
+public class LocationMappingException extends Exception {
+ public LocationMappingException(String message) {
+ super(message);
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
similarity index 63%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
index 345957e..d81e440 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
@@ -14,15 +14,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
*
*/
-public interface PartitionsDistributionRegistry {
- PartitionsDistribution single(); // returns local node with single
partition
- PartitionsDistribution random(AffinityTopologyVersion topVer); // returns
random distribution, partitions count depends on nodes count
- PartitionsDistribution distributed(int cacheId, AffinityTopologyVersion
topVer); // returns cache distribution
+public interface LocationRegistry {
+ Location single(AffinityTopologyVersion topVer); // returns local node
with single partition
+ Location random(AffinityTopologyVersion topVer); // returns random
distribution, partitions count depends on nodes count
+ Location distributed(int cacheId, AffinityTopologyVersion topVer); //
returns cache distribution
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
similarity index 61%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index 8bc129e..fb609ea 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -14,14 +14,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
/**
*
*/
-public interface DistributionFunctionFactory {
- DistributionFunction create(SourceDistribution targetDistr,
ImmutableIntList keys);
+public class OptimisticPlanningException extends RuntimeException{
+ private final Edge edge;
+
+ public OptimisticPlanningException(String message, Edge edge, Throwable
cause) {
+ super(message, cause);
+ this.edge = edge;
+ }
+
+ public Edge edge() {
+ return edge;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 8c10a51..fc6e4fa 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.processors.query.calcite.metadata;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.jetbrains.annotations.NotNull;
@@ -31,47 +30,47 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
private static final JaninoRelMetadataProvider PROVIDER =
JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
private IgniteMetadata.DistributionTraitMetadata.Handler
distributionTraitHandler;
- private IgniteMetadata.SourceDistributionMetadata.Handler
sourceDistributionHandler;
+ private IgniteMetadata.FragmentLocationMetadata.Handler
sourceDistributionHandler;
- private RelMetadataQueryEx() {
- super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
+ @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
+ public static RelMetadataQueryEx instance() {
+ return new RelMetadataQueryEx(PROTO);
+ }
- distributionTraitHandler =
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
- sourceDistributionHandler =
initialHandler(IgniteMetadata.SourceDistributionMetadata.Handler.class);
+ public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
+ if (mq.getClass() == RelMetadataQueryEx.class)
+ return (RelMetadataQueryEx) mq;
+
+ return new RelMetadataQueryEx(mq);
}
- protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider,
RelMetadataQueryEx prototype) {
- super(metadataProvider, prototype);
+ private RelMetadataQueryEx(@NotNull RelMetadataQueryEx parent) {
+ super(PROVIDER, parent);
- distributionTraitHandler = prototype.distributionTraitHandler;
- sourceDistributionHandler = prototype.sourceDistributionHandler;
+ distributionTraitHandler = parent.distributionTraitHandler;
+ sourceDistributionHandler = parent.sourceDistributionHandler;
}
- protected RelMetadataQueryEx(JaninoRelMetadataProvider metadataProvider,
RelMetadataQuery parent) {
- super(metadataProvider, parent);
+ private RelMetadataQueryEx(@NotNull RelMetadataQuery parent) {
+ super(PROVIDER, parent);
distributionTraitHandler = PROTO.distributionTraitHandler;
sourceDistributionHandler = PROTO.sourceDistributionHandler;
}
- @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
- public static RelMetadataQueryEx instance() {
- return new RelMetadataQueryEx(PROVIDER, PROTO);
- }
-
- public static RelMetadataQueryEx wrap(@NotNull RelMetadataQuery mq) {
- if (mq.getClass() == RelMetadataQueryEx.class)
- return (RelMetadataQueryEx) mq;
+ private RelMetadataQueryEx() {
+ super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
- return new RelMetadataQueryEx(PROVIDER, mq);
+ distributionTraitHandler =
initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
+ sourceDistributionHandler =
initialHandler(IgniteMetadata.FragmentLocationMetadata.Handler.class);
}
- public SourceDistribution getSourceDistribution(RelNode rel) {
+ public FragmentLocation getFragmentLocation(RelNode rel) {
for (;;) {
try {
- return sourceDistributionHandler.getSourceDistribution(rel,
this);
+ return sourceDistributionHandler.getLocation(rel, this);
} catch (JaninoRelMetadataProvider.NoHandler e) {
- sourceDistributionHandler = revise(e.relClass,
IgniteMetadata.SourceDistributionMetadata.DEF);
+ sourceDistributionHandler = revise(e.relClass,
IgniteMetadata.FragmentLocationMetadata.DEF);
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
deleted file mode 100644
index 967606f..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/CloneContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import java.util.IdentityHashMap;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.RelNode;
-
-/**
- *
- */
-public final class CloneContext {
- private final RelOptCluster cluster;
- private final IdentityHashMap<IgniteRel, IgniteRel> mapping = new
IdentityHashMap<>();
-
- public CloneContext(RelOptCluster cluster) {
- this.cluster = cluster;
- }
-
- public RelOptCluster getCluster() {
- return cluster;
- }
-
- public <T extends RelNode> T clone(T src) {
- if (!(src instanceof IgniteRel))
- throw new IllegalStateException("Unexpected node type: " +
src.getClass());
-
- return (T) mapping.computeIfAbsent((IgniteRel) src, this::clone0);
- }
-
- private IgniteRel clone0(IgniteRel src) {
- return src.clone(this);
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 1bd637c..5fce676 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -54,10 +54,6 @@ public final class IgniteExchange extends SingleRel
implements IgniteRel {
return new IgniteExchange(getCluster(), traitSet, sole(inputs));
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new IgniteExchange(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()));
- }
-
@Override public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.item("distribution",
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 1e23373..ed8b52f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -42,10 +42,6 @@ public final class IgniteFilter extends Filter implements
IgniteRel {
return variablesSet;
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new IgniteFilter(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()), getCondition(), variablesSet);
- }
-
@Override public IgniteFilter copy(RelTraitSet traitSet, RelNode input,
RexNode condition) {
return new IgniteFilter(getCluster(), traitSet, input, condition,
variablesSet);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
index 2818382..6450abd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
@@ -42,10 +42,6 @@ public final class IgniteHashJoin extends Join implements
IgniteRel {
this.semiJoinDone = semiJoinDone;
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new IgniteHashJoin(ctx.getCluster(), getTraitSet(),
ctx.clone(getLeft()), ctx.clone(getRight()), getCondition(), variablesSet,
getJoinType(), semiJoinDone);
- }
-
@Override public IgniteHashJoin copy(RelTraitSet traitSet, RexNode
conditionExpr,
RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone)
{
return new IgniteHashJoin(getCluster(), traitSet, left, right,
conditionExpr, variablesSet, joinType, semiJoinDone);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1c1e4ec..79408cd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -37,11 +37,7 @@ public final class IgniteProject extends Project implements
IgniteRel {
super(cluster, traitSet, input, projects, rowType);
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new IgniteProject(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()), getProjects(), getRowType());
- }
-
- @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
+ @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
List<RexNode> projects, RelDataType rowType) {
return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index d2a47da..9a761ad 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -31,6 +31,4 @@ public interface IgniteRel extends RelNode {
return true; // Enables trait definition conversion
}
};
-
- IgniteRel clone(CloneContext ctx);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 5ceaaae..4f854c3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -22,8 +22,8 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteTableScan extends TableScan implements IgniteRel {
@@ -31,18 +31,14 @@ public final class IgniteTableScan extends TableScan
implements IgniteRel {
super(cluster, traitSet, table);
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new IgniteTableScan(ctx.getCluster(), getTraitSet(), getTable());
- }
-
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return this;
}
- public SourceDistribution tableDistribution() {
+ public FragmentLocation location() {
boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
- return
getTable().unwrap(IgniteTable.class).sourceDistribution(getCluster().getPlanner().getContext(),
local);
+ return
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext(),
local);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index a4d46f7..74c9d1b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -21,14 +21,15 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
/**
*
*/
-public class Receiver extends SingleRel implements IgniteRel {
- private SourceDistribution sourceDistribution;
+public final class Receiver extends SingleRel implements IgniteRel {
+ private FragmentLocation sourceDistribution;
/**
* @param cluster Cluster this relational expression belongs to
@@ -49,17 +50,17 @@ public class Receiver extends SingleRel implements
IgniteRel {
return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new Receiver(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()));
- }
-
- public void init(SourceDistribution targetDistribution, RelMetadataQueryEx
mq) {
- getInput().init(targetDistribution);
+ public void init(FragmentLocation targetDistribution, RelMetadataQueryEx
mq) {
+ getInput().init(targetDistribution,
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
- sourceDistribution = getInput().sourceDistribution(mq);
+ sourceDistribution = getInput().location(mq);
}
- public SourceDistribution sourceDistribution() {
+ public FragmentLocation sourceDistribution() {
return sourceDistribution;
}
+
+ public void reset() {
+ sourceDistribution = null;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 715eb3f..f53b953 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -22,19 +22,19 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
/**
*
*/
-public class Sender extends SingleRel implements IgniteRel {
- private SourceDistribution sourceDistribution;
- private SourceDistribution targetDistribution;
- private DistributionFunction targetFunction;
+public final class Sender extends SingleRel implements IgniteRel {
+ private FragmentLocation location;
+ private FragmentLocation targetLocation;
+ private DistributionTrait targetDistribution;
+ private DestinationFunction destinationFunction;
/**
* Creates a <code>SingleRel</code>.
@@ -47,34 +47,36 @@ public class Sender extends SingleRel implements IgniteRel {
super(cluster, traits, input);
}
- @Override public IgniteRel clone(CloneContext ctx) {
- return new Sender(ctx.getCluster(), getTraitSet(),
ctx.clone(getInput()));
- }
-
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new Sender(getCluster(), traitSet, sole(inputs));
}
- public void init(SourceDistribution targetDistribution) {
+ public void init(FragmentLocation targetLocation, DistributionTrait
targetDistribution) {
+ this.targetLocation = targetLocation;
this.targetDistribution = targetDistribution;
}
- public DistributionFunction targetFunction() {
- if (targetFunction == null) {
- assert targetDistribution != null &&
targetDistribution.partitionMapping != null;
-
- DistributionTrait distribution =
getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+ public DestinationFunction targetFunction() {
+ if (destinationFunction == null) {
+ assert targetLocation != null && targetLocation.location != null
&& targetDistribution != null;
- targetFunction =
distribution.functionFactory().create(targetDistribution, distribution.keys());
+ destinationFunction =
targetDistribution.destinationFunctionFactory().create(targetLocation,
targetDistribution.keys());
}
- return targetFunction;
+ return destinationFunction;
}
- public SourceDistribution sourceDistribution(RelMetadataQuery mq) {
- if (sourceDistribution == null)
- sourceDistribution =
RelMetadataQueryEx.wrap(mq).getSourceDistribution(getInput());
+ public FragmentLocation location(RelMetadataQuery mq) {
+ if (location == null)
+ location =
RelMetadataQueryEx.wrap(mq).getFragmentLocation(getInput());
+
+ return location;
+ }
- return sourceDistribution;
+ public void reset() {
+ location = null;
+ targetLocation = null;
+ targetDistribution = null;
+ destinationFunction = null;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 41ffb12..b5940b0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -27,13 +27,13 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -77,24 +77,35 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
}
public DistributionTrait distributionTrait(Context context) {
- return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.noOpFunction()); // TODO
+ return
distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
}
- public SourceDistribution sourceDistribution(Context context, boolean
local) {
+ public FragmentLocation location(Context context, boolean local) {
int cacheId = CU.cacheId(cacheName);
- SourceDistribution res = new SourceDistribution();
+ FragmentLocation res = new FragmentLocation();
GridIntList localInputs = new GridIntList();
localInputs.add(cacheId);
res.localInputs = localInputs;
- if (!local) {
- PartitionsDistributionRegistry registry =
context.unwrap(PartitionsDistributionRegistry.class);
- AffinityTopologyVersion topVer =
context.unwrap(AffinityTopologyVersion.class);
- res.partitionMapping = registry.distributed(cacheId, topVer);
- }
+ if (!local)
+ res.location = locationRegistry(context).distributed(cacheId,
topologyVersion(context));
+
+ res.topVer = topologyVersion(context);
return res;
}
+
+ private LocationRegistry locationRegistry(Context ctx) {
+ return ctx.unwrap(LocationRegistry.class);
+ }
+
+ public DistributionRegistry distributionRegistry(Context ctx) {
+ return ctx.unwrap(DistributionRegistry.class);
+ }
+
+ private AffinityTopologyVersion topologyVersion(Context ctx) {
+ return ctx.unwrap(AffinityTopologyVersion.class);
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 2a98eaa..2fd35d7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -19,6 +19,10 @@ package
org.apache.ignite.internal.processors.query.calcite.splitter;
import org.apache.calcite.plan.Context;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
@@ -30,7 +34,7 @@ import org.apache.ignite.internal.util.typedef.F;
public class Fragment {
public final RelNode rel;
- public SourceDistribution distribution;
+ public FragmentLocation fragmentLocation;
public Fragment(RelNode rel) {
this.rel = rel;
@@ -39,22 +43,28 @@ public class Fragment {
public void init(Context ctx) {
RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
- distribution = mq.getSourceDistribution(rel);
+ fragmentLocation = mq.getFragmentLocation(rel);
- PartitionsDistribution mapping = distribution.partitionMapping;
+ AffinityTopologyVersion topVer = topologyVersion(ctx);
- if (mapping == null) {
+ if (fragmentLocation.location == null) {
if (!isRoot())
- distribution.partitionMapping =
registry(ctx).random(topologyVersion(ctx));
- else if (!F.isEmpty(distribution.remoteInputs))
- distribution.partitionMapping = registry(ctx).single();
+ fragmentLocation.location = registry(ctx).random(topVer);
+ else if (!F.isEmpty(fragmentLocation.remoteInputs))
+ fragmentLocation.location = registry(ctx).single(topVer);
+ }
+ else {
+ try {
+ fragmentLocation.location =
fragmentLocation.location.deduplicate();
+ }
+ catch (LocationMappingException e) {
+ throw new IgniteSQLException("Failed to map fragment to
location, partition lost.", e);
+ }
}
- else if (mapping.excessive)
- distribution.partitionMapping = mapping.deduplicate();
- if (!F.isEmpty(distribution.remoteInputs)) {
- for (Receiver input : distribution.remoteInputs)
- input.init(distribution, mq);
+ if (!F.isEmpty(fragmentLocation.remoteInputs)) {
+ for (Receiver input : fragmentLocation.remoteInputs)
+ input.init(fragmentLocation, mq);
}
}
@@ -62,11 +72,24 @@ public class Fragment {
return !(rel instanceof Sender);
}
- private PartitionsDistributionRegistry registry(Context ctx) {
- return ctx.unwrap(PartitionsDistributionRegistry.class);
+ private LocationRegistry registry(Context ctx) {
+ return ctx.unwrap(LocationRegistry.class);
}
private AffinityTopologyVersion topologyVersion(Context ctx) {
return ctx.unwrap(AffinityTopologyVersion.class);
}
+
+ public void reset() {
+ if (rel instanceof Sender)
+ ((Sender) rel).reset();
+
+ if (fragmentLocation != null &&
!F.isEmpty(fragmentLocation.remoteInputs)) {
+ for (Receiver receiver : fragmentLocation.remoteInputs) {
+ receiver.reset();
+ }
+ }
+
+ fragmentLocation = null;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
deleted file mode 100644
index fb2728a..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.splitter;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- */
-public class PartitionsDistribution {
- public static final int[] ALL_PARTS = new int[0];
- private static final int[] EMPTY = new int[0];
-
- public boolean excessive;
- public int parts;
- public int[] nodes;
- public int[][] nodeParts;
-
- public PartitionsDistribution mergeWith(PartitionsDistribution other) {
- if (parts != 0 && other.parts != 0 && parts != other.parts)
- throw new IllegalStateException("Non-collocated query fragment.");
-
- int[] nodes0 = null;
- int[][] nodeParts0 = null;
-
- int i = 0, j = 0, k = 0;
-
- while (i < nodes.length && j < other.nodes.length) {
- if (nodes[i] < other.nodes[j])
- i++;
- else if (other.nodes[j] < nodes[i])
- j++;
- else {
- int[] mergedParts = merge(nodeParts[i], other.nodeParts[j]);
-
- if (mergedParts == ALL_PARTS || mergedParts.length > 0) {
- if (nodes0 == null) {
- int len = Math.min(nodes.length, other.nodes.length);
-
- nodes0 = new int[len];
- nodeParts0 = new int[len][];
- }
-
- nodes0[k] = nodes[i];
- nodeParts0[k] = mergedParts;
-
- k++;
- }
-
- i++;
- j++;
- }
- }
-
- PartitionsDistribution res = new PartitionsDistribution();
-
- res.excessive = excessive && other.excessive;
- res.parts = Math.max(parts, other.parts);
- res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
- res.nodeParts = nodeParts0.length == k ? nodeParts0 :
Arrays.copyOf(nodeParts0, k);
-
- check(res);
-
- return res;
- }
-
- private void check(PartitionsDistribution res) {
- if (res.parts == 0)
- return; // Only receivers and/or replicated caches in task subtree.
-
- BitSet check = new BitSet(res.parts);
-
- int checkedParts = 0;
-
- for (int[] nodePart : res.nodeParts) {
- for (int p : nodePart) {
- if (!check.get(p)) {
- check.set(p);
- checkedParts++;
- }
- }
- }
-
- if (checkedParts < res.parts)
- throw new IllegalStateException("Failed to collocate used
caches.");
- }
-
- private int[] merge(int[] left, int[] right) {
- if (left == ALL_PARTS)
- return right;
- if (right == ALL_PARTS)
- return left;
-
- int[] nodeParts = null;
-
- int i = 0, j = 0, k = 0;
-
- while (i < left.length && j < right.length) {
- if (left[i] < right[j])
- i++;
- else if (right[j] < left[i])
- j++;
- else {
- if (nodeParts == null)
- nodeParts = new int[Math.min(left.length, right.length)];
-
- nodeParts[k++] = left[i];
-
- i++;
- j++;
- }
- }
-
- if (k == 0)
- return EMPTY;
-
- return nodeParts.length == k ? nodeParts : Arrays.copyOf(nodeParts, k);
- }
-
- public PartitionsDistribution deduplicate() {
- if (!excessive)
- return this;
-
- Map<Integer, Integer> map = new HashMap<>();
-
- int idx = 0; int[] idxs = new int[nodeParts.length];
- while (map.size() < parts) {
- int[] nodePart = nodeParts[idx];
-
- int j = idxs[idx];
-
- while (j < nodePart.length) {
- if (map.putIfAbsent(nodePart[j], nodes[idx]) == null || j + 1
== nodePart.length) {
- idxs[idx] = j + 1;
-
- break;
- }
-
- j++;
- }
-
- idx = (idx + 1) % nodes.length;
- }
-
- int[] nodes0 = new int[nodes.length]; int[][] nodeParts0 = new
int[nodes.length][];
-
- int k = 0;
-
- for (int i = 0; i < nodes.length; i++) {
- int j = 0;
-
- int[] nodePart0 = null;
-
- for (int p : nodeParts[i]) {
- if (map.get(p) == nodes[i]) {
- if (nodePart0 == null)
- nodePart0 = new int[nodeParts[i].length];
-
- nodePart0[j++] = p;
- }
- }
-
- if (nodePart0 != null) {
- nodes0[k] = nodes[i];
- nodeParts0[k] = nodePart0.length == j ? nodePart0 :
Arrays.copyOf(nodePart0, j);
-
- k++;
- }
- }
-
- PartitionsDistribution res = new PartitionsDistribution();
-
- res.parts = parts;
- res.nodes = nodes0.length == k ? nodes0 : Arrays.copyOf(nodes0, k);
- res.nodeParts = nodeParts0.length == k ? nodeParts0 :
Arrays.copyOf(nodeParts0, k);
-
- return res;
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index d23091d..b944a9b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -16,37 +16,61 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.calcite.plan.Context;
-import org.apache.ignite.internal.processors.query.calcite.rel.CloneContext;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Edge;
/**
*
*/
public class QueryPlan {
- private final ImmutableList<Fragment> fragments;
+ private final List<Fragment> fragments;
- public QueryPlan(ImmutableList<Fragment> fragments) {
+ public QueryPlan(List<Fragment> fragments) {
this.fragments = fragments;
}
public void init(Context ctx) {
- for (Fragment fragment : fragments) {
- fragment.init(ctx);
- }
- }
+ int i = 0;
- public ImmutableList<Fragment> fragments() {
- return fragments;
- }
+ while (true) {
+ try {
+ for (Fragment fragment : fragments)
+ fragment.init(ctx);
+
+ break;
+ }
+ catch (OptimisticPlanningException e) {
+ if (++i > 3)
+ throw new IgniteSQLException("Failed to map query.", e);
+
+ for (Fragment fragment0 : fragments)
+ fragment0.reset();
- public QueryPlan clone(CloneContext ctx) {
- ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+ Edge edge = e.edge();
- for (Fragment f : fragments) {
- b.add(new Fragment(ctx.clone(f.rel)));
+ RelNode parent = edge.parent();
+ RelNode child = edge.child();
+
+ RelOptCluster cluster = child.getCluster();
+ RelTraitSet traitSet = child.getTraitSet();
+
+ Sender sender = new Sender(cluster, traitSet, child);
+ parent.replaceInput(edge.childIdx(), new Receiver(cluster,
traitSet, sender));
+
+ fragments.add(new Fragment(sender));
+ }
}
+ }
- return new QueryPlan(b.build());
+ public List<Fragment> fragments() {
+ return fragments;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 31b8819..5f751d3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -16,46 +16,52 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import
org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
/**
*
*/
-public class Splitter extends RelShuttleImpl {
- private ImmutableList.Builder<Fragment> b;
+public class Splitter extends IgniteRelShuttle {
+ private List<Fragment> fragments;
public QueryPlan go(IgniteRel root) {
- b = ImmutableList.builder();
+ fragments = new ArrayList<>();
- return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
+ fragments.add(new Fragment(root.accept(this)));
+
+ Collections.reverse(fragments);
+
+ return new QueryPlan(fragments);
}
- @Override public RelNode visit(RelNode rel) {
- if (!(rel instanceof IgniteRel))
- throw new AssertionError("Unexpected node: " + rel);
- else if (rel instanceof Sender || rel instanceof Receiver)
- throw new AssertionError("An attempt to split an already split
task.");
- else if (rel instanceof IgniteExchange) {
- IgniteExchange exchange = (IgniteExchange) rel;
+ @Override public RelNode visit(IgniteExchange rel) {
+ RelOptCluster cluster = rel.getCluster();
+
+ Sender sender = new Sender(cluster, rel.getInput().getTraitSet(),
visit(rel.getInput()));
- RelOptCluster cluster = exchange.getCluster();
- RelTraitSet traitSet = exchange.getTraitSet();
+ fragments.add(new Fragment(sender));
- Sender sender = new Sender(cluster, traitSet,
visit(exchange.getInput()));
+ return new Receiver(cluster, rel.getTraitSet(), sender);
+ }
- b.add(new Fragment(sender));
+ @Override public RelNode visit(Receiver rel) {
+ throw new AssertionError("An attempt to split an already split task.");
+ }
- return new Receiver(cluster, traitSet, sender);
- }
+ @Override public RelNode visit(Sender rel) {
+ throw new AssertionError("An attempt to split an already split task.");
+ }
- return super.visit(rel);
+ @Override protected RelNode visitOther(RelNode rel) {
+ throw new AssertionError("Unexpected node: " + rel);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
similarity index 95%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
index 76af490..7577767 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunction.java
@@ -22,6 +22,6 @@ import org.apache.ignite.cluster.ClusterNode;
/**
*
*/
-public interface DistributionFunction {
+public interface DestinationFunction {
List<ClusterNode> destination(Object row);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
similarity index 80%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index 8bc129e..d4dc4fc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
/**
*
*/
-public interface DistributionFunctionFactory {
- DistributionFunction create(SourceDistribution targetDistr,
ImmutableIntList keys);
+public interface DestinationFunctionFactory {
+ DestinationFunction create(FragmentLocation targetLocation,
ImmutableIntList keys);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 2510c2c..e733461 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.Objects;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.util.ImmutableIntList;
@@ -24,37 +25,57 @@ import org.apache.calcite.util.ImmutableIntList;
/**
*
*/
-public interface DistributionTrait extends RelTrait {
- enum DistributionType {
- HASH("hash"),
- RANDOM("random"),
- BROADCAST("broadcast"),
- SINGLE("single"),
- ANY("any");
-
- /** */
- private final String description;
-
- /** */
- DistributionType(String description) {
- this.description = description;
- }
+public final class DistributionTrait implements RelTrait {
+ private final DistributionType type;
+ private final ImmutableIntList keys;
+ private final DestinationFunctionFactory functionFactory;
+
+ public DistributionTrait(DistributionType type, ImmutableIntList keys,
DestinationFunctionFactory functionFactory) {
+ this.type = type;
+ this.keys = keys;
+ this.functionFactory = functionFactory;
+ }
+
+ public DistributionType type() {
+ return type;
+ }
+
+ public DestinationFunctionFactory destinationFunctionFactory() {
+ return functionFactory;
+ }
+
+ public ImmutableIntList keys() {
+ return keys;
+ }
- /** */
- @Override public String toString() {
- return description;
+ @Override public void register(RelOptPlanner planner) {}
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o instanceof DistributionTrait) {
+ DistributionTrait that = (DistributionTrait) o;
+
+ return type == that.type() && keys.equals(that.keys());
}
+
+ return false;
}
- DistributionType type();
+ @Override public int hashCode() {
+ return Objects.hash(type, keys);
+ }
- DistributionFunctionFactory functionFactory();
+ @Override public String toString() {
+ return type + (type == DistributionType.HASH ? keys.toString() : "");
+ }
- @Override default RelTraitDef getTraitDef() {
+ @Override public RelTraitDef getTraitDef() {
return DistributionTraitDef.INSTANCE;
}
- @Override default boolean satisfies(RelTrait trait) {
+ @Override public boolean satisfies(RelTrait trait) {
if (trait == this)
return true;
@@ -68,13 +89,10 @@ public interface DistributionTrait extends RelTrait {
if (type() == other.type())
return type() != DistributionType.HASH
- || (Objects.equals(keys(), other.keys()) &&
Objects.equals(functionFactory(), other.functionFactory()));
+ || (Objects.equals(keys(), other.keys())
+ && Objects.equals(destinationFunctionFactory(),
other.destinationFunctionFactory()));
return other.type() == DistributionType.RANDOM && type() ==
DistributionType.HASH;
}
- /**
- * @return Hash distribution columns ordinals or empty list otherwise.
- */
- ImmutableIntList keys();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
deleted file mode 100644
index 099c96b..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.trait;
-
-import java.util.Objects;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.util.ImmutableIntList;
-
-/**
- *
- */
-public class DistributionTraitImpl implements DistributionTrait {
- private final DistributionType type;
- private final ImmutableIntList keys;
- private final DistributionFunctionFactory functionFactory;
-
- public DistributionTraitImpl(DistributionType type, ImmutableIntList keys,
DistributionFunctionFactory functionFactory) {
- this.type = type;
- this.keys = keys;
- this.functionFactory = functionFactory;
- }
-
- @Override public DistributionType type() {
- return type;
- }
-
- @Override public DistributionFunctionFactory functionFactory() {
- return functionFactory;
- }
-
- @Override public ImmutableIntList keys() {
- return keys;
- }
-
- @Override public void register(RelOptPlanner planner) {}
-
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o instanceof DistributionTrait) {
- DistributionTrait that = (DistributionTrait) o;
-
- return type == that.type() && keys.equals(that.keys());
- }
-
- return false;
- }
-
- @Override public int hashCode() {
- return Objects.hash(type, keys);
- }
-
- @Override public String toString() {
- return type + (type == DistributionType.HASH ? keys.toString() : "");
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
similarity index 65%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
index 8bc129e..60e37ed 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
@@ -16,12 +16,32 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
-
/**
*
*/
-public interface DistributionFunctionFactory {
- DistributionFunction create(SourceDistribution targetDistr,
ImmutableIntList keys);
+public enum DistributionType {
+ HASH("hash"),
+ RANDOM("random"),
+ BROADCAST("broadcast"),
+ SINGLE("single"),
+ ANY("any");
+
+ /**
+ *
+ */
+ private final String description;
+
+ /**
+ *
+ */
+ DistributionType(String description) {
+ this.description = description;
+ }
+
+ /**
+ *
+ */
+ @Override public String toString() {
+ return description;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 59f69fe..8f83227 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -16,21 +16,45 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.ToIntFunction;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.cluster.ClusterNode;
-import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
+import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
/**
*
*/
public class IgniteDistributions {
- private static final DistributionFunctionFactory NO_OP_FACTORY = (t,k) ->
null;
+ private static final DestinationFunctionFactory NO_OP_FACTORY = (t,k) ->
null;
+ private static final DestinationFunctionFactory HASH_FACTORY = (t,k) -> {
+ int[] fields = k.toIntArray();
- private static final DistributionTrait BROADCAST = new
DistributionTraitImpl(DistributionTrait.DistributionType.BROADCAST,
ImmutableIntList.of(), allTargetsFunction());
- private static final DistributionTrait SINGLE = new
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE,
ImmutableIntList.of(), singleTargetFunction());
- private static final DistributionTrait RANDOM = new
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM,
ImmutableIntList.of(), randomTargetFunction());
- private static final DistributionTrait ANY = new
DistributionTraitImpl(DistributionTrait.DistributionType.ANY,
ImmutableIntList.of(), noOpFunction());
+ ToIntFunction<Object> hashFun = r -> {
+ Object[] row = (Object[]) r;
+
+ if (row == null)
+ return 0;
+
+ int hash = 1;
+
+ for (int i : fields)
+ hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
+
+ return hash;
+ };
+
+ return r -> t.location.nodes(hashFun.applyAsInt(r));
+ };
+
+
+ private static final DistributionTrait BROADCAST = new
DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(),
allTargetsFunction());
+ private static final DistributionTrait SINGLE = new
DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(),
singleTargetFunction());
+ private static final DistributionTrait RANDOM = new
DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(),
randomTargetFunction());
+ private static final DistributionTrait ANY = new
DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), noOpFunction());
public static DistributionTrait any() {
return ANY;
@@ -48,23 +72,39 @@ public class IgniteDistributions {
return BROADCAST;
}
- public static DistributionTrait hash(List<Integer> keys,
DistributionFunctionFactory factory) {
- return new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys),
factory);
+ public static DistributionTrait hash(List<Integer> keys,
DestinationFunctionFactory factory) {
+ return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys),
factory);
}
- public static DistributionFunctionFactory noOpFunction() {
+ public static DestinationFunctionFactory noOpFunction() {
return NO_OP_FACTORY;
}
- public static DistributionFunctionFactory singleTargetFunction() {
- return noOpFunction(); // TODO
+ public static DestinationFunctionFactory singleTargetFunction() {
+ return (t, k) -> {
+ List<ClusterNode> nodes = t.location.nodes();
+
+ return r -> nodes;
+ };
+ }
+
+ public static DestinationFunctionFactory allTargetsFunction() {
+ return (t, k) -> {
+ List<ClusterNode> nodes = t.location.nodes();
+
+ return r -> nodes;
+ };
}
- public static DistributionFunctionFactory allTargetsFunction() {
- return noOpFunction(); // TODO
+ public static DestinationFunctionFactory randomTargetFunction() {
+ return (t, k) -> {
+ List<ClusterNode> nodes = t.location.nodes();
+
+ return r ->
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
+ };
}
- public static DistributionFunctionFactory randomTargetFunction() {
- return noOpFunction(); // TODO
+ public static DestinationFunctionFactory hashFunction() {
+ return HASH_FACTORY;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index fc4a6eb..ea3ecc3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.util;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.IdentityHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -39,12 +43,16 @@ import
org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
*
*/
public final class Commons {
+ private static final int[] EMPTY = new int[0];
+
private Commons(){}
public static Context convert(QueryContext ctx) {
@@ -129,4 +137,89 @@ public final class Commons {
return transform;
};
}
+
+ public static int[] intersect(int[] left, int[] right) {
+ if (F.isEmpty(left) || F.isEmpty(right))
+ return EMPTY;
+
+ int[] res = null;
+
+ int i = 0, j = 0, k = 0, size = Math.min(left.length, right.length);
+
+ while (i < left.length && j < right.length) {
+ if (left[i] < right[j])
+ i++;
+ else if (right[j] < left[i])
+ j++;
+ else {
+ if (res == null)
+ res = new int[size];
+
+ res[k++] = left[i];
+
+ i++;
+ j++;
+ }
+ }
+
+ if (k == 0)
+ return EMPTY;
+
+ return res.length == k ? res : Arrays.copyOf(res, k);
+ }
+
+ public static <T> List<T> intersect(List<T> left, List<T> right) {
+ if (F.isEmpty(left) || F.isEmpty(right))
+ return Collections.emptyList();
+
+ HashSet<T> set = new HashSet<>(right);
+
+ return
left.stream().filter(set::contains).collect(Collectors.toList());
+ }
+
+ public static <T> List<T> union(List<T> left, List<T> right) {
+ Set<T> set = U.newHashSet(left.size() + right.size());
+
+ set.addAll(left);
+ set.addAll(right);
+
+ return new ArrayList<>(set);
+ }
+
+ public static int[] union(int[] left, int[] right) {
+ if (F.isEmpty(left) && F.isEmpty(right))
+ return EMPTY;
+
+ int min = Math.min(left.length, right.length);
+ int max = left.length + right.length;
+ int expected = Math.max(min, (int) (max * 1.5));
+
+ int[] res = new int[U.ceilPow2(expected)];
+
+ int i = 0, j = 0, k = 0;
+
+ while (i < left.length && j < right.length) {
+ res = ensureSize(res, k + 1);
+
+ if (left[i] < right[j])
+ res[k++] = left[i++];
+ else if (right[j] < left[i])
+ res[k++] = right[j++];
+ else {
+ res[k++] = left[i];
+
+ i++;
+ j++;
+ }
+ }
+
+ if (k == 0)
+ return EMPTY;
+
+ return res.length == k ? res : Arrays.copyOf(res, k);
+ }
+
+ private static int[] ensureSize(int[] array, int size) {
+ return size < array.length ? array : Arrays.copyOf(array,
U.ceilPow2(size));
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
similarity index 55%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
index 8bc129e..58f7146 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
@@ -14,14 +14,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.util;
-import org.apache.calcite.util.ImmutableIntList;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import org.apache.calcite.rel.RelNode;
/**
*
*/
-public interface DistributionFunctionFactory {
- DistributionFunction create(SourceDistribution targetDistr,
ImmutableIntList keys);
+public class Edge {
+ private final RelNode parent;
+ private final RelNode child;
+ private final int childIdx;
+
+ public Edge(RelNode parent, RelNode child, int childIdx) {
+ this.parent = parent;
+ this.child = child;
+ this.childIdx = childIdx;
+ }
+
+ public RelNode parent() {
+ return parent;
+ }
+
+ public RelNode child() {
+ return child;
+ }
+
+ public int childIdx() {
+ return childIdx;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index 63a5c2b..94109e9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -20,14 +20,14 @@ package
org.apache.ignite.internal.processors.query.calcite.util;
import java.lang.reflect.Method;
import org.apache.calcite.linq4j.tree.Types;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
/**
*
*/
public enum IgniteMethod {
DISTRIBUTION_TRAIT(DistributionTraitMetadata.class,
"getDistributionTrait"),
- SOURCE_DISTRIBUTION(SourceDistributionMetadata.class,
"getSourceDistribution");
+ FRAGMENT_LOCATION(FragmentLocationMetadata.class, "getLocation");
private final Method method;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
new file mode 100644
index 0000000..a236774
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+
+/**
+ *
+ */
+public class IgniteRelShuttle extends RelShuttleImpl {
+ public RelNode visit(IgniteExchange rel) {
+ return visitChild(rel, 0, rel.getInput());
+ }
+
+ public RelNode visit(IgniteFilter rel) {
+ return visitChild(rel, 0, rel.getInput());
+ }
+
+ public RelNode visit(IgniteProject rel) {
+ return visitChild(rel, 0, rel.getInput());
+ }
+
+ public RelNode visit(Receiver rel) {
+ return visitChild(rel, 0, rel.getInput());
+ }
+
+ public RelNode visit(Sender rel) {
+ return visitChild(rel, 0, rel.getInput());
+ }
+
+ public RelNode visit(IgniteTableScan rel) {
+ return rel;
+ }
+
+ public RelNode visit(IgniteHashJoin rel) {
+ return visitChildren(rel);
+ }
+
+ @Override public RelNode visit(RelNode rel) {
+ if (rel instanceof IgniteExchange)
+ return visit((IgniteExchange)rel);
+ if (rel instanceof IgniteFilter)
+ return visit((IgniteFilter)rel);
+ if (rel instanceof IgniteProject)
+ return visit((IgniteProject)rel);
+ if (rel instanceof Receiver)
+ return visit((Receiver)rel);
+ if (rel instanceof Sender)
+ return visit((Sender)rel);
+ if (rel instanceof IgniteTableScan)
+ return visit((IgniteTableScan)rel);
+ if (rel instanceof IgniteHashJoin)
+ return visit((IgniteHashJoin)rel);
+
+ return visitOther(rel);
+ }
+
+ protected RelNode visitOther(RelNode rel) {
+ return super.visit(rel);
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 8806868..3138c6c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,6 +18,10 @@
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -28,7 +32,11 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -37,16 +45,17 @@ import
org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -58,12 +67,8 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
private static CalciteQueryProcessor proc;
private static SchemaPlus schema;
- private static PartitionsDistribution developerDistribution;
- private static PartitionsDistribution projectDistribution;
- private static PartitionsDistribution randomDistribution;
- private static PartitionsDistribution singleDistribution;
-
- private static PartitionsDistributionRegistry registry;
+ private static TestRegistry registry;
+ private static List<ClusterNode> nodes;
@BeforeClass
public static void setupClass() {
@@ -109,47 +114,13 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
schema.add("PUBLIC", publicSchema);
- developerDistribution = new PartitionsDistribution();
-
- developerDistribution.parts = 5;
- developerDistribution.nodes = new int[]{0,1,2};
- developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
-
- projectDistribution = new PartitionsDistribution();
-
- projectDistribution.excessive = true;
- projectDistribution.parts = 5;
- projectDistribution.nodes = new int[]{0,1,2};
- projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
-
- randomDistribution = new PartitionsDistribution();
- randomDistribution.parts = 3;
- randomDistribution.nodes = new int[]{0,1,2};
- randomDistribution.nodeParts = new int[][]{{1},{2},{3}};
-
- singleDistribution = new PartitionsDistribution();
- singleDistribution.parts = 1;
- singleDistribution.nodes = new int[]{0};
- singleDistribution.nodeParts = new int[][]{{1}};
+ nodes = new ArrayList<>(4);
- registry = new PartitionsDistributionRegistry() {
- @Override public PartitionsDistribution distributed(int cacheId,
AffinityTopologyVersion topVer) {
- if (cacheId == CU.cacheId("Developer"))
- return developerDistribution;
- if (cacheId == CU.cacheId("Project"))
- return projectDistribution;
-
- throw new AssertionError("Unexpected cache id:" + cacheId);
- }
-
- @Override public PartitionsDistribution
random(AffinityTopologyVersion topVer) {
- return randomDistribution;
- }
+ for (int i = 0; i < 4; i++) {
+ nodes.add(new GridTestNode(UUID.randomUUID()));
+ }
- @Override public PartitionsDistribution single() {
- return singleDistribution;
- }
- };
+ registry = new TestRegistry();
}
@Test
@@ -405,7 +376,72 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
}
@Test
- public void testSplitterCollocated() throws Exception {
+ public void testSplitterCollocatedPartitionedPartitioned() throws
Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 2);
+ }
+
+ @Test
+ @Ignore("Need to request broadcast trait as a variant for left inner join
sub-tree.")
+ public void testSplitterCollocatedReplicatedReplicated() throws Exception {
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -413,6 +449,111 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
+ TestRegistry registry = new TestRegistry(){
+ @Override public DistributionTrait distribution(int cacheId,
RowType rowType) {
+ return IgniteDistributions.broadcast();
+ }
+
+ @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new Location(select(nodes, 0,1,2), null,
Location.HAS_REPLICATED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new Location(select(nodes, 0,1,2), null,
Location.HAS_REPLICATED_CACHES);
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ };
+
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 2);
+ }
+
+ @Test
+ @Ignore("Need to request broadcast trait as a variant for left inner join
sub-tree.")
+ public void testSplitterCollocatedReplicatedAndPartitioned() throws
Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ TestRegistry registry = new TestRegistry(){
+ @Override public DistributionTrait distribution(int cacheId,
RowType rowType) {
+ if (cacheId == CU.cacheId("Project"))
+ return IgniteDistributions.broadcast();
+
+ return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
+ }
+
+ @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new Location(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), Location.HAS_PARTITIONED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ };
+
Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
assertNotNull(ctx);
@@ -595,4 +736,228 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertTrue(plan.fragments().size() == 4);
}
+
+ @Test
+ @Ignore("Need to request broadcast trait as a variant for left inner join
sub-tree.")
+ public void testSplitterPartiallyReplicated1() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+
+ TestRegistry registry = new TestRegistry(){
+ @Override public DistributionTrait distribution(int cacheId,
RowType rowType) {
+ if (cacheId == CU.cacheId("Project"))
+ return IgniteDistributions.broadcast();
+
+ return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
+ }
+
+ @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new Location(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), Location.HAS_PARTITIONED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ };
+
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 2);
+ }
+
+ @Test
+ @Ignore("Need to request broadcast trait as a variant for left inner join
sub-tree.")
+ public void testSplitterPartiallyReplicated2() throws Exception {
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+
+ TestRegistry registry = new TestRegistry(){
+ @Override public DistributionTrait distribution(int cacheId,
RowType rowType) {
+ if (cacheId == CU.cacheId("Project"))
+ return IgniteDistributions.broadcast();
+
+ return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
+ }
+
+ @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new Location(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), Location.HAS_PARTITIONED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ };
+ Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ assertNotNull(ctx);
+
+ RelTraitDef[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE
+ };
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+ assertNotNull(planner);
+
+ Query query = ctx.unwrap(Query.class);
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(query.sql());
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteRel.LOGICAL_CONVENTION)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL,
rel, desired);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertTrue(plan.fragments().size() == 3);
+ }
+
+ private static <T> List<T> select(List<T> src, int... idxs) {
+ ArrayList<T> res = new ArrayList<>(idxs.length);
+
+ for (int idx : idxs) {
+ res.add(src.get(idx));
+ }
+
+ return res;
+ }
+
+ private static class TestRegistry implements LocationRegistry,
DistributionRegistry {
+ @Override public Location random(AffinityTopologyVersion topVer) {
+ return new Location(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public Location single(AffinityTopologyVersion topVer) {
+ return new Location(select(nodes, 0), null, (byte) 0);
+ }
+
+ @Override public DistributionTrait distribution(int cacheId, RowType
rowType) {
+ return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
+ }
+
+ @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new Location(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), Location.HAS_PARTITIONED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new Location(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), Location.HAS_PARTITIONED_CACHES);
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ }
}
\ No newline at end of file