http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index f12c0f3..babced3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -27,13 +27,16 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode; +import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel; +import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey; import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; /** @@ -44,10 +47,10 @@ public class GridH2QueryContext { private static final ThreadLocal<GridH2QueryContext> qctx = new ThreadLocal<>(); /** */ - private static final ConcurrentMap<Key, GridH2QueryContext> qctxs = new ConcurrentHashMap<>(); + private static final ConcurrentMap<QueryContextKey, GridH2QueryContext> qctxs = new ConcurrentHashMap<>(); /** */ - private final Key key; + private final QueryContextKey key; /** */ private volatile boolean cleared; @@ -83,7 +86,7 @@ public class GridH2QueryContext { private int pageSize; /** */ - private GridH2CollocationModel qryCollocationMdl; + private CollocationModel qryCollocationMdl; /** */ private MvccSnapshot mvccSnapshot; @@ -100,7 +103,7 @@ public class GridH2QueryContext { public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { assert type != MAP; - key = new Key(locNodeId, nodeId, qryId, 0, type); + key = new QueryContextKey(locNodeId, nodeId, qryId, 0, type); } /** @@ -117,7 +120,7 @@ public class GridH2QueryContext { GridH2QueryType type) { assert segmentId == 0 || type == MAP; - key = new Key(locNodeId, nodeId, qryId, segmentId, type); + key = new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type); } /** @@ -141,34 +144,34 @@ public class GridH2QueryContext { * @return Type. */ public GridH2QueryType type() { - return key.type; + return key.type(); } /** * @return Origin node ID. */ public UUID originNodeId() { - return key.nodeId; + return key.nodeId(); } /** * @return Query request ID. */ public long queryId() { - return key.qryId; + return key.queryId(); } /** * @return Query collocation model. */ - public GridH2CollocationModel queryCollocationModel() { + public CollocationModel queryCollocationModel() { return qryCollocationMdl; } /** * @param qryCollocationMdl Query collocation model. */ - public void queryCollocationModel(GridH2CollocationModel qryCollocationMdl) { + public void queryCollocationModel(CollocationModel qryCollocationMdl) { this.qryCollocationMdl = qryCollocationMdl; } @@ -268,7 +271,7 @@ public class GridH2QueryContext { /** @return index segment ID. */ public int segment() { - return key.segmentId; + return key.segmentId(); } /** @@ -351,7 +354,7 @@ public class GridH2QueryContext { assert qctx.get() == null; // We need MAP query context to be available to other threads to run distributed joins. - if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null) + if (x.key.type() == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null) throw new IllegalStateException("Query context is already set."); qctx.set(x); @@ -378,8 +381,12 @@ public class GridH2QueryContext { public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { boolean res = false; - for (Key key : qctxs.keySet()) { - if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type) + for (QueryContextKey key : qctxs.keySet()) { + if (key.localNodeId().equals(locNodeId) && + key.nodeId().equals(nodeId) && + key.queryId() == qryId && + key.type() == type + ) res |= doClear(key, false); } @@ -391,8 +398,8 @@ public class GridH2QueryContext { * @param nodeStop Node is stopping. * @return {@code True} if context was found. */ - private static boolean doClear(Key key, boolean nodeStop) { - assert key.type == MAP : key.type; + private static boolean doClear(QueryContextKey key, boolean nodeStop) { + assert key.type() == MAP : key.type(); GridH2QueryContext x = qctxs.remove(key); @@ -436,8 +443,8 @@ public class GridH2QueryContext { * @param nodeId Dead node ID. */ public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) { - for (Key key : qctxs.keySet()) { - if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId)) + for (QueryContextKey key : qctxs.keySet()) { + if (key.localNodeId().equals(locNodeId) && key.nodeId().equals(nodeId)) doClear(key, false); } } @@ -446,8 +453,8 @@ public class GridH2QueryContext { * @param locNodeId Local node ID. */ public static void clearLocalNodeStop(UUID locNodeId) { - for (Key key : qctxs.keySet()) { - if (key.locNodeId.equals(locNodeId)) + for (QueryContextKey key : qctxs.keySet()) { + if (key.localNodeId().equals(locNodeId)) doClear(key, true); } } @@ -478,7 +485,7 @@ public class GridH2QueryContext { int segmentId, GridH2QueryType type ) { - return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type)); + return qctxs.get(new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type)); } /** @@ -537,116 +544,4 @@ public class GridH2QueryContext { return S.toString(GridH2QueryContext.class, this); } - /** - * Unique key for the query context. - */ - private static class Key { - /** */ - private final UUID locNodeId; - - /** */ - private final UUID nodeId; - - /** */ - private final long qryId; - - /** */ - private final int segmentId; - - /** */ - private final GridH2QueryType type; - - /** - * @param locNodeId Local node ID. - * @param nodeId The node who initiated the query. - * @param qryId The query ID. - * @param segmentId Index segment ID. - * @param type Query type. - */ - private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { - assert locNodeId != null; - assert nodeId != null; - assert type != null; - - this.locNodeId = locNodeId; - this.nodeId = nodeId; - this.qryId = qryId; - this.segmentId = segmentId; - this.type = type; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - Key key = (Key)o; - - return qryId == key.qryId && nodeId.equals(key.nodeId) && type == key.type && - locNodeId.equals(key.locNodeId) ; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = locNodeId.hashCode(); - - res = 31 * res + nodeId.hashCode(); - res = 31 * res + (int)(qryId ^ (qryId >>> 32)); - res = 31 * res + type.hashCode(); - res = 31 * res + segmentId; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Key.class, this); - } - } - - /** - * Key for source. - */ - private static final class SourceKey { - /** */ - UUID ownerId; - - /** */ - int segmentId; - - /** */ - int batchLookupId; - - /** - * @param ownerId Owner node ID. - * @param segmentId Index segment ID. - * @param batchLookupId Batch lookup ID. - */ - SourceKey(UUID ownerId, int segmentId, int batchLookupId) { - this.ownerId = ownerId; - this.segmentId = segmentId; - this.batchLookupId = batchLookupId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == null || !(o instanceof SourceKey)) - return false; - - SourceKey srcKey = (SourceKey)o; - - return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId && - ownerId.equals(srcKey.ownerId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int hash = ownerId.hashCode(); - hash = 31 * hash + segmentId; - return 31 * hash + batchLookupId; - } - } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java new file mode 100644 index 0000000..ad0ff20 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * Unique key for the query context. + */ +public class QueryContextKey { + /** */ + private final UUID locNodeId; + + /** */ + private final UUID nodeId; + + /** */ + private final long qryId; + + /** */ + private final int segmentId; + + /** */ + private final GridH2QueryType type; + + /** + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param segmentId Index segment ID. + * @param type Query type. + */ + QueryContextKey(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { + assert locNodeId != null; + assert nodeId != null; + assert type != null; + + this.locNodeId = locNodeId; + this.nodeId = nodeId; + this.qryId = qryId; + this.segmentId = segmentId; + this.type = type; + } + + /** + * @return Local node ID. + */ + public UUID localNodeId() { + return locNodeId; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Query ID. + */ + public long queryId() { + return qryId; + } + + /** + * @return Segment ID. + */ + public int segmentId() { + return segmentId; + } + + /** + * @return Type. + */ + public GridH2QueryType type() { + return type; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryContextKey key = (QueryContextKey)o; + + return qryId == key.qryId && nodeId.equals(key.nodeId) && type == key.type && + locNodeId.equals(key.locNodeId) ; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = locNodeId.hashCode(); + + res = 31 * res + nodeId.hashCode(); + res = 31 * res + (int)(qryId ^ (qryId >>> 32)); + res = 31 * res + type.hashCode(); + res = 31 * res + segmentId; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryContextKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java new file mode 100644 index 0000000..632d72a --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.h2.index.Cursor; +import org.h2.result.Row; +import org.h2.result.SearchRow; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; + +/** + * Merge cursor from multiple nodes. + */ +public class BroadcastCursor implements Cursor, Comparator<RangeStream> { + /** Index. */ + private final GridH2IndexBase idx; + + /** */ + private final int rangeId; + + /** */ + private final RangeStream[] streams; + + /** */ + private boolean first = true; + + /** */ + private int off; + + /** + * @param rangeId Range ID. + * @param segmentKeys Remote nodes. + * @param rangeStreams Range streams. + */ + public BroadcastCursor(GridH2IndexBase idx, int rangeId, Collection<SegmentKey> segmentKeys, + Map<SegmentKey, RangeStream> rangeStreams) { + this.idx = idx; + this.rangeId = rangeId; + + streams = new RangeStream[segmentKeys.size()]; + + int i = 0; + + for (SegmentKey segmentKey : segmentKeys) { + RangeStream stream = rangeStreams.get(segmentKey); + + assert stream != null; + + streams[i++] = stream; + } + } + + /** {@inheritDoc} */ + @Override public int compare(RangeStream o1, RangeStream o2) { + if (o1 == o2) + return 0; + + // Nulls are at the beginning of array. + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return idx.compareRows(o1.get(rangeId), o2.get(rangeId)); + } + + /** + * Try to fetch the first row. + * + * @return {@code true} If we were able to find at least one row. + */ + private boolean goFirst() { + // Fetch first row from all the streams and sort them. + for (int i = 0; i < streams.length; i++) { + if (!streams[i].next(rangeId)) { + streams[i] = null; + off++; // After sorting this offset will cut off all null elements at the beginning of array. + } + } + + if (off == streams.length) + return false; + + Arrays.sort(streams, this); + + return true; + } + + /** + * Fetch next row. + * + * @return {@code true} If we were able to find at least one row. + */ + private boolean goNext() { + assert off != streams.length; + + if (!streams[off].next(rangeId)) { + // Next row from current min stream was not found -> nullify that stream and bump offset forward. + streams[off] = null; + + return ++off != streams.length; + } + + // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams. + GridH2IndexBase.bubbleUp(streams, off, this); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (first) { + first = false; + + return goFirst(); + } + + return goNext(); + } + + /** {@inheritDoc} */ + @Override public Row get() { + return streams[off].get(rangeId); + } + + /** {@inheritDoc} */ + @Override public SearchRow getSearchRow() { + return get(); + } + + /** {@inheritDoc} */ + @Override public boolean previous() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java new file mode 100644 index 0000000..03653c7 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java @@ -0,0 +1,841 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.cache.CacheException; + +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.h2.command.dml.Query; +import org.h2.command.dml.Select; +import org.h2.command.dml.SelectUnion; +import org.h2.expression.Comparison; +import org.h2.expression.Expression; +import org.h2.expression.ExpressionColumn; +import org.h2.index.IndexCondition; +import org.h2.index.ViewIndex; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.SubQueryInfo; +import org.h2.table.Table; +import org.h2.table.TableFilter; +import org.h2.table.TableView; + +/** + * Collocation model for a query. + */ +public final class CollocationModel { + /** */ + public static final int MULTIPLIER_COLLOCATED = 1; + + /** */ + private static final int MULTIPLIER_UNICAST = 50; + + /** */ + private static final int MULTIPLIER_BROADCAST = 200; + + /** */ + private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000; + + /** */ + private final CollocationModel upper; + + /** */ + private final int filter; + + /** */ + private final boolean view; + + /** */ + private int multiplier; + + /** */ + private Type type; + + /** */ + private CollocationModel[] children; + + /** */ + private TableFilter[] childFilters; + + /** */ + private List<CollocationModel> unions; + + /** */ + private Select select; + + /** */ + private final boolean validate; + + /** + * @param upper Upper. + * @param filter Filter. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. + */ + private CollocationModel(CollocationModel upper, int filter, boolean view, boolean validate) { + this.upper = upper; + this.filter = filter; + this.view = view; + this.validate = validate; + } + + /** + * @return Table filter for this collocation model. + */ + private TableFilter filter() { + return upper == null ? null : upper.childFilters[filter]; + } + + /** {@inheritDoc} */ + @Override public String toString() { + calculate(); + + SB b = new SB(); + + for (int lvl = 0; lvl < 20; lvl++) { + if (!toString(b, lvl)) + break; + + b.a('\n'); + } + + return b.toString(); + } + + /** + * @param b String builder. + * @param lvl Depth level. + */ + private boolean toString(SB b, int lvl) { + boolean res = false; + + if (lvl == 0) { + TableFilter f = filter(); + String tblAlias = f == null ? "^" : f.getTableAlias(); + + b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", mul=").a(multiplier).a("]"); + + res = true; + } + else if (childFilters != null) { + assert lvl > 0; + + lvl--; + + for (int i = 0; i < childFilters.length; i++) { + if (lvl == 0) + b.a(" | "); + + res |= child(i, true).toString(b, lvl); + } + + if (lvl == 0) + b.a(" | "); + } + + return res; + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param unions Unions. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. + * @return Created child collocation model. + */ + private static CollocationModel createChildModel(CollocationModel upper, + int filter, + List<CollocationModel> unions, + boolean view, + boolean validate) { + CollocationModel child = new CollocationModel(upper, filter, view, validate); + + if (unions != null) { + // Bind created child to unions. + assert upper == null || upper.child(filter, false) != null || unions.isEmpty(); + + if (upper != null && unions.isEmpty()) { + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + unions.add(child); + + child.unions = unions; + } + else if (upper != null) { + // Bind created child to upper model. + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + return child; + } + + /** + * @param childFilters New child filters. + * @return {@code true} If child filters were updated. + */ + private boolean childFilters(TableFilter[] childFilters) { + assert childFilters != null; + assert view; + + Select select = childFilters[0].getSelect(); + + assert this.select == null || this.select == select; + + if (this.select == null) { + this.select = select; + + assert this.childFilters == null; + } + else if (Arrays.equals(this.childFilters, childFilters)) + return false; + + if (this.childFilters == null) { + // We have to clone because H2 reuses array and reorders elements. + this.childFilters = childFilters.clone(); + + children = new CollocationModel[childFilters.length]; + } + else { + assert this.childFilters.length == childFilters.length; + + // We have to copy because H2 reuses array and reorders elements. + System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length); + + Arrays.fill(children, null); + } + + // Reset results. + type = null; + multiplier = 0; + + return true; + } + + /** + * Do the needed calculations. + */ + @SuppressWarnings("ConstantConditions") + private void calculate() { + if (type != null) + return; + + if (view) { // We are at (sub-)query model. + assert childFilters != null; + + boolean collocated = true; + boolean partitioned = false; + int maxMultiplier = MULTIPLIER_COLLOCATED; + + for (int i = 0; i < childFilters.length; i++) { + CollocationModel child = child(i, true); + + Type t = child.type(true); + + if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + maxMultiplier = child.multiplier; + + if (t.isPartitioned()) { + partitioned = true; + + if (!t.isCollocated()) { + collocated = false; + + int m = child.multiplier(true); + + if (m > maxMultiplier) { + maxMultiplier = m; + + if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST) + break; + } + } + } + } + + type = Type.of(partitioned, collocated); + multiplier = maxMultiplier; + } + else { + assert upper != null; + assert childFilters == null; + + // We are at table instance. + Table tbl = filter().getTable(); + + // Only partitioned tables will do distributed joins. + if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) { + type = Type.REPLICATED; + multiplier = MULTIPLIER_COLLOCATED; + + return; + } + + // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables + // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted + // to all the affinity nodes the "base" does not need to get remote results. + if (!upper.findPartitionedTableBefore(filter)) { + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + } + else { + // It is enough to make sure that our previous join by affinity key is collocated, then we are + // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. + switch (upper.joinedWithCollocated(filter)) { + case COLLOCATED_JOIN: + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + + break; + + case HAS_AFFINITY_CONDITION: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_UNICAST; + + break; + + case NONE: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_BROADCAST; + + break; + + default: + throw new IllegalStateException(); + } + } + + if (upper.previousReplicated(filter)) + multiplier = MULTIPLIER_REPLICATED_NOT_LAST; + } + } + + /** + * @param f Current filter. + * @return {@code true} If partitioned table was found. + */ + private boolean findPartitionedTableBefore(int f) { + for (int i = 0; i < f; i++) { + CollocationModel child = child(i, true); + + // The c can be null if it is not a GridH2Table and not a sub-query, + // it is a some kind of function table or anything else that considered replicated. + if (child != null && child.type(true).isPartitioned()) + return true; + } + + // We have to search globally in upper queries as well. + return upper != null && upper.findPartitionedTableBefore(filter); + } + + /** + * @param f Current filter. + * @return {@code true} If previous table is REPLICATED. + */ + @SuppressWarnings("SimplifiableIfStatement") + private boolean previousReplicated(int f) { + if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED) + return true; + + return upper != null && upper.previousReplicated(filter); + } + + /** + * @param f Filter. + * @return Affinity join type. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private Affinity joinedWithCollocated(int f) { + TableFilter tf = childFilters[f]; + + GridH2Table tbl = (GridH2Table)tf.getTable(); + + if (validate) { + if (tbl.isCustomAffinityMapper()) + throw customAffinityError(tbl.cacheName()); + + if (F.isEmpty(tf.getIndexConditions())) { + throw new CacheException("Failed to prepare distributed join query: " + + "join condition does not use index [joinedCache=" + tbl.cacheName() + + ", plan=" + tf.getSelect().getPlanSQL() + ']'); + } + } + + IndexColumn affCol = tbl.getAffinityKeyColumn(); + + boolean affKeyCondFound = false; + + if (affCol != null) { + ArrayList<IndexCondition> idxConditions = tf.getIndexConditions(); + + int affColId = affCol.column.getColumnId(); + + for (int i = 0; i < idxConditions.size(); i++) { + IndexCondition c = idxConditions.get(i); + int colId = c.getColumn().getColumnId(); + int cmpType = c.getCompareType(); + + if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) && + (colId == affColId || tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) { + affKeyCondFound = true; + + Expression exp = c.getExpression(); + exp = exp.getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + ExpressionColumn expCol = (ExpressionColumn)exp; + + // This is one of our previous joins. + TableFilter prevJoin = expCol.getTableFilter(); + + if (prevJoin != null) { + CollocationModel cm = child(indexOf(prevJoin), true); + + // If the previous joined model is a subquery (view), we can not be sure that + // the found affinity column is the needed one, since we can select multiple + // different affinity columns from different tables. + if (cm != null && !cm.view) { + Type t = cm.type(true); + + if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate)) + return Affinity.COLLOCATED_JOIN; + } + } + } + } + } + } + + return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE; + } + + /** + * @param f Table filter. + * @return Index. + */ + private int indexOf(TableFilter f) { + for (int i = 0; i < childFilters.length; i++) { + if (childFilters[i] == f) + return i; + } + + throw new IllegalStateException(); + } + + /** + * @param f Table filter. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) { + Column col = expCol.getColumn(); + + if (col == null) + return false; + + Table t = col.getTable(); + + if (t.isView()) { + Query qry; + + if (f.getIndex() != null) + qry = getSubQuery(f); + else + qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t); + + return isAffinityColumn(qry, expCol, validate); + } + + if (t instanceof GridH2Table) { + GridH2Table t0 = (GridH2Table)t; + + if (validate && t0.isCustomAffinityMapper()) + throw customAffinityError((t0).cacheName()); + + IndexColumn affCol = t0.getAffinityKeyColumn(); + + return affCol != null && col.getColumnId() == affCol.column.getColumnId(); + } + + return false; + } + + /** + * @param qry Query. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) { + if (qry.isUnion()) { + SelectUnion union = (SelectUnion)qry; + + return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate); + } + + Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + expCol = (ExpressionColumn)exp; + + return isAffinityColumn(expCol.getTableFilter(), expCol, validate); + } + + return false; + } + + /** + * @return Multiplier. + */ + public int calculateMultiplier() { + // We don't need multiplier for union here because it will be summarized in H2. + return multiplier(false); + } + + /** + * @param withUnion With respect to union. + * @return Multiplier. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private int multiplier(boolean withUnion) { + calculate(); + + assert multiplier != 0; + + if (withUnion && unions != null) { + int maxMultiplier = 0; + + for (int i = 0; i < unions.size(); i++) { + int m = unions.get(i).multiplier(false); + + if (m > maxMultiplier) + maxMultiplier = m; + } + + return maxMultiplier; + } + + return multiplier; + } + + /** + * @param withUnion With respect to union. + * @return Type. + */ + private Type type(boolean withUnion) { + calculate(); + + assert type != null; + + if (withUnion && unions != null) { + Type left = unions.get(0).type(false); + + for (int i = 1; i < unions.size(); i++) { + Type right = unions.get(i).type(false); + + if (!left.isCollocated() || !right.isCollocated()) { + left = Type.PARTITIONED_NOT_COLLOCATED; + + break; + } + else if (!left.isPartitioned() && !right.isPartitioned()) + left = Type.REPLICATED; + else + left = Type.PARTITIONED_COLLOCATED; + } + + return left; + } + + return type; + } + + /** + * @param i Index. + * @param create Create child if needed. + * @return Child collocation. + */ + private CollocationModel child(int i, boolean create) { + CollocationModel child = children[i]; + + if (child == null && create) { + TableFilter f = childFilters[i]; + + if (f.getTable().isView()) { + if (f.getIndex() == null) { + // If we don't have view index yet, then we just creating empty model and it must be filled later. + child = createChildModel(this, i, null, true, validate); + } + else + child = buildCollocationModel(this, i, getSubQuery(f), null, validate); + } + else + child = createChildModel(this, i, null, false, validate); + + assert child != null; + assert children[i] == child; + } + + return child; + } + + /** + * @param f Table filter. + * @return Sub-query. + */ + private static Query getSubQuery(TableFilter f) { + return ((ViewIndex)f.getIndex()).getQuery(); + } + + /** + * @return Unions list. + */ + private List<CollocationModel> getOrCreateUnions() { + if (unions == null) { + unions = new ArrayList<>(4); + + unions.add(this); + } + + return unions; + } + + /** + * @param qctx Query context. + * @param info Sub-query info. + * @param filters Filters. + * @param filter Filter. + * @param validate Query validation flag. + * @return Collocation. + */ + public static CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info, + TableFilter[] filters, int filter, boolean validate) { + CollocationModel cm; + + if (info != null) { + // Go up until we reach the root query. + cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate); + } + else { + // We are at the root query. + cm = qctx.queryCollocationModel(); + + if (cm == null) { + cm = createChildModel(null, -1, null, true, validate); + + qctx.queryCollocationModel(cm); + } + } + + if (filters == null) + return cm; + + assert cm.view; + + Select select = filters[0].getSelect(); + + // Handle union. We have to rely on fact that select will be the same on uppermost select. + // For sub-queries we will drop collocation models, so that they will be recalculated anyways. + if (cm.select != null && cm.select != select) { + List<CollocationModel> unions = cm.getOrCreateUnions(); + + // Try to find this select in existing unions. + // Start with 1 because at 0 it always will be c. + for (int i = 1; i < unions.size(); i++) { + CollocationModel u = unions.get(i); + + if (u.select == select) { + cm = u; + + break; + } + } + + // Nothing was found, need to create new child in union. + if (cm.select != select) + cm = createChildModel(cm.upper, cm.filter, unions, true, validate); + } + + cm.childFilters(filters); + + return cm.child(filter, true); + } + + /** + * @param qry Query. + * @return {@code true} If the query is collocated. + */ + public static boolean isCollocated(Query qry) { + CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true); + + Type type = mdl.type(true); + + if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + throw new CacheException("Failed to execute query: for distributed join " + + "all REPLICATED caches must be at the end of the joined tables list."); + + return type.isCollocated(); + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param qry Query. + * @param unions Unions. + * @param validate Query validation flag. + * @return Built model. + */ + private static CollocationModel buildCollocationModel(CollocationModel upper, + int filter, + Query qry, + List<CollocationModel> unions, + boolean validate) { + if (qry.isUnion()) { + if (unions == null) + unions = new ArrayList<>(); + + SelectUnion union = (SelectUnion)qry; + + CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate); + CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate); + + assert left != null; + assert right != null; + + return upper != null ? upper : left; + } + + Select select = (Select)qry; + + List<TableFilter> list = new ArrayList<>(); + + for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) + list.add(f); + + TableFilter[] filters = list.toArray(new TableFilter[list.size()]); + + CollocationModel cm = createChildModel(upper, filter, unions, true, validate); + + cm.childFilters(filters); + + for (int i = 0; i < filters.length; i++) { + TableFilter f = filters[i]; + + if (f.getTable().isView()) + buildCollocationModel(cm, i, getSubQuery(f), null, validate); + else if (f.getTable() instanceof GridH2Table) + createChildModel(cm, i, null, false, validate); + } + + return upper != null ? upper : cm; + } + + /** + * @param cacheName Cache name. + * @return Error. + */ + private static CacheException customAffinityError(String cacheName) { + return new CacheException("Failed to prepare distributed join query: can not use distributed joins for cache " + + "with custom AffinityKeyMapper configured. " + + "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']'); + } + + /** + * Collocation type. + */ + private enum Type { + /** */ + PARTITIONED_COLLOCATED(true, true), + + /** */ + PARTITIONED_NOT_COLLOCATED(true, false), + + /** */ + REPLICATED(false, true); + + /** */ + private final boolean partitioned; + + /** */ + private final boolean collocated; + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + */ + Type(boolean partitioned, boolean collocated) { + this.partitioned = partitioned; + this.collocated = collocated; + } + + /** + * @return {@code true} If partitioned. + */ + public boolean isPartitioned() { + return partitioned; + } + + /** + * @return {@code true} If collocated. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + * @return Type. + */ + static Type of(boolean partitioned, boolean collocated) { + if (collocated) + return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED; + + assert partitioned; + + return Type.PARTITIONED_NOT_COLLOCATED; + } + } + + /** + * Affinity of a table relative to previous joined tables. + */ + private enum Affinity { + /** */ + NONE, + + /** */ + HAS_AFFINITY_CONDITION, + + /** */ + COLLOCATED_JOIN + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java new file mode 100644 index 0000000..5a174e8 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.internal.processors.query.h2.H2Cursor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; + +import java.util.Iterator; + +/** + * + */ +public final class CursorIteratorWrapper implements Iterator<GridH2Row> { + /** */ + private final H2Cursor cursor; + + /** Next element. */ + private GridH2Row next; + + /** + * @param cursor Cursor. + */ + public CursorIteratorWrapper(H2Cursor cursor) { + assert cursor != null; + + this.cursor = cursor; + + if (cursor.next()) + next = (GridH2Row)cursor.get(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return next != null; + } + + /** {@inheritDoc} */ + @Override public GridH2Row next() { + GridH2Row res = next; + + if (cursor.next()) + next = (GridH2Row)cursor.get(); + else + next = null; + + return res; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException("operation is not supported"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java new file mode 100644 index 0000000..7c958da --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +/** + * Defines set of distributed join modes. + */ +public enum DistributedJoinMode { + /** + * Distributed joins is disabled. Local joins will be performed instead. + */ + OFF, + + /** + * Distributed joins is enabled within local node only. + * + * NOTE: This mode is used with segmented indices for local sql queries. + * As in this case we need to make distributed join across local index segments + * and prevent range-queries to other nodes. + */ + LOCAL_ONLY, + + /** + * Distributed joins is enabled. + */ + ON; + + /** + * @param isLocal Query local flag. + * @param distributedJoins Query distributed joins flag. + * @return DistributedJoinMode for the query. + */ + public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) { + return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java new file mode 100644 index 0000000..02d2e44 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.h2.index.Cursor; +import org.h2.index.IndexLookupBatch; +import org.h2.result.SearchRow; +import org.h2.util.DoneFuture; +import org.h2.value.Value; +import org.h2.value.ValueNull; + +import javax.cache.CacheException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; + +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds; + +/** + * Index lookup batch. + */ +public class DistributedLookupBatch implements IndexLookupBatch { + /** Index. */ + private final GridH2IndexBase idx; + + /** */ + private final GridCacheContext<?,?> cctx; + + /** */ + private final boolean ucast; + + /** */ + private final int affColId; + + /** */ + private GridH2QueryContext qctx; + + /** */ + private int batchLookupId; + + /** */ + private Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap(); + + /** */ + private List<SegmentKey> broadcastSegments; + + /** */ + private List<Future<Cursor>> res = Collections.emptyList(); + + /** */ + private boolean batchFull; + + /** */ + private boolean findCalled; + + /** + * @param cctx Cache Cache context. + * @param ucast Unicast or broadcast query. + * @param affColId Affinity column ID. + */ + public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) { + this.idx = idx; + this.cctx = cctx; + this.ucast = ucast; + this.affColId = affColId; + } + + /** + * @param firstRow First row. + * @param lastRow Last row. + * @return Affinity key or {@code null}. + */ + private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) { + if (affColId == COL_NOT_EXISTS) + return null; + + if (firstRow == null || lastRow == null) + return null; + + Value affKeyFirst = firstRow.getValue(affColId); + Value affKeyLast = lastRow.getValue(affColId); + + if (affKeyFirst != null && equal(affKeyFirst, affKeyLast)) + return affKeyFirst == ValueNull.INSTANCE ? GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject(); + + if (idx.getTable().rowDescriptor().isKeyColumn(affColId)) + return null; + + // Try to extract affinity key from primary key. + Value pkFirst = firstRow.getValue(KEY_COL); + Value pkLast = lastRow.getValue(KEY_COL); + + if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE) + return GridH2IndexBase.EXPLICIT_NULL; + + if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast)) + return null; + + Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject()); + Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject()); + + if (pkAffKeyFirst == null || pkAffKeyLast == null) + throw new CacheException("Cache key without affinity key."); + + if (pkAffKeyFirst.equals(pkAffKeyLast)) + return pkAffKeyFirst; + + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) + @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) { + if (qctx == null || findCalled) { + if (qctx == null) { + // It is the first call after query begin (may be after reuse), + // reinitialize query context and result. + qctx = GridH2QueryContext.get(); + res = new ArrayList<>(); + + assert qctx != null; + assert !findCalled; + } + else { + // Cleanup after the previous lookup phase. + assert batchLookupId != 0; + + findCalled = false; + qctx.putStreams(batchLookupId, null); + res.clear(); + } + + // Reinitialize for the next lookup phase. + batchLookupId = qctx.nextBatchLookupId(); + rangeStreams = new HashMap<>(); + } + + Object affKey = getAffinityKey(firstRow, lastRow); + + boolean locQry = localQuery(); + + List<SegmentKey> segmentKeys; + + if (affKey != null) { + // Affinity key is provided. + if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything. + return false; + + segmentKeys = F.asList(rangeSegment(affKey, locQry)); + } + else { + // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast. + if (broadcastSegments == null) + broadcastSegments = broadcastSegments(locQry); + + segmentKeys = broadcastSegments; + } + + if (locQry && segmentKeys.isEmpty()) + return false; // Nothing to do + + assert !F.isEmpty(segmentKeys) : segmentKeys; + + final int rangeId = res.size(); + + // Create messages. + GridH2RowMessage first = idx.toSearchRowMessage(firstRow); + GridH2RowMessage last = idx.toSearchRowMessage(lastRow); + + // Range containing upper and lower bounds. + GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last); + + // Add range to every message of every participating node. + for (int i = 0; i < segmentKeys.size(); i++) { + SegmentKey segmentKey = segmentKeys.get(i); + assert segmentKey != null; + + RangeStream stream = rangeStreams.get(segmentKey); + + List<GridH2RowRangeBounds> bounds; + + if (stream == null) { + stream = new RangeStream(cctx.kernalContext(), idx, qctx, segmentKey.node()); + + stream.request(createRequest(qctx, batchLookupId, segmentKey.segmentId())); + stream.request().bounds(bounds = new ArrayList<>()); + + rangeStreams.put(segmentKey, stream); + } + else + bounds = stream.request().bounds(); + + bounds.add(rangeBounds); + + // If at least one node will have a full batch then we are ok. + if (bounds.size() >= qctx.pageSize()) + batchFull = true; + } + + Cursor cur; + + if (segmentKeys.size() == 1) + cur = new UnicastCursor(rangeId, rangeStreams.get(F.first(segmentKeys))); + else + cur = new BroadcastCursor(idx, rangeId, segmentKeys, rangeStreams); + + res.add(new DoneFuture<>(cur)); + + return true; + } + + /** + * @param v1 First value. + * @param v2 Second value. + * @return {@code true} If they equal. + */ + private boolean equal(Value v1, Value v2) { + return v1 == v2 || (v1 != null && v2 != null && + v1.compareTypeSafe(v2, idx.getDatabase().getCompareMode()) == 0); + } + + /** {@inheritDoc} */ + @Override public boolean isBatchFull() { + return batchFull; + } + + /** + * @return {@code True} if local query execution is enforced. + */ + private boolean localQuery() { + assert qctx != null : "Missing query context: " + this; + + return qctx.distributedJoinMode() == LOCAL_ONLY; + } + + /** + * + */ + private void startStreams() { + if (rangeStreams.isEmpty()) { + assert res.isEmpty(); + + return; + } + + qctx.putStreams(batchLookupId, rangeStreams); + + // Start streaming. + for (RangeStream stream : rangeStreams.values()) { + stream.start(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + @Override public List<Future<Cursor>> find() { + batchFull = false; + findCalled = true; + + startStreams(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset(boolean beforeQry) { + if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called. + return; + + assert batchLookupId != 0; + + // Do cleanup after the query run. + qctx.putStreams(batchLookupId, null); + qctx = null; // The same query can be reused multiple times for different query contexts. + batchLookupId = 0; + + rangeStreams = Collections.emptyMap(); + broadcastSegments = null; + batchFull = false; + findCalled = false; + res = Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public String getPlanSQL() { + return ucast ? "unicast" : "broadcast"; + } + + /** + * @param affKeyObj Affinity key. + * @param isLocalQry Local query flag. + * @return Segment key for Affinity key. + */ + public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) { + assert affKeyObj != null && affKeyObj != GridH2IndexBase.EXPLICIT_NULL : affKeyObj; + + ClusterNode node; + + int partition = cctx.affinity().partition(affKeyObj); + + if (isLocalQry) { + if (qctx.partitionsMap() != null) { + // If we have explicit partitions map, we have to use it to calculate affinity node. + UUID nodeId = qctx.nodeForPartition(partition, cctx); + + if(!cctx.localNodeId().equals(nodeId)) + return null; // Prevent remote index call for local queries. + } + + if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion())) + return null; + + node = cctx.localNode(); + } + else{ + if (qctx.partitionsMap() != null) { + // If we have explicit partitions map, we have to use it to calculate affinity node. + UUID nodeId = qctx.nodeForPartition(partition, cctx); + + node = cctx.discovery().node(nodeId); + } + else // Get primary node for current topology version. + node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion()); + + if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. + throw H2Utils.retryException("Failed to get primary node by key for range segment."); + } + + return new SegmentKey(node, idx.segmentForPartition(partition)); + } + + /** + * @param isLocalQry Local query flag. + * @return Collection of nodes for broadcasting. + */ + public List<SegmentKey> broadcastSegments(boolean isLocalQry) { + Map<UUID, int[]> partMap = qctx.partitionsMap(); + + List<ClusterNode> nodes; + + if (isLocalQry) { + if (partMap != null && !partMap.containsKey(cctx.localNodeId())) + return Collections.emptyList(); // Prevent remote index call for local queries. + + nodes = Collections.singletonList(cctx.localNode()); + } + else { + if (partMap == null) + nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion())); + else { + nodes = new ArrayList<>(partMap.size()); + + for (UUID nodeId : partMap.keySet()) { + ClusterNode node = cctx.kernalContext().discovery().node(nodeId); + + if (node == null) + throw H2Utils.retryException("Failed to get node by ID during broadcast [" + + "nodeId=" + nodeId + ']'); + + nodes.add(node); + } + } + + if (F.isEmpty(nodes)) + throw H2Utils.retryException("Failed to collect affinity nodes during broadcast [" + + "cacheName=" + cctx.name() + ']'); + } + + int segmentsCount = idx.segmentsCount(); + + List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount); + + for (ClusterNode node : nodes) { + for (int seg = 0; seg < segmentsCount; seg++) + res.add(new SegmentKey(node, seg)); + } + + return res; + } + + /** + * @param qctx Query context. + * @param batchLookupId Batch lookup ID. + * @param segmentId Segment ID. + * @return Index range request. + */ + public static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) { + GridH2IndexRangeRequest req = new GridH2IndexRangeRequest(); + + req.originNodeId(qctx.originNodeId()); + req.queryId(qctx.queryId()); + req.originSegmentId(qctx.segment()); + req.segment(segmentId); + req.batchLookupId(batchLookupId); + + return req; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java new file mode 100644 index 0000000..8cce83d --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static java.util.Collections.emptyIterator; + +/** + * Bounds iterator. + */ +public class RangeSource { + /** Index. */ + private final GridH2IndexBase idx; + + /** */ + private Iterator<GridH2RowRangeBounds> boundsIter; + + /** */ + private int curRangeId = -1; + + /** */ + private final int segment; + + /** */ + private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter; + + /** Iterator. */ + private Iterator<GridH2Row> iter = emptyIterator(); + + /** + * @param bounds Bounds. + * @param segment Segment. + * @param filter Filter. + */ + public RangeSource( + GridH2IndexBase idx, + Iterable<GridH2RowRangeBounds> bounds, + int segment, + BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter + ) { + this.idx = idx; + this.segment = segment; + this.filter = filter; + + boundsIter = bounds.iterator(); + } + + /** + * @return {@code true} If there are more rows in this source. + */ + public boolean hasMoreRows() throws IgniteCheckedException { + return boundsIter.hasNext() || iter.hasNext(); + } + + /** + * @param maxRows Max allowed rows. + * @return Range. + */ + public GridH2RowRange next(int maxRows) { + assert maxRows > 0 : maxRows; + + for (; ; ) { + if (iter.hasNext()) { + // Here we are getting last rows from previously partially fetched range. + List<GridH2RowMessage> rows = new ArrayList<>(); + + GridH2RowRange nextRange = new GridH2RowRange(); + + nextRange.rangeId(curRangeId); + nextRange.rows(rows); + + do { + rows.add(H2Utils.toRowMessage(iter.next())); + } + while (rows.size() < maxRows && iter.hasNext()); + + if (iter.hasNext()) + nextRange.setPartial(); + else + iter = emptyIterator(); + + return nextRange; + } + + iter = emptyIterator(); + + if (!boundsIter.hasNext()) { + boundsIter = emptyIterator(); + + return null; + } + + GridH2RowRangeBounds bounds = boundsIter.next(); + + curRangeId = bounds.rangeId(); + + iter = idx.findForSegment(bounds, segment, filter); + + if (!iter.hasNext()) { + // We have to return empty range here. + GridH2RowRange emptyRange = new GridH2RowRange(); + + emptyRange.rangeId(curRangeId); + + return emptyRange; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java new file mode 100644 index 0000000..0684089 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.index.Cursor; +import org.h2.result.Row; +import org.h2.value.Value; + +import javax.cache.CacheException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyIterator; +import static java.util.Collections.singletonList; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK; +import static org.h2.result.Row.MEMORY_CALCULATE; + +/** + * Per node range stream. + */ +public class RangeStream { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Index. */ + private final GridH2IndexBase idx; + + /** */ + private final GridH2QueryContext qctx; + + /** */ + private final ClusterNode node; + + /** */ + private GridH2IndexRangeRequest req; + + /** */ + private int remainingRanges; + + /** */ + private final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>(); + + /** */ + private Iterator<GridH2RowRange> ranges = emptyIterator(); + + /** */ + private Cursor cursor = GridH2Cursor.EMPTY; + + /** */ + private int cursorRangeId = -1; + + /** + * @param qctx Query context. + * @param node Node. + */ + public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, GridH2QueryContext qctx, ClusterNode node) { + this.ctx = ctx; + this.idx = idx; + this.node = node; + this.qctx = qctx; + } + + /** + * Start streaming. + */ + public void start() { + remainingRanges = req.bounds().size(); + + assert remainingRanges > 0; + + idx.send(singletonList(node), req); + } + + /** + * @param msg Response. + */ + public void onResponse(GridH2IndexRangeResponse msg) { + respQueue.add(msg); + } + + /** + * @param req Current request. + */ + public void request(GridH2IndexRangeRequest req) { + this.req = req; + } + + /** + * @return Current request. + */ + public GridH2IndexRangeRequest request() { + return req; + } + + /** + * @return Response. + */ + private GridH2IndexRangeResponse awaitForResponse() { + assert remainingRanges > 0; + + final long start = U.currentTimeMillis(); + + for (int attempt = 0;; attempt++) { + if (qctx.isCleared()) + throw H2Utils.retryException("Query is cancelled."); + + if (ctx.isStopping()) + throw H2Utils.retryException("Local node is stopping."); + + GridH2IndexRangeResponse res; + + try { + res = respQueue.poll(500, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignored) { + throw H2Utils.retryException("Interrupted while waiting for reply."); + } + + if (res != null) { + switch (res.status()) { + case STATUS_OK: + List<GridH2RowRange> ranges0 = res.ranges(); + + remainingRanges -= ranges0.size(); + + if (ranges0.get(ranges0.size() - 1).isPartial()) + remainingRanges++; + + if (remainingRanges > 0) { + if (req.bounds() != null) + req = DistributedLookupBatch.createRequest(qctx, req.batchLookupId(), req.segment()); + + // Prefetch next page. + idx.send(singletonList(node), req); + } + else + req = null; + + return res; + + case STATUS_NOT_FOUND: + if (req == null || req.bounds() == null) // We have already received the first response. + throw H2Utils.retryException("Failure on remote node."); + + if (U.currentTimeMillis() - start > 30_000) + throw H2Utils.retryException("Timeout reached."); + + try { + U.sleep(20 * attempt); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteInterruptedException(e.getMessage()); + } + + // Retry to send the request once more after some time. + idx.send(singletonList(node), req); + + break; + + case STATUS_ERROR: + throw new CacheException(res.error()); + + default: + throw new IllegalStateException(); + } + } + + if (!ctx.discovery().alive(node)) + throw H2Utils.retryException("Node has left topology: " + node.id()); + } + } + + /** + * @param rangeId Requested range ID. + * @return {@code true} If next row for the requested range was found. + */ + public boolean next(final int rangeId) { + for (;;) { + if (rangeId == cursorRangeId) { + if (cursor.next()) + return true; + } + else if (rangeId < cursorRangeId) + return false; + + cursor = GridH2Cursor.EMPTY; + + while (!ranges.hasNext()) { + if (remainingRanges == 0) { + ranges = emptyIterator(); + + return false; + } + + ranges = awaitForResponse().ranges().iterator(); + } + + GridH2RowRange range = ranges.next(); + + cursorRangeId = range.rangeId(); + + if (!F.isEmpty(range.rows())) { + final Iterator<GridH2RowMessage> it = range.rows().iterator(); + + if (it.hasNext()) { + cursor = new GridH2Cursor(new Iterator<Row>() { + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public Row next() { + // Lazily convert messages into real rows. + return toRow(it.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }); + } + } + } + } + + /** + * @param msg Message. + * @return Row. + */ + private Row toRow(GridH2RowMessage msg) { + if (msg == null) + return null; + + List<GridH2ValueMessage> vals = msg.values(); + + assert !F.isEmpty(vals) : vals; + + Value[] vals0 = new Value[vals.size()]; + + for (int i = 0; i < vals0.length; i++) { + try { + vals0[i] = vals.get(i).value(ctx); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + return idx.getDatabase().createRow(vals0, MEMORY_CALCULATE); + } + + /** + * @param rangeId Requested range ID. + * @return Current row. + */ + public Row get(int rangeId) { + assert rangeId == cursorRangeId; + + return cursor.get(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java new file mode 100644 index 0000000..9bdbab4 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.cluster.ClusterNode; + +/** + * Segment key. + */ +public class SegmentKey { + /** */ + private final ClusterNode node; + + /** */ + private final int segmentId; + + /** + * Constructor. + * + * @param node Node. + * @param segmentId Segment ID. + */ + public SegmentKey(ClusterNode node, int segmentId) { + assert node != null; + + this.node = node; + this.segmentId = segmentId; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** + * @return Segment ID. + */ + public int segmentId() { + return segmentId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + SegmentKey key = (SegmentKey)o; + + return segmentId == key.segmentId && node.id().equals(key.node.id()); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = node.hashCode(); + + res = 31 * res + segmentId; + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java new file mode 100644 index 0000000..9cf7629 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import java.util.UUID; + +/** + * Key for source. + */ +public class SourceKey { + /** */ + private final UUID ownerId; + + /** */ + private final int segmentId; + + /** */ + private final int batchLookupId; + + /** + * @param ownerId Owner node ID. + * @param segmentId Index segment ID. + * @param batchLookupId Batch lookup ID. + */ + public SourceKey(UUID ownerId, int segmentId, int batchLookupId) { + this.ownerId = ownerId; + this.segmentId = segmentId; + this.batchLookupId = batchLookupId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (!(o instanceof SourceKey)) + return false; + + SourceKey srcKey = (SourceKey)o; + + return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId && + ownerId.equals(srcKey.ownerId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int hash = ownerId.hashCode(); + + hash = 31 * hash + segmentId; + hash = 31 * hash + batchLookupId; + + return hash; + } +}