WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0860542a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0860542a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0860542a Branch: refs/heads/ignite-10759-1 Commit: 0860542a2d33cb5b20c463cc7308a1fb84d5b946 Parents: 7027ef2 Author: devozerov <voze...@gridgain.com> Authored: Fri Dec 21 17:57:57 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Dec 21 17:57:57 2018 +0300 ---------------------------------------------------------------------- .../processors/query/h2/IgniteH2Indexing.java | 4 +- .../query/h2/opt/DistributedJoinMode.java | 51 -- .../query/h2/opt/GridH2CollocationModel.java | 838 ------------------ .../query/h2/opt/GridH2IndexBase.java | 5 +- .../query/h2/opt/GridH2QueryContext.java | 4 +- .../query/h2/opt/join/DistributedJoinMode.java | 51 ++ .../h2/opt/join/DistributedLookupBatch.java | 2 +- .../h2/opt/join/GridH2CollocationModel.java | 841 +++++++++++++++++++ .../query/h2/sql/GridSqlQuerySplitter.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 6 +- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- 11 files changed, 906 insertions(+), 900 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 29f39b9..931a1dc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -178,8 +178,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java deleted file mode 100644 index cc06244..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.opt; - -/** - * Defines set of distributed join modes. - */ -public enum DistributedJoinMode { - /** - * Distributed joins is disabled. Local joins will be performed instead. - */ - OFF, - - /** - * Distributed joins is enabled within local node only. - * - * NOTE: This mode is used with segmented indices for local sql queries. - * As in this case we need to make distributed join across local index segments - * and prevent range-queries to other nodes. - */ - LOCAL_ONLY, - - /** - * Distributed joins is enabled. - */ - ON; - - /** - * @param isLocal Query local flag. - * @param distributedJoins Query distributed joins flag. - * @return DistributedJoinMode for the query. - */ - public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) { - return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java deleted file mode 100644 index 2a92511..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java +++ /dev/null @@ -1,838 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.opt; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import javax.cache.CacheException; -import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.h2.command.dml.Query; -import org.h2.command.dml.Select; -import org.h2.command.dml.SelectUnion; -import org.h2.expression.Comparison; -import org.h2.expression.Expression; -import org.h2.expression.ExpressionColumn; -import org.h2.index.IndexCondition; -import org.h2.index.ViewIndex; -import org.h2.table.Column; -import org.h2.table.IndexColumn; -import org.h2.table.SubQueryInfo; -import org.h2.table.Table; -import org.h2.table.TableFilter; -import org.h2.table.TableView; - -/** - * Collocation model for a query. - */ -public final class GridH2CollocationModel { - /** */ - public static final int MULTIPLIER_COLLOCATED = 1; - - /** */ - private static final int MULTIPLIER_UNICAST = 50; - - /** */ - private static final int MULTIPLIER_BROADCAST = 200; - - /** */ - private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000; - - /** */ - private final GridH2CollocationModel upper; - - /** */ - private final int filter; - - /** */ - private final boolean view; - - /** */ - private int multiplier; - - /** */ - private Type type; - - /** */ - private GridH2CollocationModel[] children; - - /** */ - private TableFilter[] childFilters; - - /** */ - private List<GridH2CollocationModel> unions; - - /** */ - private Select select; - - /** */ - private final boolean validate; - - /** - * @param upper Upper. - * @param filter Filter. - * @param view This model will be a subquery (or top level query) and must contain child filters. - * @param validate Query validation flag. - */ - private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) { - this.upper = upper; - this.filter = filter; - this.view = view; - this.validate = validate; - } - - /** - * @return Table filter for this collocation model. - */ - private TableFilter filter() { - return upper == null ? null : upper.childFilters[filter]; - } - - /** {@inheritDoc} */ - @Override public String toString() { - calculate(); - - SB b = new SB(); - - for (int lvl = 0; lvl < 20; lvl++) { - if (!toString(b, lvl)) - break; - - b.a('\n'); - } - - return b.toString(); - } - - /** - * @param b String builder. - * @param lvl Depth level. - */ - private boolean toString(SB b, int lvl) { - boolean res = false; - - if (lvl == 0) { - TableFilter f = filter(); - String tblAlias = f == null ? "^" : f.getTableAlias(); - - b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", mul=").a(multiplier).a("]"); - - res = true; - } - else if (childFilters != null) { - assert lvl > 0; - - lvl--; - - for (int i = 0; i < childFilters.length; i++) { - if (lvl == 0) - b.a(" | "); - - res |= child(i, true).toString(b, lvl); - } - - if (lvl == 0) - b.a(" | "); - } - - return res; - } - - /** - * @param upper Upper. - * @param filter Filter. - * @param unions Unions. - * @param view This model will be a subquery (or top level query) and must contain child filters. - * @param validate Query validation flag. - * @return Created child collocation model. - */ - private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper, - int filter, - List<GridH2CollocationModel> unions, - boolean view, - boolean validate) { - GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate); - - if (unions != null) { - // Bind created child to unions. - assert upper == null || upper.child(filter, false) != null || unions.isEmpty(); - - if (upper != null && unions.isEmpty()) { - assert upper.child(filter, false) == null; - - upper.children[filter] = child; - } - - unions.add(child); - - child.unions = unions; - } - else if (upper != null) { - // Bind created child to upper model. - assert upper.child(filter, false) == null; - - upper.children[filter] = child; - } - - return child; - } - - /** - * @param childFilters New child filters. - * @return {@code true} If child filters were updated. - */ - private boolean childFilters(TableFilter[] childFilters) { - assert childFilters != null; - assert view; - - Select select = childFilters[0].getSelect(); - - assert this.select == null || this.select == select; - - if (this.select == null) { - this.select = select; - - assert this.childFilters == null; - } - else if (Arrays.equals(this.childFilters, childFilters)) - return false; - - if (this.childFilters == null) { - // We have to clone because H2 reuses array and reorders elements. - this.childFilters = childFilters.clone(); - - children = new GridH2CollocationModel[childFilters.length]; - } - else { - assert this.childFilters.length == childFilters.length; - - // We have to copy because H2 reuses array and reorders elements. - System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length); - - Arrays.fill(children, null); - } - - // Reset results. - type = null; - multiplier = 0; - - return true; - } - - /** - * Do the needed calculations. - */ - @SuppressWarnings("ConstantConditions") - private void calculate() { - if (type != null) - return; - - if (view) { // We are at (sub-)query model. - assert childFilters != null; - - boolean collocated = true; - boolean partitioned = false; - int maxMultiplier = MULTIPLIER_COLLOCATED; - - for (int i = 0; i < childFilters.length; i++) { - GridH2CollocationModel child = child(i, true); - - Type t = child.type(true); - - if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) - maxMultiplier = child.multiplier; - - if (t.isPartitioned()) { - partitioned = true; - - if (!t.isCollocated()) { - collocated = false; - - int m = child.multiplier(true); - - if (m > maxMultiplier) { - maxMultiplier = m; - - if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST) - break; - } - } - } - } - - type = Type.of(partitioned, collocated); - multiplier = maxMultiplier; - } - else { - assert upper != null; - assert childFilters == null; - - // We are at table instance. - Table tbl = filter().getTable(); - - // Only partitioned tables will do distributed joins. - if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) { - type = Type.REPLICATED; - multiplier = MULTIPLIER_COLLOCATED; - - return; - } - - // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables - // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted - // to all the affinity nodes the "base" does not need to get remote results. - if (!upper.findPartitionedTableBefore(filter)) { - type = Type.PARTITIONED_COLLOCATED; - multiplier = MULTIPLIER_COLLOCATED; - } - else { - // It is enough to make sure that our previous join by affinity key is collocated, then we are - // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. - switch (upper.joinedWithCollocated(filter)) { - case COLLOCATED_JOIN: - type = Type.PARTITIONED_COLLOCATED; - multiplier = MULTIPLIER_COLLOCATED; - - break; - - case HAS_AFFINITY_CONDITION: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_UNICAST; - - break; - - case NONE: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_BROADCAST; - - break; - - default: - throw new IllegalStateException(); - } - } - - if (upper.previousReplicated(filter)) - multiplier = MULTIPLIER_REPLICATED_NOT_LAST; - } - } - - /** - * @param f Current filter. - * @return {@code true} If partitioned table was found. - */ - private boolean findPartitionedTableBefore(int f) { - for (int i = 0; i < f; i++) { - GridH2CollocationModel child = child(i, true); - - // The c can be null if it is not a GridH2Table and not a sub-query, - // it is a some kind of function table or anything else that considered replicated. - if (child != null && child.type(true).isPartitioned()) - return true; - } - - // We have to search globally in upper queries as well. - return upper != null && upper.findPartitionedTableBefore(filter); - } - - /** - * @param f Current filter. - * @return {@code true} If previous table is REPLICATED. - */ - @SuppressWarnings("SimplifiableIfStatement") - private boolean previousReplicated(int f) { - if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED) - return true; - - return upper != null && upper.previousReplicated(filter); - } - - /** - * @param f Filter. - * @return Affinity join type. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - private Affinity joinedWithCollocated(int f) { - TableFilter tf = childFilters[f]; - - GridH2Table tbl = (GridH2Table)tf.getTable(); - - if (validate) { - if (tbl.isCustomAffinityMapper()) - throw customAffinityError(tbl.cacheName()); - - if (F.isEmpty(tf.getIndexConditions())) { - throw new CacheException("Failed to prepare distributed join query: " + - "join condition does not use index [joinedCache=" + tbl.cacheName() + - ", plan=" + tf.getSelect().getPlanSQL() + ']'); - } - } - - IndexColumn affCol = tbl.getAffinityKeyColumn(); - - boolean affKeyCondFound = false; - - if (affCol != null) { - ArrayList<IndexCondition> idxConditions = tf.getIndexConditions(); - - int affColId = affCol.column.getColumnId(); - - for (int i = 0; i < idxConditions.size(); i++) { - IndexCondition c = idxConditions.get(i); - int colId = c.getColumn().getColumnId(); - int cmpType = c.getCompareType(); - - if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) && - (colId == affColId || tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) { - affKeyCondFound = true; - - Expression exp = c.getExpression(); - exp = exp.getNonAliasExpression(); - - if (exp instanceof ExpressionColumn) { - ExpressionColumn expCol = (ExpressionColumn)exp; - - // This is one of our previous joins. - TableFilter prevJoin = expCol.getTableFilter(); - - if (prevJoin != null) { - GridH2CollocationModel cm = child(indexOf(prevJoin), true); - - // If the previous joined model is a subquery (view), we can not be sure that - // the found affinity column is the needed one, since we can select multiple - // different affinity columns from different tables. - if (cm != null && !cm.view) { - Type t = cm.type(true); - - if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate)) - return Affinity.COLLOCATED_JOIN; - } - } - } - } - } - } - - return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE; - } - - /** - * @param f Table filter. - * @return Index. - */ - private int indexOf(TableFilter f) { - for (int i = 0; i < childFilters.length; i++) { - if (childFilters[i] == f) - return i; - } - - throw new IllegalStateException(); - } - - /** - * @param f Table filter. - * @param expCol Expression column. - * @param validate Query validation flag. - * @return {@code true} It it is an affinity column. - */ - private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) { - Column col = expCol.getColumn(); - - if (col == null) - return false; - - Table t = col.getTable(); - - if (t.isView()) { - Query qry; - - if (f.getIndex() != null) - qry = getSubQuery(f); - else - qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t); - - return isAffinityColumn(qry, expCol, validate); - } - - if (t instanceof GridH2Table) { - GridH2Table t0 = (GridH2Table)t; - - if (validate && t0.isCustomAffinityMapper()) - throw customAffinityError((t0).cacheName()); - - IndexColumn affCol = t0.getAffinityKeyColumn(); - - return affCol != null && col.getColumnId() == affCol.column.getColumnId(); - } - - return false; - } - - /** - * @param qry Query. - * @param expCol Expression column. - * @param validate Query validation flag. - * @return {@code true} It it is an affinity column. - */ - private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) { - if (qry.isUnion()) { - SelectUnion union = (SelectUnion)qry; - - return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate); - } - - Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression(); - - if (exp instanceof ExpressionColumn) { - expCol = (ExpressionColumn)exp; - - return isAffinityColumn(expCol.getTableFilter(), expCol, validate); - } - - return false; - } - - /** - * @return Multiplier. - */ - public int calculateMultiplier() { - // We don't need multiplier for union here because it will be summarized in H2. - return multiplier(false); - } - - /** - * @param withUnion With respect to union. - * @return Multiplier. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - private int multiplier(boolean withUnion) { - calculate(); - - assert multiplier != 0; - - if (withUnion && unions != null) { - int maxMultiplier = 0; - - for (int i = 0; i < unions.size(); i++) { - int m = unions.get(i).multiplier(false); - - if (m > maxMultiplier) - maxMultiplier = m; - } - - return maxMultiplier; - } - - return multiplier; - } - - /** - * @param withUnion With respect to union. - * @return Type. - */ - private Type type(boolean withUnion) { - calculate(); - - assert type != null; - - if (withUnion && unions != null) { - Type left = unions.get(0).type(false); - - for (int i = 1; i < unions.size(); i++) { - Type right = unions.get(i).type(false); - - if (!left.isCollocated() || !right.isCollocated()) { - left = Type.PARTITIONED_NOT_COLLOCATED; - - break; - } - else if (!left.isPartitioned() && !right.isPartitioned()) - left = Type.REPLICATED; - else - left = Type.PARTITIONED_COLLOCATED; - } - - return left; - } - - return type; - } - - /** - * @param i Index. - * @param create Create child if needed. - * @return Child collocation. - */ - private GridH2CollocationModel child(int i, boolean create) { - GridH2CollocationModel child = children[i]; - - if (child == null && create) { - TableFilter f = childFilters[i]; - - if (f.getTable().isView()) { - if (f.getIndex() == null) { - // If we don't have view index yet, then we just creating empty model and it must be filled later. - child = createChildModel(this, i, null, true, validate); - } - else - child = buildCollocationModel(this, i, getSubQuery(f), null, validate); - } - else - child = createChildModel(this, i, null, false, validate); - - assert child != null; - assert children[i] == child; - } - - return child; - } - - /** - * @param f Table filter. - * @return Sub-query. - */ - private static Query getSubQuery(TableFilter f) { - return ((ViewIndex)f.getIndex()).getQuery(); - } - - /** - * @return Unions list. - */ - private List<GridH2CollocationModel> getOrCreateUnions() { - if (unions == null) { - unions = new ArrayList<>(4); - - unions.add(this); - } - - return unions; - } - - /** - * @param qctx Query context. - * @param info Sub-query info. - * @param filters Filters. - * @param filter Filter. - * @param validate Query validation flag. - * @return Collocation. - */ - public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info, - TableFilter[] filters, int filter, boolean validate) { - GridH2CollocationModel cm; - - if (info != null) { - // Go up until we reach the root query. - cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate); - } - else { - // We are at the root query. - cm = qctx.queryCollocationModel(); - - if (cm == null) { - cm = createChildModel(null, -1, null, true, validate); - - qctx.queryCollocationModel(cm); - } - } - - if (filters == null) - return cm; - - assert cm.view; - - Select select = filters[0].getSelect(); - - // Handle union. We have to rely on fact that select will be the same on uppermost select. - // For sub-queries we will drop collocation models, so that they will be recalculated anyways. - if (cm.select != null && cm.select != select) { - List<GridH2CollocationModel> unions = cm.getOrCreateUnions(); - - // Try to find this select in existing unions. - // Start with 1 because at 0 it always will be c. - for (int i = 1; i < unions.size(); i++) { - GridH2CollocationModel u = unions.get(i); - - if (u.select == select) { - cm = u; - - break; - } - } - - // Nothing was found, need to create new child in union. - if (cm.select != select) - cm = createChildModel(cm.upper, cm.filter, unions, true, validate); - } - - cm.childFilters(filters); - - return cm.child(filter, true); - } - - /** - * @param qry Query. - * @return {@code true} If the query is collocated. - */ - public static boolean isCollocated(Query qry) { - GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true); - - Type type = mdl.type(true); - - if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) - throw new CacheException("Failed to execute query: for distributed join " + - "all REPLICATED caches must be at the end of the joined tables list."); - - return type.isCollocated(); - } - - /** - * @param upper Upper. - * @param filter Filter. - * @param qry Query. - * @param unions Unions. - * @param validate Query validation flag. - * @return Built model. - */ - private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper, - int filter, - Query qry, - List<GridH2CollocationModel> unions, - boolean validate) { - if (qry.isUnion()) { - if (unions == null) - unions = new ArrayList<>(); - - SelectUnion union = (SelectUnion)qry; - - GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate); - GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate); - - assert left != null; - assert right != null; - - return upper != null ? upper : left; - } - - Select select = (Select)qry; - - List<TableFilter> list = new ArrayList<>(); - - for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) - list.add(f); - - TableFilter[] filters = list.toArray(new TableFilter[list.size()]); - - GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate); - - cm.childFilters(filters); - - for (int i = 0; i < filters.length; i++) { - TableFilter f = filters[i]; - - if (f.getTable().isView()) - buildCollocationModel(cm, i, getSubQuery(f), null, validate); - else if (f.getTable() instanceof GridH2Table) - createChildModel(cm, i, null, false, validate); - } - - return upper != null ? upper : cm; - } - - /** - * @param cacheName Cache name. - * @return Error. - */ - private static CacheException customAffinityError(String cacheName) { - return new CacheException("Failed to prepare distributed join query: can not use distributed joins for cache " + - "with custom AffinityKeyMapper configured. " + - "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']'); - } - - /** - * Collocation type. - */ - private enum Type { - /** */ - PARTITIONED_COLLOCATED(true, true), - - /** */ - PARTITIONED_NOT_COLLOCATED(true, false), - - /** */ - REPLICATED(false, true); - - /** */ - private final boolean partitioned; - - /** */ - private final boolean collocated; - - /** - * @param partitioned Partitioned. - * @param collocated Collocated. - */ - Type(boolean partitioned, boolean collocated) { - this.partitioned = partitioned; - this.collocated = collocated; - } - - /** - * @return {@code true} If partitioned. - */ - public boolean isPartitioned() { - return partitioned; - } - - /** - * @return {@code true} If collocated. - */ - public boolean isCollocated() { - return collocated; - } - - /** - * @param partitioned Partitioned. - * @param collocated Collocated. - * @return Type. - */ - static Type of(boolean partitioned, boolean collocated) { - if (collocated) - return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED; - - assert partitioned; - - return Type.PARTITIONED_NOT_COLLOCATED; - } - } - - /** - * Affinity of a table relative to previous joined tables. - */ - private enum Affinity { - /** */ - NONE, - - /** */ - HAS_AFFINITY_CONDITION, - - /** */ - COLLOCATED_JOIN - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 3d7d388..6d8f2ed 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.h2.H2Cursor; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper; import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch; +import org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel; import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource; import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream; import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey; @@ -71,8 +72,8 @@ import java.util.Map; import java.util.UUID; import static java.util.Collections.singletonList; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel.buildCollocationModel; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index ef2fd25..addd137 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode; +import org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel; import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey; import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.util.typedef.F; @@ -34,7 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java new file mode 100644 index 0000000..7c958da --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +/** + * Defines set of distributed join modes. + */ +public enum DistributedJoinMode { + /** + * Distributed joins is disabled. Local joins will be performed instead. + */ + OFF, + + /** + * Distributed joins is enabled within local node only. + * + * NOTE: This mode is used with segmented indices for local sql queries. + * As in this case we need to make distributed join across local index segments + * and prevent range-queries to other nodes. + */ + LOCAL_ONLY, + + /** + * Distributed joins is enabled. + */ + ON; + + /** + * @param isLocal Query local flag. + * @param distributedJoins Query distributed joins flag. + * @return DistributedJoinMode for the query. + */ + public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) { + return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java index 60106a8..02d2e44 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java @@ -43,7 +43,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Future; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds; http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java new file mode 100644 index 0000000..430bc71 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java @@ -0,0 +1,841 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.cache.CacheException; + +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.h2.command.dml.Query; +import org.h2.command.dml.Select; +import org.h2.command.dml.SelectUnion; +import org.h2.expression.Comparison; +import org.h2.expression.Expression; +import org.h2.expression.ExpressionColumn; +import org.h2.index.IndexCondition; +import org.h2.index.ViewIndex; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.SubQueryInfo; +import org.h2.table.Table; +import org.h2.table.TableFilter; +import org.h2.table.TableView; + +/** + * Collocation model for a query. + */ +public final class GridH2CollocationModel { + /** */ + public static final int MULTIPLIER_COLLOCATED = 1; + + /** */ + private static final int MULTIPLIER_UNICAST = 50; + + /** */ + private static final int MULTIPLIER_BROADCAST = 200; + + /** */ + private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000; + + /** */ + private final GridH2CollocationModel upper; + + /** */ + private final int filter; + + /** */ + private final boolean view; + + /** */ + private int multiplier; + + /** */ + private Type type; + + /** */ + private GridH2CollocationModel[] children; + + /** */ + private TableFilter[] childFilters; + + /** */ + private List<GridH2CollocationModel> unions; + + /** */ + private Select select; + + /** */ + private final boolean validate; + + /** + * @param upper Upper. + * @param filter Filter. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. + */ + private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) { + this.upper = upper; + this.filter = filter; + this.view = view; + this.validate = validate; + } + + /** + * @return Table filter for this collocation model. + */ + private TableFilter filter() { + return upper == null ? null : upper.childFilters[filter]; + } + + /** {@inheritDoc} */ + @Override public String toString() { + calculate(); + + SB b = new SB(); + + for (int lvl = 0; lvl < 20; lvl++) { + if (!toString(b, lvl)) + break; + + b.a('\n'); + } + + return b.toString(); + } + + /** + * @param b String builder. + * @param lvl Depth level. + */ + private boolean toString(SB b, int lvl) { + boolean res = false; + + if (lvl == 0) { + TableFilter f = filter(); + String tblAlias = f == null ? "^" : f.getTableAlias(); + + b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", mul=").a(multiplier).a("]"); + + res = true; + } + else if (childFilters != null) { + assert lvl > 0; + + lvl--; + + for (int i = 0; i < childFilters.length; i++) { + if (lvl == 0) + b.a(" | "); + + res |= child(i, true).toString(b, lvl); + } + + if (lvl == 0) + b.a(" | "); + } + + return res; + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param unions Unions. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. + * @return Created child collocation model. + */ + private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper, + int filter, + List<GridH2CollocationModel> unions, + boolean view, + boolean validate) { + GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate); + + if (unions != null) { + // Bind created child to unions. + assert upper == null || upper.child(filter, false) != null || unions.isEmpty(); + + if (upper != null && unions.isEmpty()) { + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + unions.add(child); + + child.unions = unions; + } + else if (upper != null) { + // Bind created child to upper model. + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + return child; + } + + /** + * @param childFilters New child filters. + * @return {@code true} If child filters were updated. + */ + private boolean childFilters(TableFilter[] childFilters) { + assert childFilters != null; + assert view; + + Select select = childFilters[0].getSelect(); + + assert this.select == null || this.select == select; + + if (this.select == null) { + this.select = select; + + assert this.childFilters == null; + } + else if (Arrays.equals(this.childFilters, childFilters)) + return false; + + if (this.childFilters == null) { + // We have to clone because H2 reuses array and reorders elements. + this.childFilters = childFilters.clone(); + + children = new GridH2CollocationModel[childFilters.length]; + } + else { + assert this.childFilters.length == childFilters.length; + + // We have to copy because H2 reuses array and reorders elements. + System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length); + + Arrays.fill(children, null); + } + + // Reset results. + type = null; + multiplier = 0; + + return true; + } + + /** + * Do the needed calculations. + */ + @SuppressWarnings("ConstantConditions") + private void calculate() { + if (type != null) + return; + + if (view) { // We are at (sub-)query model. + assert childFilters != null; + + boolean collocated = true; + boolean partitioned = false; + int maxMultiplier = MULTIPLIER_COLLOCATED; + + for (int i = 0; i < childFilters.length; i++) { + GridH2CollocationModel child = child(i, true); + + Type t = child.type(true); + + if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + maxMultiplier = child.multiplier; + + if (t.isPartitioned()) { + partitioned = true; + + if (!t.isCollocated()) { + collocated = false; + + int m = child.multiplier(true); + + if (m > maxMultiplier) { + maxMultiplier = m; + + if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST) + break; + } + } + } + } + + type = Type.of(partitioned, collocated); + multiplier = maxMultiplier; + } + else { + assert upper != null; + assert childFilters == null; + + // We are at table instance. + Table tbl = filter().getTable(); + + // Only partitioned tables will do distributed joins. + if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) { + type = Type.REPLICATED; + multiplier = MULTIPLIER_COLLOCATED; + + return; + } + + // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables + // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted + // to all the affinity nodes the "base" does not need to get remote results. + if (!upper.findPartitionedTableBefore(filter)) { + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + } + else { + // It is enough to make sure that our previous join by affinity key is collocated, then we are + // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. + switch (upper.joinedWithCollocated(filter)) { + case COLLOCATED_JOIN: + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + + break; + + case HAS_AFFINITY_CONDITION: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_UNICAST; + + break; + + case NONE: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_BROADCAST; + + break; + + default: + throw new IllegalStateException(); + } + } + + if (upper.previousReplicated(filter)) + multiplier = MULTIPLIER_REPLICATED_NOT_LAST; + } + } + + /** + * @param f Current filter. + * @return {@code true} If partitioned table was found. + */ + private boolean findPartitionedTableBefore(int f) { + for (int i = 0; i < f; i++) { + GridH2CollocationModel child = child(i, true); + + // The c can be null if it is not a GridH2Table and not a sub-query, + // it is a some kind of function table or anything else that considered replicated. + if (child != null && child.type(true).isPartitioned()) + return true; + } + + // We have to search globally in upper queries as well. + return upper != null && upper.findPartitionedTableBefore(filter); + } + + /** + * @param f Current filter. + * @return {@code true} If previous table is REPLICATED. + */ + @SuppressWarnings("SimplifiableIfStatement") + private boolean previousReplicated(int f) { + if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED) + return true; + + return upper != null && upper.previousReplicated(filter); + } + + /** + * @param f Filter. + * @return Affinity join type. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private Affinity joinedWithCollocated(int f) { + TableFilter tf = childFilters[f]; + + GridH2Table tbl = (GridH2Table)tf.getTable(); + + if (validate) { + if (tbl.isCustomAffinityMapper()) + throw customAffinityError(tbl.cacheName()); + + if (F.isEmpty(tf.getIndexConditions())) { + throw new CacheException("Failed to prepare distributed join query: " + + "join condition does not use index [joinedCache=" + tbl.cacheName() + + ", plan=" + tf.getSelect().getPlanSQL() + ']'); + } + } + + IndexColumn affCol = tbl.getAffinityKeyColumn(); + + boolean affKeyCondFound = false; + + if (affCol != null) { + ArrayList<IndexCondition> idxConditions = tf.getIndexConditions(); + + int affColId = affCol.column.getColumnId(); + + for (int i = 0; i < idxConditions.size(); i++) { + IndexCondition c = idxConditions.get(i); + int colId = c.getColumn().getColumnId(); + int cmpType = c.getCompareType(); + + if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) && + (colId == affColId || tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) { + affKeyCondFound = true; + + Expression exp = c.getExpression(); + exp = exp.getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + ExpressionColumn expCol = (ExpressionColumn)exp; + + // This is one of our previous joins. + TableFilter prevJoin = expCol.getTableFilter(); + + if (prevJoin != null) { + GridH2CollocationModel cm = child(indexOf(prevJoin), true); + + // If the previous joined model is a subquery (view), we can not be sure that + // the found affinity column is the needed one, since we can select multiple + // different affinity columns from different tables. + if (cm != null && !cm.view) { + Type t = cm.type(true); + + if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate)) + return Affinity.COLLOCATED_JOIN; + } + } + } + } + } + } + + return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE; + } + + /** + * @param f Table filter. + * @return Index. + */ + private int indexOf(TableFilter f) { + for (int i = 0; i < childFilters.length; i++) { + if (childFilters[i] == f) + return i; + } + + throw new IllegalStateException(); + } + + /** + * @param f Table filter. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) { + Column col = expCol.getColumn(); + + if (col == null) + return false; + + Table t = col.getTable(); + + if (t.isView()) { + Query qry; + + if (f.getIndex() != null) + qry = getSubQuery(f); + else + qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t); + + return isAffinityColumn(qry, expCol, validate); + } + + if (t instanceof GridH2Table) { + GridH2Table t0 = (GridH2Table)t; + + if (validate && t0.isCustomAffinityMapper()) + throw customAffinityError((t0).cacheName()); + + IndexColumn affCol = t0.getAffinityKeyColumn(); + + return affCol != null && col.getColumnId() == affCol.column.getColumnId(); + } + + return false; + } + + /** + * @param qry Query. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) { + if (qry.isUnion()) { + SelectUnion union = (SelectUnion)qry; + + return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate); + } + + Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + expCol = (ExpressionColumn)exp; + + return isAffinityColumn(expCol.getTableFilter(), expCol, validate); + } + + return false; + } + + /** + * @return Multiplier. + */ + public int calculateMultiplier() { + // We don't need multiplier for union here because it will be summarized in H2. + return multiplier(false); + } + + /** + * @param withUnion With respect to union. + * @return Multiplier. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private int multiplier(boolean withUnion) { + calculate(); + + assert multiplier != 0; + + if (withUnion && unions != null) { + int maxMultiplier = 0; + + for (int i = 0; i < unions.size(); i++) { + int m = unions.get(i).multiplier(false); + + if (m > maxMultiplier) + maxMultiplier = m; + } + + return maxMultiplier; + } + + return multiplier; + } + + /** + * @param withUnion With respect to union. + * @return Type. + */ + private Type type(boolean withUnion) { + calculate(); + + assert type != null; + + if (withUnion && unions != null) { + Type left = unions.get(0).type(false); + + for (int i = 1; i < unions.size(); i++) { + Type right = unions.get(i).type(false); + + if (!left.isCollocated() || !right.isCollocated()) { + left = Type.PARTITIONED_NOT_COLLOCATED; + + break; + } + else if (!left.isPartitioned() && !right.isPartitioned()) + left = Type.REPLICATED; + else + left = Type.PARTITIONED_COLLOCATED; + } + + return left; + } + + return type; + } + + /** + * @param i Index. + * @param create Create child if needed. + * @return Child collocation. + */ + private GridH2CollocationModel child(int i, boolean create) { + GridH2CollocationModel child = children[i]; + + if (child == null && create) { + TableFilter f = childFilters[i]; + + if (f.getTable().isView()) { + if (f.getIndex() == null) { + // If we don't have view index yet, then we just creating empty model and it must be filled later. + child = createChildModel(this, i, null, true, validate); + } + else + child = buildCollocationModel(this, i, getSubQuery(f), null, validate); + } + else + child = createChildModel(this, i, null, false, validate); + + assert child != null; + assert children[i] == child; + } + + return child; + } + + /** + * @param f Table filter. + * @return Sub-query. + */ + private static Query getSubQuery(TableFilter f) { + return ((ViewIndex)f.getIndex()).getQuery(); + } + + /** + * @return Unions list. + */ + private List<GridH2CollocationModel> getOrCreateUnions() { + if (unions == null) { + unions = new ArrayList<>(4); + + unions.add(this); + } + + return unions; + } + + /** + * @param qctx Query context. + * @param info Sub-query info. + * @param filters Filters. + * @param filter Filter. + * @param validate Query validation flag. + * @return Collocation. + */ + public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info, + TableFilter[] filters, int filter, boolean validate) { + GridH2CollocationModel cm; + + if (info != null) { + // Go up until we reach the root query. + cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate); + } + else { + // We are at the root query. + cm = qctx.queryCollocationModel(); + + if (cm == null) { + cm = createChildModel(null, -1, null, true, validate); + + qctx.queryCollocationModel(cm); + } + } + + if (filters == null) + return cm; + + assert cm.view; + + Select select = filters[0].getSelect(); + + // Handle union. We have to rely on fact that select will be the same on uppermost select. + // For sub-queries we will drop collocation models, so that they will be recalculated anyways. + if (cm.select != null && cm.select != select) { + List<GridH2CollocationModel> unions = cm.getOrCreateUnions(); + + // Try to find this select in existing unions. + // Start with 1 because at 0 it always will be c. + for (int i = 1; i < unions.size(); i++) { + GridH2CollocationModel u = unions.get(i); + + if (u.select == select) { + cm = u; + + break; + } + } + + // Nothing was found, need to create new child in union. + if (cm.select != select) + cm = createChildModel(cm.upper, cm.filter, unions, true, validate); + } + + cm.childFilters(filters); + + return cm.child(filter, true); + } + + /** + * @param qry Query. + * @return {@code true} If the query is collocated. + */ + public static boolean isCollocated(Query qry) { + GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true); + + Type type = mdl.type(true); + + if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + throw new CacheException("Failed to execute query: for distributed join " + + "all REPLICATED caches must be at the end of the joined tables list."); + + return type.isCollocated(); + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param qry Query. + * @param unions Unions. + * @param validate Query validation flag. + * @return Built model. + */ + private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper, + int filter, + Query qry, + List<GridH2CollocationModel> unions, + boolean validate) { + if (qry.isUnion()) { + if (unions == null) + unions = new ArrayList<>(); + + SelectUnion union = (SelectUnion)qry; + + GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate); + GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate); + + assert left != null; + assert right != null; + + return upper != null ? upper : left; + } + + Select select = (Select)qry; + + List<TableFilter> list = new ArrayList<>(); + + for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) + list.add(f); + + TableFilter[] filters = list.toArray(new TableFilter[list.size()]); + + GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate); + + cm.childFilters(filters); + + for (int i = 0; i < filters.length; i++) { + TableFilter f = filters[i]; + + if (f.getTable().isView()) + buildCollocationModel(cm, i, getSubQuery(f), null, validate); + else if (f.getTable() instanceof GridH2Table) + createChildModel(cm, i, null, false, validate); + } + + return upper != null ? upper : cm; + } + + /** + * @param cacheName Cache name. + * @return Error. + */ + private static CacheException customAffinityError(String cacheName) { + return new CacheException("Failed to prepare distributed join query: can not use distributed joins for cache " + + "with custom AffinityKeyMapper configured. " + + "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']'); + } + + /** + * Collocation type. + */ + private enum Type { + /** */ + PARTITIONED_COLLOCATED(true, true), + + /** */ + PARTITIONED_NOT_COLLOCATED(true, false), + + /** */ + REPLICATED(false, true); + + /** */ + private final boolean partitioned; + + /** */ + private final boolean collocated; + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + */ + Type(boolean partitioned, boolean collocated) { + this.partitioned = partitioned; + this.collocated = collocated; + } + + /** + * @return {@code true} If partitioned. + */ + public boolean isPartitioned() { + return partitioned; + } + + /** + * @return {@code true} If collocated. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + * @return Type. + */ + static Type of(boolean partitioned, boolean collocated) { + if (collocated) + return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED; + + assert partitioned; + + return Type.PARTITIONED_NOT_COLLOCATED; + } + } + + /** + * Affinity of a table relative to previous joined tables. + */ + private enum Affinity { + /** */ + NONE, + + /** */ + HAS_AFFINITY_CONDITION, + + /** */ + COLLOCATED_JOIN + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 165ec0c..0e7f245 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -53,7 +53,7 @@ import org.h2.command.dml.SelectUnion; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.isCollocated; +import static org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel.isCollocated; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst.TRUE; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST; http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index b69ee78..2e611de 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -75,7 +75,7 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture; import org.apache.ignite.internal.processors.query.h2.UpdateResult; -import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; +import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; @@ -111,8 +111,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUE import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 20ee1b4..299ab5a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -109,7 +109,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;