Repository: ignite Updated Branches: refs/heads/ignite-1232 d51d8b207 -> 8ab0608cf
ignite-1232 Added tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ab0608c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ab0608c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ab0608c Branch: refs/heads/ignite-1232 Commit: 8ab0608cff49db5c603f80f12d543026b429ea7d Parents: d51d8b2 Author: sboikov <[email protected]> Authored: Thu Jun 30 19:43:58 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 30 19:43:58 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 1 - .../messages/GridQueryNextPageResponse.java | 12 +- .../junits/common/GridCommonAbstractTest.java | 16 + .../processors/query/h2/IgniteH2Indexing.java | 31 +- .../query/h2/opt/GridH2RowFactory.java | 8 + .../query/h2/opt/GridH2TreeIndex.java | 28 +- .../processors/query/h2/sql/GridSqlTable.java | 10 +- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- .../query/h2/twostep/msg/GridH2Boolean.java | 1 + .../query/h2/twostep/msg/GridH2RowRange.java | 8 + .../IgniteCacheDistributedJoinQueryTest.java | 517 +++++++++++++++++++ .../cache/IgniteCacheJoinNoIndexTest.java | 248 +++++++++ ...teCacheJoinPartitionedAndReplicatedTest.java | 316 ++++++++++++ .../query/IgniteSqlSplitterSelfTest.java | 25 + 14 files changed, 1188 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 6b25649..8b0465f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -90,7 +90,6 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index b220291..087d5e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -55,6 +56,7 @@ public class GridQueryNextPageResponse implements Message { /** */ @GridDirectCollection(Message.class) + @GridToStringInclude private Collection<Message> vals; /** */ @@ -144,11 +146,6 @@ public class GridQueryNextPageResponse implements Message { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryNextPageResponse.class, this); - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. } @@ -304,4 +301,9 @@ public class GridQueryNextPageResponse implements Message { public void retry(AffinityTopologyVersion retry) { this.retry = retry; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryNextPageResponse.class, this); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 6913539..6b60a4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.CacheException; @@ -1169,4 +1170,19 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } } } + + /** + * @param aff Affinity. + * @param key Counter. + * @param node Target node. + * @return Key. + */ + protected final Integer keyForNode(Affinity<Object> aff, AtomicInteger key, ClusterNode node) { + while (true) { + Integer next = key.getAndIncrement(); + + if (aff.mapKeyToNode(next).equals(node)) + return next; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index ed47d70..dc90df7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1096,12 +1096,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { final boolean enforceJoinOrder = qry.isEnforceJoinOrder(); final boolean distributedJoins = qry.isDistributedJoins() && isPartitioned(cctx); - final boolean groupByCollocated = qry.isCollocated(); + final boolean grpByCollocated = qry.isCollocated(); GridCacheTwoStepQuery twoStepQry; List<GridQueryFieldMetadata> meta; - final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, groupByCollocated, + final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated, distributedJoins, enforceJoinOrder); TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey); @@ -1152,7 +1152,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { try { bindParameters(stmt, F.asList(qry.getArgs())); - twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), groupByCollocated, + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), grpByCollocated, distributedJoins); // Setup spaces from schemas. @@ -1961,7 +1961,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private final String sql; /** */ - private final boolean groupByCollocated; + private final boolean grpByCollocated; /** */ private final boolean distributedJoins; @@ -1972,15 +1972,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * @param space Space. * @param sql Sql. - * @param groupByCollocated Collocated GROUP BY. + * @param grpByCollocated Collocated GROUP BY. * @param distributedJoins Distributed joins enabled. * @param enforceJoinOrder Enforce join order of tables. */ - private TwoStepCachedQueryKey(String space, String sql, boolean groupByCollocated, boolean distributedJoins, + private TwoStepCachedQueryKey(String space, + String sql, + boolean grpByCollocated, + boolean distributedJoins, boolean enforceJoinOrder) { this.space = space; this.sql = sql; - this.groupByCollocated = groupByCollocated; + this.grpByCollocated = grpByCollocated; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; } @@ -1995,7 +1998,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o; - if (groupByCollocated != that.groupByCollocated) + if (grpByCollocated != that.grpByCollocated) return false; if (distributedJoins != that.distributedJoins) @@ -2012,13 +2015,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public int hashCode() { - int result = space != null ? space.hashCode() : 0; - result = 31 * result + sql.hashCode(); - result = 31 * result + (groupByCollocated ? 1 : 0); - result = 31 * result + (distributedJoins ? 1 : 0); - result = 31 * result + (enforceJoinOrder ? 1 : 0); + int res = space != null ? space.hashCode() : 0; + res = 31 * res + sql.hashCode(); + res = 31 * res + (grpByCollocated ? 1 : 0); + res = 31 * res + (distributedJoins ? 1 : 0); + res = 31 * res + (enforceJoinOrder ? 1 : 0); - return result; + return res; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java index 148fab8..00ff3f2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.h2.result.RowFactory; import org.h2.value.Value; @@ -144,6 +146,7 @@ public class GridH2RowFactory extends RowFactory { */ private static final class RowSimple extends GridH2Row { /** */ + @GridToStringInclude private Value[] vals; /** @@ -167,5 +170,10 @@ public class GridH2RowFactory extends RowFactory { @Override public void setValue(int idx, Value v) { vals[idx] = v; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RowSimple.class, this); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 31e9408..00606b2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -115,7 +115,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS private final boolean snapshotEnabled; /** */ - private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() { + private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() { @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException { onMessage0(clusterNode.id(), msg); } @@ -238,7 +238,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS * @param msg Message. */ private void send(Collection<ClusterNode> nodes, Message msg) { - if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler, + if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd, GridIoPolicy.IDX_POOL, false)) throw new GridH2RetryException("Failed to send message to nodes: " + nodes + "."); } @@ -296,7 +296,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS if (msg.bounds() != null) { // This is the first request containing all the search rows. - ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId); + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> snapshot0 = qctx.getSnapshot(idxId); assert !msg.bounds().isEmpty() : "empty bounds"; @@ -785,7 +785,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS * @return Collection of nodes for broadcasting. */ private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) { - Map<UUID,int[]> partMap = qctx.partitionsMap(); + Map<UUID, int[]> partMap = qctx.partitionsMap(); List<ClusterNode> res; @@ -1140,7 +1140,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS final GridCacheContext<?,?> cctx; /** */ - final boolean unicast; + final boolean ucast; /** */ final int affColId; @@ -1168,12 +1168,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS /** * @param cctx Cache Cache context. - * @param unicast Unicast or broadcast query. + * @param ucast Unicast or broadcast query. * @param affColId Affinity column ID. */ - private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean unicast, int affColId) { + private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) { this.cctx = cctx; - this.unicast = unicast; + this.ucast = ucast; this.affColId = affColId; } @@ -1212,7 +1212,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS try { pkAffKeyFirst = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject()); - pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject()); + pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkLast.getObject()); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -1228,6 +1228,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS } /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) { if (findCalled) { findCalled = false; @@ -1313,6 +1314,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS return batchFull; } + /** + * + */ private void startStreams() { if (rangeStreams.isEmpty()) { assert res.isEmpty(); @@ -1338,8 +1342,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS } /** {@inheritDoc} */ - @Override public void reset(boolean beforeQuery) { - if (beforeQuery) { + @Override public void reset(boolean beforeQry) { + if (beforeQry) { qctx = GridH2QueryContext.get(); batchLookupId = qctx.nextBatchLookupId(); rangeStreams = new HashMap<>(); @@ -1355,7 +1359,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS /** {@inheritDoc} */ @Override public String getPlanSQL() { - return unicast ? "unicast" : "broadcast"; + return ucast ? "unicast" : "broadcast"; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java index 59f14cf..49c679d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java @@ -37,7 +37,7 @@ public class GridSqlTable extends GridSqlElement { private final GridH2Table tbl; /** */ - private boolean affKeyCondition; + private boolean affKeyCond; /** * @param schema Schema. @@ -72,17 +72,17 @@ public class GridSqlTable extends GridSqlElement { } /** - * @param affKeyCondition If affinity key condition is found. + * @param affKeyCond If affinity key condition is found. */ - public void affinityKeyCondition(boolean affKeyCondition) { - this.affKeyCondition = affKeyCondition; + public void affinityKeyCondition(boolean affKeyCond) { + this.affKeyCond = affKeyCond; } /** * @return {@code true} If affinity key condition is found. */ public boolean affinityKeyCondition() { - return affKeyCondition; + return affKeyCond; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 530f3ad..da029c7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -480,7 +480,7 @@ public class GridReduceQueryExecutor { if (!F.isEmpty(m)) { for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) { if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0) - break; + break; for (ClusterNode node : entry.getValue()) { if (!node.isClient() && !node.isDaemon()) @@ -533,7 +533,7 @@ public class GridReduceQueryExecutor { Collection<ClusterNode> nodes; // Explicit partition mapping for unstable topology. - Map<ClusterNode,IntArray> partsMap = null; + Map<ClusterNode, IntArray> partsMap = null; if (isPreloadingActive(cctx, extraSpaces)) { if (cctx.isReplicated()) @@ -1192,7 +1192,7 @@ public class GridReduceQueryExecutor { * @param m Map. * @return Converted map. */ - private static Map<UUID,int[]> convert(Map<ClusterNode,IntArray> m) { + private static Map<UUID,int[]> convert(Map<ClusterNode, IntArray> m) { if (m == null) return null; @@ -1209,7 +1209,7 @@ public class GridReduceQueryExecutor { * @param qry Query. * @param explain Explain. * @return Table. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java index 5681a66..edd404e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java @@ -107,6 +107,7 @@ public class GridH2Boolean extends GridH2ValueMessage { return -5; } + /** {@inheritDoc} */ @Override public byte fieldsCount() { return 1; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java index 374e4b8..6ebc11d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; import java.util.List; import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -37,6 +39,7 @@ public class GridH2RowRange implements Message { /** */ @GridDirectCollection(Message.class) + @GridToStringInclude private List<GridH2RowMessage> rows; /** */ @@ -170,4 +173,9 @@ public class GridH2RowRange implements Message { @Override public void onAckReceived() { // No-op. } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2RowRange.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java new file mode 100644 index 0000000..700a2c4 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinQueryTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private boolean client; + + /** */ + private int total; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi) cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery1() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + CacheConfiguration ccfg2 = + cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + List<Integer> orgIds = putData1(); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key", pCache, total); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key=" + orgIds.get(3), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key > " + orgIds.get(2), pCache, total - 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key > " + orgIds.get(1) + " and o._key < " + orgIds.get(4), pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name", pCache, total); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key=" + orgIds.get(0), pCache, 0); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key=" + orgIds.get(3), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o.name='obj-" + orgIds.get(3) + "'", pCache, 3); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery2() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + List<Integer> pIds = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId, "p-" + orgId)); + + pIds.add(pId); + } + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key >= 0", pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key=" + pIds.get(0), pCache, 1); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 2); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery3() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + List<Integer> pIds = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId)); + + pIds.add(pId); + } + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key", pCache, 9); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 6); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key >=" + pIds.get(0) + "and p._key <= " + pIds.get(2), pCache, 9); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery4() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger pKey = new AtomicInteger(); + + Integer pId0 = keyForNode(aff, pKey, node0); + + pCache.put(pId0, new Person(0, "p0")); + + for (int i = 0; i < 3; i++) { + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(0, "p")); + } + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key", pCache, 6); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key and p1._key=" + pId0, pCache, 3); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key and p1.name='p0'", pCache, 3); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p1.name != p2.name", pCache, 6); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p1.name > p2.name", pCache, 3); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, IgniteCache<Object, Object> cache, int expSize, Object... args) { + log.info("Execute query: " + sql); + + checkQuery(sql, cache, false, expSize, args); + + checkQuery(sql, cache, true, expSize, args); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... args) { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + + /** + * @param idxName Name index flag. + * @param idxOrgId Org ID index flag. + * @return Entity. + */ + private QueryEntity personEntity(boolean idxName, boolean idxOrgId) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + List<QueryIndex> idxs = new ArrayList<>(); + + if (idxName) { + QueryIndex idx = new QueryIndex("name"); + + idxs.add(idx); + } + + if (idxOrgId) { + QueryIndex idx = new QueryIndex("orgId"); + + idxs.add(idx); + } + + entity.setIndexes(idxs); + + return entity; + } + + /** + * @param idxName Name index flag. + * @return Entity. + */ + private QueryEntity organizationEntity(boolean idxName) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + + entity.addQueryField("name", String.class.getName(), null); + + if (idxName) { + QueryIndex idx = new QueryIndex("name"); + + entity.setIndexes(F.asList(idx)); + } + + return entity; + } + + /** + * @return Organization ids. + */ + private List<Integer> putData1() { + total = 0; + + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + + AtomicInteger pKey = new AtomicInteger(); + AtomicInteger orgKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + List<Integer> data = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + int orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("obj-" + orgId)); + + for (int j = 0; j < i; j++) { + personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "obj-" + orgId)); + + total++; + } + + data.add(orgId); + } + + return data; + } + + /** + * @param name Cache name. + * @return Configuration. + */ + private CacheConfiguration cacheConfiguration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + + return ccfg; + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java new file mode 100644 index 0000000..e8b363f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheJoinNoIndexTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("orgName", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger orgKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + for (int i = 0; i < 3; i++) { + int orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + i)); + + for (int j = 0; j < i; j++) + personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i)); + } + + checkQuery("select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p " + + "where p.orgName = o.name", personCache, false, 3); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... args) { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String orgName; + + /** + * @param orgId Organization ID. + * @param orgName Organization name. + */ + public Person(int orgId, String orgName) { + this.orgId = orgId; + this.orgName = orgName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java new file mode 100644 index 0000000..4dda30a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.TempJoinTest; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private static final String ACCOUNT_CACHE = "acc"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + // One cache is replicated. + ccfg.setCacheMode(REPLICATED); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(1); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger accKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + /** + * One organization, two persons, two accounts. + */ + + int orgId1 = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId1, new Organization("obj-" + orgId1)); + + int pid = keyForNode(aff, pKey, node0); + personCache.put(pid, new Person(orgId1, "o1-p1")); + + accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1)); + accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1)); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " + + "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... args) { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + + /** + * + */ + private static class Account implements Serializable { + /** */ + int personId; + + /** */ + int orgId; + + /** */ + String name; + + /** + * @param personId Person ID. + * @param orgId Organization ID. + */ + public Account(int personId, int orgId) { + this.personId = personId; + this.orgId = orgId; + name = "acc-" + personId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ab0608c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index ce4d24b..6587631 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -475,9 +475,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { * Test value. */ private static class GroupIndexTestValue implements Serializable { + /** */ @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 0)) private int a; + /** */ @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 1)) private int b; @@ -491,36 +493,59 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { } } + /** + * + */ private static class Person2 implements Serializable { + /** */ @QuerySqlField int orgId; + /** */ @QuerySqlField String name; } + /** + * + */ private static class Organization implements Serializable { + /** */ @QuerySqlField String name; } + /** + * + */ private static class User implements Serializable { + /** */ @QuerySqlField private int id; } + /** + * + */ private static class UserOrder implements Serializable { + /** */ @QuerySqlField private int id; + /** */ @QuerySqlField private int userId; } + /** + * + */ private static class OrderGood implements Serializable { + /** */ @QuerySqlField private int orderId; + /** */ @QuerySqlField private int goodId; }
