WIP.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0860542a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0860542a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0860542a

Branch: refs/heads/ignite-10759-1
Commit: 0860542a2d33cb5b20c463cc7308a1fb84d5b946
Parents: 7027ef2
Author: devozerov <voze...@gridgain.com>
Authored: Fri Dec 21 17:57:57 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Dec 21 17:57:57 2018 +0300

----------------------------------------------------------------------
 .../processors/query/h2/IgniteH2Indexing.java   |   4 +-
 .../query/h2/opt/DistributedJoinMode.java       |  51 --
 .../query/h2/opt/GridH2CollocationModel.java    | 838 ------------------
 .../query/h2/opt/GridH2IndexBase.java           |   5 +-
 .../query/h2/opt/GridH2QueryContext.java        |   4 +-
 .../query/h2/opt/join/DistributedJoinMode.java  |  51 ++
 .../h2/opt/join/DistributedLookupBatch.java     |   2 +-
 .../h2/opt/join/GridH2CollocationModel.java     | 841 +++++++++++++++++++
 .../query/h2/sql/GridSqlQuerySplitter.java      |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 11 files changed, 906 insertions(+), 900 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 29f39b9..931a1dc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -178,8 +178,8 @@ import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
deleted file mode 100644
index cc06244..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-/**
- * Defines set of distributed join modes.
- */
-public enum DistributedJoinMode {
-    /**
-     * Distributed joins is disabled. Local joins will be performed instead.
-     */
-    OFF,
-
-    /**
-     * Distributed joins is enabled within local node only.
-     *
-     * NOTE: This mode is used with segmented indices for local sql queries.
-     * As in this case we need to make distributed join across local index 
segments
-     * and prevent range-queries to other nodes.
-     */
-    LOCAL_ONLY,
-
-    /**
-     * Distributed joins is enabled.
-     */
-    ON;
-
-    /**
-     * @param isLocal Query local flag.
-     * @param distributedJoins Query distributed joins flag.
-     * @return DistributedJoinMode for the query.
-     */
-    public static DistributedJoinMode distributedJoinMode(boolean isLocal, 
boolean distributedJoins) {
-        return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
deleted file mode 100644
index 2a92511..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import javax.cache.CacheException;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.h2.command.dml.Query;
-import org.h2.command.dml.Select;
-import org.h2.command.dml.SelectUnion;
-import org.h2.expression.Comparison;
-import org.h2.expression.Expression;
-import org.h2.expression.ExpressionColumn;
-import org.h2.index.IndexCondition;
-import org.h2.index.ViewIndex;
-import org.h2.table.Column;
-import org.h2.table.IndexColumn;
-import org.h2.table.SubQueryInfo;
-import org.h2.table.Table;
-import org.h2.table.TableFilter;
-import org.h2.table.TableView;
-
-/**
- * Collocation model for a query.
- */
-public final class GridH2CollocationModel {
-    /** */
-    public static final int MULTIPLIER_COLLOCATED = 1;
-
-    /** */
-    private static final int MULTIPLIER_UNICAST = 50;
-
-    /** */
-    private static final int MULTIPLIER_BROADCAST = 200;
-
-    /** */
-    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
-
-    /** */
-    private final GridH2CollocationModel upper;
-
-    /** */
-    private final int filter;
-
-    /** */
-    private final boolean view;
-
-    /** */
-    private int multiplier;
-
-    /** */
-    private Type type;
-
-    /** */
-    private GridH2CollocationModel[] children;
-
-    /** */
-    private TableFilter[] childFilters;
-
-    /** */
-    private List<GridH2CollocationModel> unions;
-
-    /** */
-    private Select select;
-
-    /** */
-    private final boolean validate;
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
-     * @param validate Query validation flag.
-     */
-    private GridH2CollocationModel(GridH2CollocationModel upper, int filter, 
boolean view, boolean validate) {
-        this.upper = upper;
-        this.filter = filter;
-        this.view = view;
-        this.validate = validate;
-    }
-
-    /**
-     * @return Table filter for this collocation model.
-     */
-    private TableFilter filter() {
-        return upper == null ? null : upper.childFilters[filter];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        calculate();
-
-        SB b = new SB();
-
-        for (int lvl = 0; lvl < 20; lvl++) {
-            if (!toString(b, lvl))
-                break;
-
-            b.a('\n');
-        }
-
-        return b.toString();
-    }
-
-    /**
-     * @param b String builder.
-     * @param lvl Depth level.
-     */
-    private boolean toString(SB b, int lvl) {
-        boolean res = false;
-
-        if (lvl == 0) {
-            TableFilter f = filter();
-            String tblAlias = f == null ? "^" : f.getTableAlias();
-
-            b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", 
mul=").a(multiplier).a("]");
-
-            res = true;
-        }
-        else if (childFilters != null) {
-            assert lvl > 0;
-
-            lvl--;
-
-            for (int i = 0; i < childFilters.length; i++) {
-                if (lvl == 0)
-                    b.a(" | ");
-
-                res |= child(i, true).toString(b, lvl);
-            }
-
-            if (lvl == 0)
-                b.a(" | ");
-        }
-
-        return res;
-    }
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param unions Unions.
-     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
-     * @param validate Query validation flag.
-     * @return Created child collocation model.
-     */
-    private static GridH2CollocationModel 
createChildModel(GridH2CollocationModel upper,
-        int filter,
-        List<GridH2CollocationModel> unions,
-        boolean view,
-        boolean validate) {
-        GridH2CollocationModel child = new GridH2CollocationModel(upper, 
filter, view, validate);
-
-        if (unions != null) {
-            // Bind created child to unions.
-            assert upper == null || upper.child(filter, false) != null || 
unions.isEmpty();
-
-            if (upper != null && unions.isEmpty()) {
-                assert upper.child(filter, false) == null;
-
-                upper.children[filter] = child;
-            }
-
-            unions.add(child);
-
-            child.unions = unions;
-        }
-        else if (upper != null) {
-            // Bind created child to upper model.
-            assert upper.child(filter, false) == null;
-
-            upper.children[filter] = child;
-        }
-
-        return child;
-    }
-
-    /**
-     * @param childFilters New child filters.
-     * @return {@code true} If child filters were updated.
-     */
-    private boolean childFilters(TableFilter[] childFilters) {
-        assert childFilters != null;
-        assert view;
-
-        Select select = childFilters[0].getSelect();
-
-        assert this.select == null || this.select == select;
-
-        if (this.select == null) {
-            this.select = select;
-
-            assert this.childFilters == null;
-        }
-        else if (Arrays.equals(this.childFilters, childFilters))
-            return false;
-
-        if (this.childFilters == null) {
-            // We have to clone because H2 reuses array and reorders elements.
-            this.childFilters = childFilters.clone();
-
-            children = new GridH2CollocationModel[childFilters.length];
-        }
-        else {
-            assert this.childFilters.length == childFilters.length;
-
-            // We have to copy because H2 reuses array and reorders elements.
-            System.arraycopy(childFilters, 0, this.childFilters, 0, 
childFilters.length);
-
-            Arrays.fill(children, null);
-        }
-
-        // Reset results.
-        type = null;
-        multiplier = 0;
-
-        return true;
-    }
-
-    /**
-     * Do the needed calculations.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void calculate() {
-        if (type != null)
-            return;
-
-        if (view) { // We are at (sub-)query model.
-            assert childFilters != null;
-
-            boolean collocated = true;
-            boolean partitioned = false;
-            int maxMultiplier = MULTIPLIER_COLLOCATED;
-
-            for (int i = 0; i < childFilters.length; i++) {
-                GridH2CollocationModel child = child(i, true);
-
-                Type t = child.type(true);
-
-                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
-                    maxMultiplier = child.multiplier;
-
-                if (t.isPartitioned()) {
-                    partitioned = true;
-
-                    if (!t.isCollocated()) {
-                        collocated = false;
-
-                        int m = child.multiplier(true);
-
-                        if (m > maxMultiplier) {
-                            maxMultiplier = m;
-
-                            if (maxMultiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
-                                break;
-                        }
-                    }
-                }
-            }
-
-            type = Type.of(partitioned, collocated);
-            multiplier = maxMultiplier;
-        }
-        else {
-            assert upper != null;
-            assert childFilters == null;
-
-            // We are at table instance.
-            Table tbl = filter().getTable();
-
-            // Only partitioned tables will do distributed joins.
-            if (!(tbl instanceof GridH2Table) || 
!((GridH2Table)tbl).isPartitioned()) {
-                type = Type.REPLICATED;
-                multiplier = MULTIPLIER_COLLOCATED;
-
-                return;
-            }
-
-            // If we are the first partitioned table in a join, then we are 
"base" for all the rest partitioned tables
-            // which will need to get remote result (if there is no affinity 
condition). Since this query is broadcasted
-            // to all the affinity nodes the "base" does not need to get 
remote results.
-            if (!upper.findPartitionedTableBefore(filter)) {
-                type = Type.PARTITIONED_COLLOCATED;
-                multiplier = MULTIPLIER_COLLOCATED;
-            }
-            else {
-                // It is enough to make sure that our previous join by 
affinity key is collocated, then we are
-                // collocated. If we at least have affinity key condition, 
then we do unicast which is cheaper.
-                switch (upper.joinedWithCollocated(filter)) {
-                    case COLLOCATED_JOIN:
-                        type = Type.PARTITIONED_COLLOCATED;
-                        multiplier = MULTIPLIER_COLLOCATED;
-
-                        break;
-
-                    case HAS_AFFINITY_CONDITION:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_UNICAST;
-
-                        break;
-
-                    case NONE:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_BROADCAST;
-
-                        break;
-
-                    default:
-                        throw new IllegalStateException();
-                }
-            }
-
-            if (upper.previousReplicated(filter))
-                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
-        }
-    }
-
-    /**
-     * @param f Current filter.
-     * @return {@code true} If partitioned table was found.
-     */
-    private boolean findPartitionedTableBefore(int f) {
-        for (int i = 0; i < f; i++) {
-            GridH2CollocationModel child = child(i, true);
-
-            // The c can be null if it is not a GridH2Table and not a 
sub-query,
-            // it is a some kind of function table or anything else that 
considered replicated.
-            if (child != null && child.type(true).isPartitioned())
-                return true;
-        }
-
-        // We have to search globally in upper queries as well.
-        return upper != null && upper.findPartitionedTableBefore(filter);
-    }
-
-    /**
-     * @param f Current filter.
-     * @return {@code true} If previous table is REPLICATED.
-     */
-    @SuppressWarnings("SimplifiableIfStatement")
-    private boolean previousReplicated(int f) {
-        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
-            return true;
-
-        return upper != null && upper.previousReplicated(filter);
-    }
-
-    /**
-     * @param f Filter.
-     * @return Affinity join type.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Affinity joinedWithCollocated(int f) {
-        TableFilter tf = childFilters[f];
-
-        GridH2Table tbl = (GridH2Table)tf.getTable();
-
-        if (validate) {
-            if (tbl.isCustomAffinityMapper())
-                throw customAffinityError(tbl.cacheName());
-
-            if (F.isEmpty(tf.getIndexConditions())) {
-                throw new CacheException("Failed to prepare distributed join 
query: " +
-                    "join condition does not use index [joinedCache=" + 
tbl.cacheName() +
-                    ", plan=" + tf.getSelect().getPlanSQL() + ']');
-            }
-        }
-
-        IndexColumn affCol = tbl.getAffinityKeyColumn();
-
-        boolean affKeyCondFound = false;
-
-        if (affCol != null) {
-            ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
-
-            int affColId = affCol.column.getColumnId();
-
-            for (int i = 0; i < idxConditions.size(); i++) {
-                IndexCondition c = idxConditions.get(i);
-                int colId = c.getColumn().getColumnId();
-                int cmpType = c.getCompareType();
-
-                if ((cmpType == Comparison.EQUAL || cmpType == 
Comparison.EQUAL_NULL_SAFE) &&
-                    (colId == affColId || 
tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) {
-                    affKeyCondFound = true;
-
-                    Expression exp = c.getExpression();
-                    exp = exp.getNonAliasExpression();
-
-                    if (exp instanceof ExpressionColumn) {
-                        ExpressionColumn expCol = (ExpressionColumn)exp;
-
-                        // This is one of our previous joins.
-                        TableFilter prevJoin = expCol.getTableFilter();
-
-                        if (prevJoin != null) {
-                            GridH2CollocationModel cm = 
child(indexOf(prevJoin), true);
-
-                            // If the previous joined model is a subquery 
(view), we can not be sure that
-                            // the found affinity column is the needed one, 
since we can select multiple
-                            // different affinity columns from different 
tables.
-                            if (cm != null && !cm.view) {
-                                Type t = cm.type(true);
-
-                                if (t.isPartitioned() && t.isCollocated() && 
isAffinityColumn(prevJoin, expCol, validate))
-                                    return Affinity.COLLOCATED_JOIN;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : 
Affinity.NONE;
-    }
-
-    /**
-     * @param f Table filter.
-     * @return Index.
-     */
-    private int indexOf(TableFilter f) {
-        for (int i = 0; i < childFilters.length; i++) {
-            if (childFilters[i] == f)
-                return i;
-        }
-
-        throw new IllegalStateException();
-    }
-
-    /**
-     * @param f Table filter.
-     * @param expCol Expression column.
-     * @param validate Query validation flag.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn 
expCol, boolean validate) {
-        Column col = expCol.getColumn();
-
-        if (col == null)
-            return false;
-
-        Table t = col.getTable();
-
-        if (t.isView()) {
-            Query qry;
-
-            if (f.getIndex() != null)
-                qry = getSubQuery(f);
-            else
-                qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t);
-
-            return isAffinityColumn(qry, expCol, validate);
-        }
-
-        if (t instanceof GridH2Table) {
-            GridH2Table t0 = (GridH2Table)t;
-
-            if (validate && t0.isCustomAffinityMapper())
-                throw customAffinityError((t0).cacheName());
-
-            IndexColumn affCol = t0.getAffinityKeyColumn();
-
-            return affCol != null && col.getColumnId() == 
affCol.column.getColumnId();
-        }
-
-        return false;
-    }
-
-    /**
-     * @param qry Query.
-     * @param expCol Expression column.
-     * @param validate Query validation flag.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(Query qry, ExpressionColumn 
expCol, boolean validate) {
-        if (qry.isUnion()) {
-            SelectUnion union = (SelectUnion)qry;
-
-            return isAffinityColumn(union.getLeft(), expCol, validate) && 
isAffinityColumn(union.getRight(), expCol, validate);
-        }
-
-        Expression exp = 
qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
-
-        if (exp instanceof ExpressionColumn) {
-            expCol = (ExpressionColumn)exp;
-
-            return isAffinityColumn(expCol.getTableFilter(), expCol, validate);
-        }
-
-        return false;
-    }
-
-    /**
-     * @return Multiplier.
-     */
-    public int calculateMultiplier() {
-        // We don't need multiplier for union here because it will be 
summarized in H2.
-        return multiplier(false);
-    }
-
-    /**
-     * @param withUnion With respect to union.
-     * @return Multiplier.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    private int multiplier(boolean withUnion) {
-        calculate();
-
-        assert multiplier != 0;
-
-        if (withUnion && unions != null) {
-            int maxMultiplier = 0;
-
-            for (int i = 0; i < unions.size(); i++) {
-                int m = unions.get(i).multiplier(false);
-
-                if (m > maxMultiplier)
-                    maxMultiplier = m;
-            }
-
-            return maxMultiplier;
-        }
-
-        return multiplier;
-    }
-
-    /**
-     * @param withUnion With respect to union.
-     * @return Type.
-     */
-    private Type type(boolean withUnion) {
-        calculate();
-
-        assert type != null;
-
-        if (withUnion && unions != null) {
-            Type left = unions.get(0).type(false);
-
-            for (int i = 1; i < unions.size(); i++) {
-                Type right = unions.get(i).type(false);
-
-                if (!left.isCollocated() || !right.isCollocated()) {
-                    left = Type.PARTITIONED_NOT_COLLOCATED;
-
-                    break;
-                }
-                else if (!left.isPartitioned() && !right.isPartitioned())
-                    left = Type.REPLICATED;
-                else
-                    left = Type.PARTITIONED_COLLOCATED;
-            }
-
-            return left;
-        }
-
-        return type;
-    }
-
-    /**
-     * @param i Index.
-     * @param create Create child if needed.
-     * @return Child collocation.
-     */
-    private GridH2CollocationModel child(int i, boolean create) {
-        GridH2CollocationModel child = children[i];
-
-        if (child == null && create) {
-            TableFilter f = childFilters[i];
-
-            if (f.getTable().isView()) {
-                if (f.getIndex() == null) {
-                    // If we don't have view index yet, then we just creating 
empty model and it must be filled later.
-                    child = createChildModel(this, i, null, true, validate);
-                }
-                else
-                    child = buildCollocationModel(this, i, getSubQuery(f), 
null, validate);
-            }
-            else
-                child = createChildModel(this, i, null, false, validate);
-
-            assert child != null;
-            assert children[i] == child;
-        }
-
-        return child;
-    }
-
-    /**
-     * @param f Table filter.
-     * @return Sub-query.
-     */
-    private static Query getSubQuery(TableFilter f) {
-        return ((ViewIndex)f.getIndex()).getQuery();
-    }
-
-    /**
-     * @return Unions list.
-     */
-    private List<GridH2CollocationModel> getOrCreateUnions() {
-        if (unions == null) {
-            unions = new ArrayList<>(4);
-
-            unions.add(this);
-        }
-
-        return unions;
-    }
-
-    /**
-     * @param qctx Query context.
-     * @param info Sub-query info.
-     * @param filters Filters.
-     * @param filter Filter.
-     * @param validate Query validation flag.
-     * @return Collocation.
-     */
-    public static GridH2CollocationModel 
buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
-        TableFilter[] filters, int filter, boolean validate) {
-        GridH2CollocationModel cm;
-
-        if (info != null) {
-            // Go up until we reach the root query.
-            cm = buildCollocationModel(qctx, info.getUpper(), 
info.getFilters(), info.getFilter(), validate);
-        }
-        else {
-            // We are at the root query.
-            cm = qctx.queryCollocationModel();
-
-            if (cm == null) {
-                cm = createChildModel(null, -1, null, true, validate);
-
-                qctx.queryCollocationModel(cm);
-            }
-        }
-
-        if (filters == null)
-            return cm;
-
-        assert cm.view;
-
-        Select select = filters[0].getSelect();
-
-        // Handle union. We have to rely on fact that select will be the same 
on uppermost select.
-        // For sub-queries we will drop collocation models, so that they will 
be recalculated anyways.
-        if (cm.select != null && cm.select != select) {
-            List<GridH2CollocationModel> unions = cm.getOrCreateUnions();
-
-            // Try to find this select in existing unions.
-            // Start with 1 because at 0 it always will be c.
-            for (int i = 1; i < unions.size(); i++) {
-                GridH2CollocationModel u = unions.get(i);
-
-                if (u.select == select) {
-                    cm = u;
-
-                    break;
-                }
-            }
-
-            // Nothing was found, need to create new child in union.
-            if (cm.select != select)
-                cm = createChildModel(cm.upper, cm.filter, unions, true, 
validate);
-        }
-
-        cm.childFilters(filters);
-
-        return cm.child(filter, true);
-    }
-
-    /**
-     * @param qry Query.
-     * @return {@code true} If the query is collocated.
-     */
-    public static boolean isCollocated(Query qry) {
-        GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, 
null, true);
-
-        Type type = mdl.type(true);
-
-        if (!type.isCollocated() && mdl.multiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
-            throw new CacheException("Failed to execute query: for distributed 
join " +
-                "all REPLICATED caches must be at the end of the joined tables 
list.");
-
-        return type.isCollocated();
-    }
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param qry Query.
-     * @param unions Unions.
-     * @param validate Query validation flag.
-     * @return Built model.
-     */
-    private static GridH2CollocationModel 
buildCollocationModel(GridH2CollocationModel upper,
-        int filter,
-        Query qry,
-        List<GridH2CollocationModel> unions,
-        boolean validate) {
-        if (qry.isUnion()) {
-            if (unions == null)
-                unions = new ArrayList<>();
-
-            SelectUnion union = (SelectUnion)qry;
-
-            GridH2CollocationModel left = buildCollocationModel(upper, filter, 
union.getLeft(), unions, validate);
-            GridH2CollocationModel right = buildCollocationModel(upper, 
filter, union.getRight(), unions, validate);
-
-            assert left != null;
-            assert right != null;
-
-            return upper != null ? upper : left;
-        }
-
-        Select select = (Select)qry;
-
-        List<TableFilter> list = new ArrayList<>();
-
-        for (TableFilter f = select.getTopTableFilter(); f != null; f = 
f.getJoin())
-            list.add(f);
-
-        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
-
-        GridH2CollocationModel cm = createChildModel(upper, filter, unions, 
true, validate);
-
-        cm.childFilters(filters);
-
-        for (int i = 0; i < filters.length; i++) {
-            TableFilter f = filters[i];
-
-            if (f.getTable().isView())
-                buildCollocationModel(cm, i, getSubQuery(f), null, validate);
-            else if (f.getTable() instanceof GridH2Table)
-                createChildModel(cm, i, null, false, validate);
-        }
-
-        return upper != null ? upper : cm;
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @return Error.
-     */
-    private static CacheException customAffinityError(String cacheName) {
-        return new CacheException("Failed to prepare distributed join query: 
can not use distributed joins for cache " +
-            "with custom AffinityKeyMapper configured. " +
-            "Please use AffinityKeyMapped annotation instead [cache=" + 
cacheName + ']');
-    }
-
-    /**
-     * Collocation type.
-     */
-    private enum Type {
-        /** */
-        PARTITIONED_COLLOCATED(true, true),
-
-        /** */
-        PARTITIONED_NOT_COLLOCATED(true, false),
-
-        /** */
-        REPLICATED(false, true);
-
-        /** */
-        private final boolean partitioned;
-
-        /** */
-        private final boolean collocated;
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         */
-        Type(boolean partitioned, boolean collocated) {
-            this.partitioned = partitioned;
-            this.collocated = collocated;
-        }
-
-        /**
-         * @return {@code true} If partitioned.
-         */
-        public boolean isPartitioned() {
-            return partitioned;
-        }
-
-        /**
-         * @return {@code true} If collocated.
-         */
-        public boolean isCollocated() {
-            return collocated;
-        }
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         * @return Type.
-         */
-        static Type of(boolean partitioned, boolean collocated) {
-            if (collocated)
-                return partitioned ? Type.PARTITIONED_COLLOCATED : 
Type.REPLICATED;
-
-            assert partitioned;
-
-            return Type.PARTITIONED_NOT_COLLOCATED;
-        }
-    }
-
-    /**
-     * Affinity of a table relative to previous joined tables.
-     */
-    private enum Affinity {
-        /** */
-        NONE,
-
-        /** */
-        HAS_AFFINITY_CONDITION,
-
-        /** */
-        COLLOCATED_JOIN
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 3d7d388..6d8f2ed 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import 
org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
 import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
 import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
@@ -71,8 +72,8 @@ import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Collections.singletonList;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel.buildCollocationModel;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index ef2fd25..addd137 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -27,6 +27,8 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel;
 import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.util.typedef.F;
@@ -34,7 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
new file mode 100644
index 0000000..7c958da
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Defines set of distributed join modes.
+ */
+public enum DistributedJoinMode {
+    /**
+     * Distributed joins is disabled. Local joins will be performed instead.
+     */
+    OFF,
+
+    /**
+     * Distributed joins is enabled within local node only.
+     *
+     * NOTE: This mode is used with segmented indices for local sql queries.
+     * As in this case we need to make distributed join across local index 
segments
+     * and prevent range-queries to other nodes.
+     */
+    LOCAL_ONLY,
+
+    /**
+     * Distributed joins is enabled.
+     */
+    ON;
+
+    /**
+     * @param isLocal Query local flag.
+     * @param distributedJoins Query distributed joins flag.
+     * @return DistributedJoinMode for the query.
+     */
+    public static DistributedJoinMode distributedJoinMode(boolean isLocal, 
boolean distributedJoins) {
+        return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 60106a8..02d2e44 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -43,7 +43,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java
new file mode 100644
index 0000000..430bc71
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/GridH2CollocationModel.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.CacheException;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.h2.command.dml.Query;
+import org.h2.command.dml.Select;
+import org.h2.command.dml.SelectUnion;
+import org.h2.expression.Comparison;
+import org.h2.expression.Expression;
+import org.h2.expression.ExpressionColumn;
+import org.h2.index.IndexCondition;
+import org.h2.index.ViewIndex;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.SubQueryInfo;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+import org.h2.table.TableView;
+
+/**
+ * Collocation model for a query.
+ */
+public final class GridH2CollocationModel {
+    /** */
+    public static final int MULTIPLIER_COLLOCATED = 1;
+
+    /** */
+    private static final int MULTIPLIER_UNICAST = 50;
+
+    /** */
+    private static final int MULTIPLIER_BROADCAST = 200;
+
+    /** */
+    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
+
+    /** */
+    private final GridH2CollocationModel upper;
+
+    /** */
+    private final int filter;
+
+    /** */
+    private final boolean view;
+
+    /** */
+    private int multiplier;
+
+    /** */
+    private Type type;
+
+    /** */
+    private GridH2CollocationModel[] children;
+
+    /** */
+    private TableFilter[] childFilters;
+
+    /** */
+    private List<GridH2CollocationModel> unions;
+
+    /** */
+    private Select select;
+
+    /** */
+    private final boolean validate;
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
+     * @param validate Query validation flag.
+     */
+    private GridH2CollocationModel(GridH2CollocationModel upper, int filter, 
boolean view, boolean validate) {
+        this.upper = upper;
+        this.filter = filter;
+        this.view = view;
+        this.validate = validate;
+    }
+
+    /**
+     * @return Table filter for this collocation model.
+     */
+    private TableFilter filter() {
+        return upper == null ? null : upper.childFilters[filter];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        calculate();
+
+        SB b = new SB();
+
+        for (int lvl = 0; lvl < 20; lvl++) {
+            if (!toString(b, lvl))
+                break;
+
+            b.a('\n');
+        }
+
+        return b.toString();
+    }
+
+    /**
+     * @param b String builder.
+     * @param lvl Depth level.
+     */
+    private boolean toString(SB b, int lvl) {
+        boolean res = false;
+
+        if (lvl == 0) {
+            TableFilter f = filter();
+            String tblAlias = f == null ? "^" : f.getTableAlias();
+
+            b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", 
mul=").a(multiplier).a("]");
+
+            res = true;
+        }
+        else if (childFilters != null) {
+            assert lvl > 0;
+
+            lvl--;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                if (lvl == 0)
+                    b.a(" | ");
+
+                res |= child(i, true).toString(b, lvl);
+            }
+
+            if (lvl == 0)
+                b.a(" | ");
+        }
+
+        return res;
+    }
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param unions Unions.
+     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
+     * @param validate Query validation flag.
+     * @return Created child collocation model.
+     */
+    private static GridH2CollocationModel 
createChildModel(GridH2CollocationModel upper,
+        int filter,
+        List<GridH2CollocationModel> unions,
+        boolean view,
+        boolean validate) {
+        GridH2CollocationModel child = new GridH2CollocationModel(upper, 
filter, view, validate);
+
+        if (unions != null) {
+            // Bind created child to unions.
+            assert upper == null || upper.child(filter, false) != null || 
unions.isEmpty();
+
+            if (upper != null && unions.isEmpty()) {
+                assert upper.child(filter, false) == null;
+
+                upper.children[filter] = child;
+            }
+
+            unions.add(child);
+
+            child.unions = unions;
+        }
+        else if (upper != null) {
+            // Bind created child to upper model.
+            assert upper.child(filter, false) == null;
+
+            upper.children[filter] = child;
+        }
+
+        return child;
+    }
+
+    /**
+     * @param childFilters New child filters.
+     * @return {@code true} If child filters were updated.
+     */
+    private boolean childFilters(TableFilter[] childFilters) {
+        assert childFilters != null;
+        assert view;
+
+        Select select = childFilters[0].getSelect();
+
+        assert this.select == null || this.select == select;
+
+        if (this.select == null) {
+            this.select = select;
+
+            assert this.childFilters == null;
+        }
+        else if (Arrays.equals(this.childFilters, childFilters))
+            return false;
+
+        if (this.childFilters == null) {
+            // We have to clone because H2 reuses array and reorders elements.
+            this.childFilters = childFilters.clone();
+
+            children = new GridH2CollocationModel[childFilters.length];
+        }
+        else {
+            assert this.childFilters.length == childFilters.length;
+
+            // We have to copy because H2 reuses array and reorders elements.
+            System.arraycopy(childFilters, 0, this.childFilters, 0, 
childFilters.length);
+
+            Arrays.fill(children, null);
+        }
+
+        // Reset results.
+        type = null;
+        multiplier = 0;
+
+        return true;
+    }
+
+    /**
+     * Do the needed calculations.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void calculate() {
+        if (type != null)
+            return;
+
+        if (view) { // We are at (sub-)query model.
+            assert childFilters != null;
+
+            boolean collocated = true;
+            boolean partitioned = false;
+            int maxMultiplier = MULTIPLIER_COLLOCATED;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                GridH2CollocationModel child = child(i, true);
+
+                Type t = child.type(true);
+
+                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                    maxMultiplier = child.multiplier;
+
+                if (t.isPartitioned()) {
+                    partitioned = true;
+
+                    if (!t.isCollocated()) {
+                        collocated = false;
+
+                        int m = child.multiplier(true);
+
+                        if (m > maxMultiplier) {
+                            maxMultiplier = m;
+
+                            if (maxMultiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
+                                break;
+                        }
+                    }
+                }
+            }
+
+            type = Type.of(partitioned, collocated);
+            multiplier = maxMultiplier;
+        }
+        else {
+            assert upper != null;
+            assert childFilters == null;
+
+            // We are at table instance.
+            Table tbl = filter().getTable();
+
+            // Only partitioned tables will do distributed joins.
+            if (!(tbl instanceof GridH2Table) || 
!((GridH2Table)tbl).isPartitioned()) {
+                type = Type.REPLICATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+
+                return;
+            }
+
+            // If we are the first partitioned table in a join, then we are 
"base" for all the rest partitioned tables
+            // which will need to get remote result (if there is no affinity 
condition). Since this query is broadcasted
+            // to all the affinity nodes the "base" does not need to get 
remote results.
+            if (!upper.findPartitionedTableBefore(filter)) {
+                type = Type.PARTITIONED_COLLOCATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+            }
+            else {
+                // It is enough to make sure that our previous join by 
affinity key is collocated, then we are
+                // collocated. If we at least have affinity key condition, 
then we do unicast which is cheaper.
+                switch (upper.joinedWithCollocated(filter)) {
+                    case COLLOCATED_JOIN:
+                        type = Type.PARTITIONED_COLLOCATED;
+                        multiplier = MULTIPLIER_COLLOCATED;
+
+                        break;
+
+                    case HAS_AFFINITY_CONDITION:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_UNICAST;
+
+                        break;
+
+                    case NONE:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_BROADCAST;
+
+                        break;
+
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+
+            if (upper.previousReplicated(filter))
+                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
+        }
+    }
+
+    /**
+     * @param f Current filter.
+     * @return {@code true} If partitioned table was found.
+     */
+    private boolean findPartitionedTableBefore(int f) {
+        for (int i = 0; i < f; i++) {
+            GridH2CollocationModel child = child(i, true);
+
+            // The c can be null if it is not a GridH2Table and not a 
sub-query,
+            // it is a some kind of function table or anything else that 
considered replicated.
+            if (child != null && child.type(true).isPartitioned())
+                return true;
+        }
+
+        // We have to search globally in upper queries as well.
+        return upper != null && upper.findPartitionedTableBefore(filter);
+    }
+
+    /**
+     * @param f Current filter.
+     * @return {@code true} If previous table is REPLICATED.
+     */
+    @SuppressWarnings("SimplifiableIfStatement")
+    private boolean previousReplicated(int f) {
+        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
+            return true;
+
+        return upper != null && upper.previousReplicated(filter);
+    }
+
+    /**
+     * @param f Filter.
+     * @return Affinity join type.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private Affinity joinedWithCollocated(int f) {
+        TableFilter tf = childFilters[f];
+
+        GridH2Table tbl = (GridH2Table)tf.getTable();
+
+        if (validate) {
+            if (tbl.isCustomAffinityMapper())
+                throw customAffinityError(tbl.cacheName());
+
+            if (F.isEmpty(tf.getIndexConditions())) {
+                throw new CacheException("Failed to prepare distributed join 
query: " +
+                    "join condition does not use index [joinedCache=" + 
tbl.cacheName() +
+                    ", plan=" + tf.getSelect().getPlanSQL() + ']');
+            }
+        }
+
+        IndexColumn affCol = tbl.getAffinityKeyColumn();
+
+        boolean affKeyCondFound = false;
+
+        if (affCol != null) {
+            ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
+
+            int affColId = affCol.column.getColumnId();
+
+            for (int i = 0; i < idxConditions.size(); i++) {
+                IndexCondition c = idxConditions.get(i);
+                int colId = c.getColumn().getColumnId();
+                int cmpType = c.getCompareType();
+
+                if ((cmpType == Comparison.EQUAL || cmpType == 
Comparison.EQUAL_NULL_SAFE) &&
+                    (colId == affColId || 
tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) {
+                    affKeyCondFound = true;
+
+                    Expression exp = c.getExpression();
+                    exp = exp.getNonAliasExpression();
+
+                    if (exp instanceof ExpressionColumn) {
+                        ExpressionColumn expCol = (ExpressionColumn)exp;
+
+                        // This is one of our previous joins.
+                        TableFilter prevJoin = expCol.getTableFilter();
+
+                        if (prevJoin != null) {
+                            GridH2CollocationModel cm = 
child(indexOf(prevJoin), true);
+
+                            // If the previous joined model is a subquery 
(view), we can not be sure that
+                            // the found affinity column is the needed one, 
since we can select multiple
+                            // different affinity columns from different 
tables.
+                            if (cm != null && !cm.view) {
+                                Type t = cm.type(true);
+
+                                if (t.isPartitioned() && t.isCollocated() && 
isAffinityColumn(prevJoin, expCol, validate))
+                                    return Affinity.COLLOCATED_JOIN;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : 
Affinity.NONE;
+    }
+
+    /**
+     * @param f Table filter.
+     * @return Index.
+     */
+    private int indexOf(TableFilter f) {
+        for (int i = 0; i < childFilters.length; i++) {
+            if (childFilters[i] == f)
+                return i;
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * @param f Table filter.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} It it is an affinity column.
+     */
+    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn 
expCol, boolean validate) {
+        Column col = expCol.getColumn();
+
+        if (col == null)
+            return false;
+
+        Table t = col.getTable();
+
+        if (t.isView()) {
+            Query qry;
+
+            if (f.getIndex() != null)
+                qry = getSubQuery(f);
+            else
+                qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t);
+
+            return isAffinityColumn(qry, expCol, validate);
+        }
+
+        if (t instanceof GridH2Table) {
+            GridH2Table t0 = (GridH2Table)t;
+
+            if (validate && t0.isCustomAffinityMapper())
+                throw customAffinityError((t0).cacheName());
+
+            IndexColumn affCol = t0.getAffinityKeyColumn();
+
+            return affCol != null && col.getColumnId() == 
affCol.column.getColumnId();
+        }
+
+        return false;
+    }
+
+    /**
+     * @param qry Query.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} It it is an affinity column.
+     */
+    private static boolean isAffinityColumn(Query qry, ExpressionColumn 
expCol, boolean validate) {
+        if (qry.isUnion()) {
+            SelectUnion union = (SelectUnion)qry;
+
+            return isAffinityColumn(union.getLeft(), expCol, validate) && 
isAffinityColumn(union.getRight(), expCol, validate);
+        }
+
+        Expression exp = 
qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
+
+        if (exp instanceof ExpressionColumn) {
+            expCol = (ExpressionColumn)exp;
+
+            return isAffinityColumn(expCol.getTableFilter(), expCol, validate);
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Multiplier.
+     */
+    public int calculateMultiplier() {
+        // We don't need multiplier for union here because it will be 
summarized in H2.
+        return multiplier(false);
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Multiplier.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private int multiplier(boolean withUnion) {
+        calculate();
+
+        assert multiplier != 0;
+
+        if (withUnion && unions != null) {
+            int maxMultiplier = 0;
+
+            for (int i = 0; i < unions.size(); i++) {
+                int m = unions.get(i).multiplier(false);
+
+                if (m > maxMultiplier)
+                    maxMultiplier = m;
+            }
+
+            return maxMultiplier;
+        }
+
+        return multiplier;
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Type.
+     */
+    private Type type(boolean withUnion) {
+        calculate();
+
+        assert type != null;
+
+        if (withUnion && unions != null) {
+            Type left = unions.get(0).type(false);
+
+            for (int i = 1; i < unions.size(); i++) {
+                Type right = unions.get(i).type(false);
+
+                if (!left.isCollocated() || !right.isCollocated()) {
+                    left = Type.PARTITIONED_NOT_COLLOCATED;
+
+                    break;
+                }
+                else if (!left.isPartitioned() && !right.isPartitioned())
+                    left = Type.REPLICATED;
+                else
+                    left = Type.PARTITIONED_COLLOCATED;
+            }
+
+            return left;
+        }
+
+        return type;
+    }
+
+    /**
+     * @param i Index.
+     * @param create Create child if needed.
+     * @return Child collocation.
+     */
+    private GridH2CollocationModel child(int i, boolean create) {
+        GridH2CollocationModel child = children[i];
+
+        if (child == null && create) {
+            TableFilter f = childFilters[i];
+
+            if (f.getTable().isView()) {
+                if (f.getIndex() == null) {
+                    // If we don't have view index yet, then we just creating 
empty model and it must be filled later.
+                    child = createChildModel(this, i, null, true, validate);
+                }
+                else
+                    child = buildCollocationModel(this, i, getSubQuery(f), 
null, validate);
+            }
+            else
+                child = createChildModel(this, i, null, false, validate);
+
+            assert child != null;
+            assert children[i] == child;
+        }
+
+        return child;
+    }
+
+    /**
+     * @param f Table filter.
+     * @return Sub-query.
+     */
+    private static Query getSubQuery(TableFilter f) {
+        return ((ViewIndex)f.getIndex()).getQuery();
+    }
+
+    /**
+     * @return Unions list.
+     */
+    private List<GridH2CollocationModel> getOrCreateUnions() {
+        if (unions == null) {
+            unions = new ArrayList<>(4);
+
+            unions.add(this);
+        }
+
+        return unions;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param info Sub-query info.
+     * @param filters Filters.
+     * @param filter Filter.
+     * @param validate Query validation flag.
+     * @return Collocation.
+     */
+    public static GridH2CollocationModel 
buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
+        TableFilter[] filters, int filter, boolean validate) {
+        GridH2CollocationModel cm;
+
+        if (info != null) {
+            // Go up until we reach the root query.
+            cm = buildCollocationModel(qctx, info.getUpper(), 
info.getFilters(), info.getFilter(), validate);
+        }
+        else {
+            // We are at the root query.
+            cm = qctx.queryCollocationModel();
+
+            if (cm == null) {
+                cm = createChildModel(null, -1, null, true, validate);
+
+                qctx.queryCollocationModel(cm);
+            }
+        }
+
+        if (filters == null)
+            return cm;
+
+        assert cm.view;
+
+        Select select = filters[0].getSelect();
+
+        // Handle union. We have to rely on fact that select will be the same 
on uppermost select.
+        // For sub-queries we will drop collocation models, so that they will 
be recalculated anyways.
+        if (cm.select != null && cm.select != select) {
+            List<GridH2CollocationModel> unions = cm.getOrCreateUnions();
+
+            // Try to find this select in existing unions.
+            // Start with 1 because at 0 it always will be c.
+            for (int i = 1; i < unions.size(); i++) {
+                GridH2CollocationModel u = unions.get(i);
+
+                if (u.select == select) {
+                    cm = u;
+
+                    break;
+                }
+            }
+
+            // Nothing was found, need to create new child in union.
+            if (cm.select != select)
+                cm = createChildModel(cm.upper, cm.filter, unions, true, 
validate);
+        }
+
+        cm.childFilters(filters);
+
+        return cm.child(filter, true);
+    }
+
+    /**
+     * @param qry Query.
+     * @return {@code true} If the query is collocated.
+     */
+    public static boolean isCollocated(Query qry) {
+        GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, 
null, true);
+
+        Type type = mdl.type(true);
+
+        if (!type.isCollocated() && mdl.multiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
+            throw new CacheException("Failed to execute query: for distributed 
join " +
+                "all REPLICATED caches must be at the end of the joined tables 
list.");
+
+        return type.isCollocated();
+    }
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param qry Query.
+     * @param unions Unions.
+     * @param validate Query validation flag.
+     * @return Built model.
+     */
+    private static GridH2CollocationModel 
buildCollocationModel(GridH2CollocationModel upper,
+        int filter,
+        Query qry,
+        List<GridH2CollocationModel> unions,
+        boolean validate) {
+        if (qry.isUnion()) {
+            if (unions == null)
+                unions = new ArrayList<>();
+
+            SelectUnion union = (SelectUnion)qry;
+
+            GridH2CollocationModel left = buildCollocationModel(upper, filter, 
union.getLeft(), unions, validate);
+            GridH2CollocationModel right = buildCollocationModel(upper, 
filter, union.getRight(), unions, validate);
+
+            assert left != null;
+            assert right != null;
+
+            return upper != null ? upper : left;
+        }
+
+        Select select = (Select)qry;
+
+        List<TableFilter> list = new ArrayList<>();
+
+        for (TableFilter f = select.getTopTableFilter(); f != null; f = 
f.getJoin())
+            list.add(f);
+
+        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+
+        GridH2CollocationModel cm = createChildModel(upper, filter, unions, 
true, validate);
+
+        cm.childFilters(filters);
+
+        for (int i = 0; i < filters.length; i++) {
+            TableFilter f = filters[i];
+
+            if (f.getTable().isView())
+                buildCollocationModel(cm, i, getSubQuery(f), null, validate);
+            else if (f.getTable() instanceof GridH2Table)
+                createChildModel(cm, i, null, false, validate);
+        }
+
+        return upper != null ? upper : cm;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Error.
+     */
+    private static CacheException customAffinityError(String cacheName) {
+        return new CacheException("Failed to prepare distributed join query: 
can not use distributed joins for cache " +
+            "with custom AffinityKeyMapper configured. " +
+            "Please use AffinityKeyMapped annotation instead [cache=" + 
cacheName + ']');
+    }
+
+    /**
+     * Collocation type.
+     */
+    private enum Type {
+        /** */
+        PARTITIONED_COLLOCATED(true, true),
+
+        /** */
+        PARTITIONED_NOT_COLLOCATED(true, false),
+
+        /** */
+        REPLICATED(false, true);
+
+        /** */
+        private final boolean partitioned;
+
+        /** */
+        private final boolean collocated;
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         */
+        Type(boolean partitioned, boolean collocated) {
+            this.partitioned = partitioned;
+            this.collocated = collocated;
+        }
+
+        /**
+         * @return {@code true} If partitioned.
+         */
+        public boolean isPartitioned() {
+            return partitioned;
+        }
+
+        /**
+         * @return {@code true} If collocated.
+         */
+        public boolean isCollocated() {
+            return collocated;
+        }
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         * @return Type.
+         */
+        static Type of(boolean partitioned, boolean collocated) {
+            if (collocated)
+                return partitioned ? Type.PARTITIONED_COLLOCATED : 
Type.REPLICATED;
+
+            assert partitioned;
+
+            return Type.PARTITIONED_NOT_COLLOCATED;
+        }
+    }
+
+    /**
+     * Affinity of a table relative to previous joined tables.
+     */
+    private enum Affinity {
+        /** */
+        NONE,
+
+        /** */
+        HAS_AFFINITY_CONDITION,
+
+        /** */
+        COLLOCATED_JOIN
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 165ec0c..0e7f245 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -53,7 +53,7 @@ import org.h2.command.dml.SelectUnion;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.isCollocated;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.GridH2CollocationModel.isCollocated;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst.TRUE;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b69ee78..2e611de 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -75,7 +75,7 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
@@ -111,8 +111,8 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.QUE
 import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0860542a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 20ee1b4..299ab5a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -109,7 +109,7 @@ import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
 

Reply via email to