http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index f12c0f3..babced3 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -27,13 +27,16 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -44,10 +47,10 @@ public class GridH2QueryContext {
     private static final ThreadLocal<GridH2QueryContext> qctx = new 
ThreadLocal<>();
 
     /** */
-    private static final ConcurrentMap<Key, GridH2QueryContext> qctxs = new 
ConcurrentHashMap<>();
+    private static final ConcurrentMap<QueryContextKey, GridH2QueryContext> 
qctxs = new ConcurrentHashMap<>();
 
     /** */
-    private final Key key;
+    private final QueryContextKey key;
 
     /** */
     private volatile boolean cleared;
@@ -83,7 +86,7 @@ public class GridH2QueryContext {
     private int pageSize;
 
     /** */
-    private GridH2CollocationModel qryCollocationMdl;
+    private CollocationModel qryCollocationMdl;
 
     /** */
     private MvccSnapshot mvccSnapshot;
@@ -100,7 +103,7 @@ public class GridH2QueryContext {
     public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, 
GridH2QueryType type) {
         assert type != MAP;
 
-        key = new Key(locNodeId, nodeId, qryId, 0, type);
+        key = new QueryContextKey(locNodeId, nodeId, qryId, 0, type);
     }
 
     /**
@@ -117,7 +120,7 @@ public class GridH2QueryContext {
         GridH2QueryType type) {
         assert segmentId == 0 || type == MAP;
 
-        key = new Key(locNodeId, nodeId, qryId, segmentId, type);
+        key = new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
@@ -141,34 +144,34 @@ public class GridH2QueryContext {
      * @return Type.
      */
     public GridH2QueryType type() {
-        return key.type;
+        return key.type();
     }
 
     /**
      * @return Origin node ID.
      */
     public UUID originNodeId() {
-        return key.nodeId;
+        return key.nodeId();
     }
 
     /**
      * @return Query request ID.
      */
     public long queryId() {
-        return key.qryId;
+        return key.queryId();
     }
 
     /**
      * @return Query collocation model.
      */
-    public GridH2CollocationModel queryCollocationModel() {
+    public CollocationModel queryCollocationModel() {
         return qryCollocationMdl;
     }
 
     /**
      * @param qryCollocationMdl Query collocation model.
      */
-    public void queryCollocationModel(GridH2CollocationModel 
qryCollocationMdl) {
+    public void queryCollocationModel(CollocationModel qryCollocationMdl) {
         this.qryCollocationMdl = qryCollocationMdl;
     }
 
@@ -268,7 +271,7 @@ public class GridH2QueryContext {
 
     /** @return index segment ID. */
     public int segment() {
-        return key.segmentId;
+        return key.segmentId();
     }
 
     /**
@@ -351,7 +354,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run 
distributed joins.
-         if (x.key.type == MAP && x.distributedJoinMode() != OFF && 
qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type() == MAP && x.distributedJoinMode() != OFF && 
qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -378,8 +381,12 @@ public class GridH2QueryContext {
     public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, 
GridH2QueryType type) {
         boolean res = false;
 
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) 
&& key.qryId == qryId && key.type == type)
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId) &&
+                key.nodeId().equals(nodeId) &&
+                key.queryId() == qryId &&
+                key.type() == type
+            )
                 res |= doClear(key, false);
         }
 
@@ -391,8 +398,8 @@ public class GridH2QueryContext {
      * @param nodeStop Node is stopping.
      * @return {@code True} if context was found.
      */
-    private static boolean doClear(Key key, boolean nodeStop) {
-        assert key.type == MAP : key.type;
+    private static boolean doClear(QueryContextKey key, boolean nodeStop) {
+        assert key.type() == MAP : key.type();
 
         GridH2QueryContext x = qctxs.remove(key);
 
@@ -436,8 +443,8 @@ public class GridH2QueryContext {
      * @param nodeId Dead node ID.
      */
     public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) {
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId))
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId) && 
key.nodeId().equals(nodeId))
                 doClear(key, false);
         }
     }
@@ -446,8 +453,8 @@ public class GridH2QueryContext {
      * @param locNodeId Local node ID.
      */
     public static void clearLocalNodeStop(UUID locNodeId) {
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId))
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId))
                 doClear(key, true);
         }
     }
@@ -478,7 +485,7 @@ public class GridH2QueryContext {
         int segmentId,
         GridH2QueryType type
     ) {
-        return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
+        return qctxs.get(new QueryContextKey(locNodeId, nodeId, qryId, 
segmentId, type));
     }
 
     /**
@@ -537,116 +544,4 @@ public class GridH2QueryContext {
         return S.toString(GridH2QueryContext.class, this);
     }
 
-    /**
-     * Unique key for the query context.
-     */
-    private static class Key {
-        /** */
-        private final UUID locNodeId;
-
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final long qryId;
-
-        /** */
-        private final int segmentId;
-
-        /** */
-        private final GridH2QueryType type;
-
-        /**
-         * @param locNodeId Local node ID.
-         * @param nodeId The node who initiated the query.
-         * @param qryId The query ID.
-         * @param segmentId Index segment ID.
-         * @param type Query type.
-         */
-        private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, 
GridH2QueryType type) {
-            assert locNodeId != null;
-            assert nodeId != null;
-            assert type != null;
-
-            this.locNodeId = locNodeId;
-            this.nodeId = nodeId;
-            this.qryId = qryId;
-            this.segmentId = segmentId;
-            this.type = type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            Key key = (Key)o;
-
-            return qryId == key.qryId && nodeId.equals(key.nodeId) && type == 
key.type &&
-               locNodeId.equals(key.locNodeId) ;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = locNodeId.hashCode();
-
-            res = 31 * res + nodeId.hashCode();
-            res = 31 * res + (int)(qryId ^ (qryId >>> 32));
-            res = 31 * res + type.hashCode();
-            res = 31 * res + segmentId;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Key.class, this);
-        }
-    }
-
-    /**
-     * Key for source.
-     */
-    private static final class SourceKey {
-        /** */
-        UUID ownerId;
-
-        /** */
-        int segmentId;
-
-        /** */
-        int batchLookupId;
-
-        /**
-         * @param ownerId Owner node ID.
-         * @param segmentId Index segment ID.
-         * @param batchLookupId Batch lookup ID.
-         */
-        SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
-            this.ownerId = ownerId;
-            this.segmentId = segmentId;
-            this.batchLookupId = batchLookupId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (o == null || !(o instanceof SourceKey))
-                return false;
-
-            SourceKey srcKey = (SourceKey)o;
-
-            return batchLookupId == srcKey.batchLookupId && segmentId == 
srcKey.segmentId &&
-                ownerId.equals(srcKey.ownerId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int hash = ownerId.hashCode();
-            hash = 31 * hash + segmentId;
-            return 31 * hash + batchLookupId;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java
new file mode 100644
index 0000000..ad0ff20
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Unique key for the query context.
+ */
+public class QueryContextKey {
+    /** Local node ID. */
+    private final UUID locNodeId;
+
+    /** ID of the node which initiated the query. */
+    private final UUID nodeId;
+
+    /** Query ID. */
+    private final long qryId;
+
+    /** Index segment ID. */
+    private final int segmentId;
+
+    /** Query type. */
+    private final GridH2QueryType type;
+
+    /**
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param segmentId Index segment ID.
+     * @param type Query type.
+     */
+    QueryContextKey(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+        assert locNodeId != null;
+        assert nodeId != null;
+        assert type != null;
+
+        this.locNodeId = locNodeId;
+        this.nodeId = nodeId;
+        this.qryId = qryId;
+        this.segmentId = segmentId;
+        this.type = type;
+    }
+
+    /**
+     * @return Local node ID.
+     */
+    public UUID localNodeId() {
+        return locNodeId;
+    }
+
+    /**
+     * @return ID of the node which initiated the query.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * @return Index segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /**
+     * @return Query type.
+     */
+    public GridH2QueryType type() {
+        return type;
+    }
+
+    /** Compares all five key fields, including the segment ID, to stay consistent with {@link #hashCode()}. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        QueryContextKey key = (QueryContextKey)o;
+
+        return qryId == key.qryId && segmentId == key.segmentId && type == key.type &&
+            nodeId.equals(key.nodeId) && locNodeId.equals(key.locNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = locNodeId.hashCode();
+
+        res = 31 * res + nodeId.hashCode();
+        res = 31 * res + (int)(qryId ^ (qryId >>> 32));
+        res = 31 * res + type.hashCode();
+        res = 31 * res + segmentId;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryContextKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
new file mode 100644
index 0000000..632d72a
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * Merge cursor from multiple nodes.
+ */
+public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
+    /** Index whose {@code compareRows} orders the current rows of the streams. */
+    private final GridH2IndexBase idx;
+
+    /** Range ID passed to every stream {@code next}/{@code get} call. */
+    private final int rangeId;
+
+    /** Streams to merge, one per segment key; exhausted streams are replaced with {@code null}. */
+    private final RangeStream[] streams;
+
+    /** {@code true} until the first call to {@link #next()}. */
+    private boolean first = true;
+
+    /** Offset of the current stream in {@code streams}. */
+    private int off;
+
+    /**
+     * @param rangeId Range ID.
+     * @param segmentKeys Segment keys; each one must have a stream in {@code rangeStreams}.
+     * @param rangeStreams Range streams by segment key.
+     */
+    public BroadcastCursor(GridH2IndexBase idx, int rangeId, 
Collection<SegmentKey> segmentKeys,
+        Map<SegmentKey, RangeStream> rangeStreams) {
+        this.idx = idx;
+        this.rangeId = rangeId;
+
+        streams = new RangeStream[segmentKeys.size()];
+
+        int i = 0;
+
+        for (SegmentKey segmentKey : segmentKeys) {
+            RangeStream stream = rangeStreams.get(segmentKey);
+
+            assert stream != null;
+
+            streams[i++] = stream;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(RangeStream o1, RangeStream o2) {
+        if (o1 == o2)
+            return 0;
+
+        // Nulls are at the beginning of array.
+        if (o1 == null)
+            return -1;
+
+        if (o2 == null)
+            return 1;
+
+        return idx.compareRows(o1.get(rangeId), o2.get(rangeId));
+    }
+
+    /**
+     * Try to fetch the first row.
+     *
+     * @return {@code true} If we were able to find at least one row.
+     */
+    private boolean goFirst() {
+        // Fetch first row from all the streams and sort them.
+        for (int i = 0; i < streams.length; i++) {
+            if (!streams[i].next(rangeId)) {
+                streams[i] = null;
+                off++; // After sorting this offset will cut off all null 
elements at the beginning of array.
+            }
+        }
+
+        if (off == streams.length)
+            return false;
+
+        Arrays.sort(streams, this);
+
+        return true;
+    }
+
+    /**
+     * Fetch next row.
+     *
+     * @return {@code true} If we were able to find at least one row.
+     */
+    private boolean goNext() {
+        assert off != streams.length;
+
+        if (!streams[off].next(rangeId)) {
+            // Next row from current min stream was not found -> nullify that 
stream and bump offset forward.
+            streams[off] = null;
+
+            return ++off != streams.length;
+        }
+
+        // Bubble up current min stream with respect to fetched row to achieve 
correct sort order of streams.
+        GridH2IndexBase.bubbleUp(streams, off, this);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean next() {
+        if (first) {
+            first = false;
+
+            return goFirst();
+        }
+
+        return goNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row get() {
+        return streams[off].get(rangeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SearchRow getSearchRow() {
+        return get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean previous() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
new file mode 100644
index 0000000..03653c7
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.CacheException;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.h2.command.dml.Query;
+import org.h2.command.dml.Select;
+import org.h2.command.dml.SelectUnion;
+import org.h2.expression.Comparison;
+import org.h2.expression.Expression;
+import org.h2.expression.ExpressionColumn;
+import org.h2.index.IndexCondition;
+import org.h2.index.ViewIndex;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.SubQueryInfo;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+import org.h2.table.TableView;
+
+/**
+ * Collocation model for a query.
+ */
+public final class CollocationModel {
+    /** Multiplier assigned to replicated and collocated tables. */
+    public static final int MULTIPLIER_COLLOCATED = 1;
+
+    /** Multiplier used when the affinity join type is {@code HAS_AFFINITY_CONDITION}. */
+    private static final int MULTIPLIER_UNICAST = 50;
+
+    /** Multiplier used when the affinity join type is {@code NONE}. */
+    private static final int MULTIPLIER_BROADCAST = 200;
+
+    /** Multiplier forced on a partitioned table when a previous table is REPLICATED. */
+    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
+
+    /** Upper model, may be {@code null}. */
+    private final CollocationModel upper;
+
+    /** Index of this model's table filter in {@code upper.childFilters}. */
+    private final int filter;
+
+    /** Whether this model is a (sub-)query with child filters rather than a table instance. */
+    private final boolean view;
+
+    /** Calculated multiplier; reset to 0 when child filters change. */
+    private int multiplier;
+
+    /** Calculated type; {@code null} means it has not been calculated yet. */
+    private Type type;
+
+    /** Child models, indexed like {@code childFilters}. */
+    private CollocationModel[] children;
+
+    /** Private copy of the child table filters. */
+    private TableFilter[] childFilters;
+
+    /** Union list this model was added to, if any. */
+    private List<CollocationModel> unions;
+
+    /** Select taken from the first child filter. */
+    private Select select;
+
+    /** Query validation flag. */
+    private final boolean validate;
+
+    /**
+     * @param upper Upper model, may be {@code null}.
+     * @param filter Index of the table filter in the upper model's child filters.
+     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
+     * @param validate Query validation flag.
+     */
+    private CollocationModel(CollocationModel upper, int filter, boolean view, 
boolean validate) {
+        this.upper = upper;
+        this.filter = filter;
+        this.view = view;
+        this.validate = validate;
+    }
+
+    /**
+     * @return Table filter for this collocation model, or {@code null} if there is no upper model.
+     */
+    private TableFilter filter() {
+        return upper == null ? null : upper.childFilters[filter];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        calculate(); // Type and multiplier are printed below, so compute them first.
+
+        SB b = new SB();
+
+        for (int lvl = 0; lvl < 20; lvl++) { // One line per depth level, at most 20 levels.
+            if (!toString(b, lvl))
+                break;
+
+            b.a('\n');
+        }
+
+        return b.toString();
+    }
+
+    /**
+     * @param b String builder.
+     * @param lvl Depth level relative to this model; 0 prints this model itself.
+     */
+    private boolean toString(SB b, int lvl) {
+        boolean res = false; // Returned value: true once a model was printed at the requested depth.
+
+        if (lvl == 0) {
+            TableFilter f = filter();
+            String tblAlias = f == null ? "^" : f.getTableAlias();
+
+            b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", 
mul=").a(multiplier).a("]");
+
+            res = true;
+        }
+        else if (childFilters != null) {
+            assert lvl > 0;
+
+            lvl--;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                if (lvl == 0)
+                    b.a(" | ");
+
+                res |= child(i, true).toString(b, lvl);
+            }
+
+            if (lvl == 0)
+                b.a(" | ");
+        }
+
+        return res;
+    }
+
+    /**
+     * @param upper Upper model, may be {@code null}.
+     * @param filter Index of the child's slot in the upper model's {@code children}.
+     * @param unions Union list to add the child to, or {@code null} to bind it to the upper model only.
+     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
+     * @param validate Query validation flag.
+     * @return Created child collocation model.
+     */
+    private static CollocationModel createChildModel(CollocationModel upper,
+        int filter,
+        List<CollocationModel> unions,
+        boolean view,
+        boolean validate) {
+        CollocationModel child = new CollocationModel(upper, filter, view, 
validate);
+
+        if (unions != null) {
+            // Bind created child to unions.
+            assert upper == null || upper.child(filter, false) != null || 
unions.isEmpty();
+
+            if (upper != null && unions.isEmpty()) {
+                assert upper.child(filter, false) == null;
+
+                upper.children[filter] = child;
+            }
+
+            unions.add(child);
+
+            child.unions = unions;
+        }
+        else if (upper != null) {
+            // Bind created child to upper model.
+            assert upper.child(filter, false) == null;
+
+            upper.children[filter] = child;
+        }
+
+        return child;
+    }
+
+    /**
+     * @param childFilters New child filters; the first one supplies the select.
+     * @return {@code true} If child filters were updated, {@code false} if they equal the current ones.
+     */
+    private boolean childFilters(TableFilter[] childFilters) {
+        assert childFilters != null;
+        assert view;
+
+        Select select = childFilters[0].getSelect();
+
+        assert this.select == null || this.select == select;
+
+        if (this.select == null) {
+            this.select = select;
+
+            assert this.childFilters == null;
+        }
+        else if (Arrays.equals(this.childFilters, childFilters))
+            return false;
+
+        if (this.childFilters == null) {
+            // We have to clone because H2 reuses array and reorders elements.
+            this.childFilters = childFilters.clone();
+
+            children = new CollocationModel[childFilters.length];
+        }
+        else {
+            assert this.childFilters.length == childFilters.length;
+
+            // We have to copy because H2 reuses array and reorders elements.
+            System.arraycopy(childFilters, 0, this.childFilters, 0, 
childFilters.length);
+
+            Arrays.fill(children, null);
+        }
+
+        // Reset results.
+        type = null;
+        multiplier = 0;
+
+        return true;
+    }
+
+    /**
+     * Calculates {@code type} and {@code multiplier} for this model; does nothing if the type is already set.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void calculate() {
+        if (type != null)
+            return;
+
+        if (view) { // We are at (sub-)query model.
+            assert childFilters != null;
+
+            boolean collocated = true;
+            boolean partitioned = false;
+            int maxMultiplier = MULTIPLIER_COLLOCATED;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                CollocationModel child = child(i, true);
+
+                Type t = child.type(true);
+
+                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                    maxMultiplier = child.multiplier;
+
+                if (t.isPartitioned()) {
+                    partitioned = true;
+
+                    if (!t.isCollocated()) {
+                        collocated = false;
+
+                        int m = child.multiplier(true);
+
+                        if (m > maxMultiplier) {
+                            maxMultiplier = m;
+
+                            if (maxMultiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
+                                break;
+                        }
+                    }
+                }
+            }
+
+            type = Type.of(partitioned, collocated);
+            multiplier = maxMultiplier;
+        }
+        else {
+            assert upper != null;
+            assert childFilters == null;
+
+            // We are at table instance.
+            Table tbl = filter().getTable();
+
+            // Only partitioned tables will do distributed joins.
+            if (!(tbl instanceof GridH2Table) || 
!((GridH2Table)tbl).isPartitioned()) {
+                type = Type.REPLICATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+
+                return;
+            }
+
+            // If we are the first partitioned table in a join, then we are 
"base" for all the rest partitioned tables
+            // which will need to get remote result (if there is no affinity 
condition). Since this query is broadcasted
+            // to all the affinity nodes the "base" does not need to get 
remote results.
+            if (!upper.findPartitionedTableBefore(filter)) {
+                type = Type.PARTITIONED_COLLOCATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+            }
+            else {
+                // It is enough to make sure that our previous join by 
affinity key is collocated, then we are
+                // collocated. If we at least have affinity key condition, 
then we do unicast which is cheaper.
+                switch (upper.joinedWithCollocated(filter)) {
+                    case COLLOCATED_JOIN:
+                        type = Type.PARTITIONED_COLLOCATED;
+                        multiplier = MULTIPLIER_COLLOCATED;
+
+                        break;
+
+                    case HAS_AFFINITY_CONDITION:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_UNICAST;
+
+                        break;
+
+                    case NONE:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_BROADCAST;
+
+                        break;
+
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+
+            if (upper.previousReplicated(filter))
+                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
+        }
+    }
+
+    /**
+     * @param f Index of the current filter; only children before it are checked.
+     * @return {@code true} If partitioned table was found.
+     */
+    private boolean findPartitionedTableBefore(int f) {
+        for (int i = 0; i < f; i++) {
+            CollocationModel child = child(i, true);
+
+            // The child can be null if it is not a GridH2Table and not a 
sub-query,
+            // it is some kind of function table or anything else that is 
considered replicated.
+            if (child != null && child.type(true).isPartitioned())
+                return true;
+        }
+
+        // We have to search globally in upper queries as well.
+        return upper != null && upper.findPartitionedTableBefore(filter);
+    }
+
+    /**
+     * @param f Index of the current filter.
+     * @return {@code true} If previous table is REPLICATED, here or (recursively) in an upper model.
+     */
+    @SuppressWarnings("SimplifiableIfStatement")
+    private boolean previousReplicated(int f) {
+        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
+            return true;
+
+        return upper != null && upper.previousReplicated(filter);
+    }
+
+    /**
+     * Determines how the table at the given filter position is joined relative to the previously joined tables.
+     * <p>
+     * Index conditions of the filter are scanned for evaluatable equality conditions ({@code EQUAL} or
+     * {@code EQUAL_NULL_SAFE}) on the affinity key column or on a key column. If such a condition compares the
+     * column with an affinity column of a previously joined non-view model that is partitioned and collocated,
+     * the join is considered collocated.
+     * <p>
+     * When validation is enabled, the method fails if the table uses a custom affinity mapper or if the filter
+     * has no index conditions at all.
+     *
+     * @param f Filter.
+     * @return {@link Affinity#COLLOCATED_JOIN} if a collocated join was detected,
+     *      {@link Affinity#HAS_AFFINITY_CONDITION} if only an equality condition on the affinity or key column
+     *      was found, {@link Affinity#NONE} otherwise (including tables without an affinity key column).
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private Affinity joinedWithCollocated(int f) {
+        TableFilter tf = childFilters[f];
+
+        GridH2Table tbl = (GridH2Table)tf.getTable();
+
+        if (validate) {
+            if (tbl.isCustomAffinityMapper())
+                throw customAffinityError(tbl.cacheName());
+
+            if (F.isEmpty(tf.getIndexConditions())) {
+                throw new CacheException("Failed to prepare distributed join 
query: " +
+                    "join condition does not use index [joinedCache=" + 
tbl.cacheName() +
+                    ", plan=" + tf.getSelect().getPlanSQL() + ']');
+            }
+        }
+
+        IndexColumn affCol = tbl.getAffinityKeyColumn();
+
+        boolean affKeyCondFound = false;
+
+        if (affCol != null) {
+            ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
+
+            int affColId = affCol.column.getColumnId();
+
+            for (int i = 0; i < idxConditions.size(); i++) {
+                IndexCondition c = idxConditions.get(i);
+                int colId = c.getColumn().getColumnId();
+                int cmpType = c.getCompareType();
+
+                if ((cmpType == Comparison.EQUAL || cmpType == 
Comparison.EQUAL_NULL_SAFE) &&
+                    (colId == affColId || 
tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) {
+                    affKeyCondFound = true;
+
+                    Expression exp = c.getExpression();
+                    exp = exp.getNonAliasExpression();
+
+                    if (exp instanceof ExpressionColumn) {
+                        ExpressionColumn expCol = (ExpressionColumn)exp;
+
+                        // This is one of our previous joins.
+                        TableFilter prevJoin = expCol.getTableFilter();
+
+                        if (prevJoin != null) {
+                            CollocationModel cm = child(indexOf(prevJoin), 
true);
+
+                            // If the previous joined model is a subquery 
(view), we can not be sure that
+                            // the found affinity column is the needed one, 
since we can select multiple
+                            // different affinity columns from different 
tables.
+                            if (cm != null && !cm.view) {
+                                Type t = cm.type(true);
+
+                                if (t.isPartitioned() && t.isCollocated() && 
isAffinityColumn(prevJoin, expCol, validate))
+                                    return Affinity.COLLOCATED_JOIN;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : 
Affinity.NONE;
+    }
+
+    /**
+     * Finds the position of the given table filter among the child filters (compared by identity).
+     *
+     * @param f Table filter.
+     * @return Index.
+     * @throws IllegalStateException If the filter is not one of the child filters.
+     */
+    private int indexOf(TableFilter f) {
+        int idx = 0;
+
+        for (TableFilter childFilter : childFilters) {
+            if (childFilter == f)
+                return idx;
+
+            idx++;
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * Checks whether the given expression column refers to an affinity key column. Columns of views are
+     * resolved through the query behind the view.
+     *
+     * @param f Table filter.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) {
+        Column col = expCol.getColumn();
+
+        if (col == null)
+            return false;
+
+        Table t = col.getTable();
+
+        if (t.isView()) {
+            // Prefer the sub-query of the view index when the index is already known.
+            Query qry = f.getIndex() != null ? getSubQuery(f) : GridSqlQueryParser.VIEW_QUERY.get((TableView)t);
+
+            return isAffinityColumn(qry, expCol, validate);
+        }
+
+        if (!(t instanceof GridH2Table))
+            return false;
+
+        GridH2Table tbl = (GridH2Table)t;
+
+        if (validate && tbl.isCustomAffinityMapper())
+            throw customAffinityError(tbl.cacheName());
+
+        IndexColumn affCol = tbl.getAffinityKeyColumn();
+
+        if (affCol == null)
+            return false;
+
+        return col.getColumnId() == affCol.column.getColumnId();
+    }
+
+    /**
+     * Checks whether the given column of a query result is an affinity key column. For a union
+     * the column has to be an affinity column in both parts.
+     *
+     * @param qry Query.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) {
+        if (qry.isUnion()) {
+            SelectUnion union = (SelectUnion)qry;
+
+            if (!isAffinityColumn(union.getLeft(), expCol, validate))
+                return false;
+
+            return isAffinityColumn(union.getRight(), expCol, validate);
+        }
+
+        // Selected expression standing behind the column, with aliases stripped.
+        Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
+
+        if (!(exp instanceof ExpressionColumn))
+            return false;
+
+        ExpressionColumn selectedCol = (ExpressionColumn)exp;
+
+        return isAffinityColumn(selectedCol.getTableFilter(), selectedCol, validate);
+    }
+
+    /**
+     * @return Multiplier.
+     */
+    public int calculateMultiplier() {
+        // We don't need multiplier for union here because it will be 
summarized in H2.
+        return multiplier(false);
+    }
+
+    /**
+     * Returns the multiplier of this model or, when unions are respected and present,
+     * the greatest multiplier among all the union members.
+     *
+     * @param withUnion With respect to union.
+     * @return Multiplier.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private int multiplier(boolean withUnion) {
+        calculate();
+
+        assert multiplier != 0;
+
+        if (!withUnion || unions == null)
+            return multiplier;
+
+        int res = 0;
+
+        for (int i = 0; i < unions.size(); i++)
+            res = Math.max(res, unions.get(i).multiplier(false));
+
+        return res;
+    }
+
+    /**
+     * Returns the collocation type of this model or, when unions are respected and present,
+     * the combined type of all the union members.
+     *
+     * @param withUnion With respect to union.
+     * @return Type.
+     */
+    private Type type(boolean withUnion) {
+        calculate();
+
+        assert type != null;
+
+        if (!withUnion || unions == null)
+            return type;
+
+        Type res = unions.get(0).type(false);
+
+        for (int i = 1; i < unions.size(); i++) {
+            Type next = unions.get(i).type(false);
+
+            // A single non-collocated member makes the whole union non-collocated.
+            if (!(res.isCollocated() && next.isCollocated()))
+                return Type.PARTITIONED_NOT_COLLOCATED;
+
+            if (res.isPartitioned() || next.isPartitioned())
+                res = Type.PARTITIONED_COLLOCATED;
+            else
+                res = Type.REPLICATED;
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns the child collocation model for the given filter position, optionally creating it on demand.
+     * <p>
+     * NOTE(review): for a view that already has an index the result is taken from the return value of
+     * {@code buildCollocationModel(this, ...)}, which returns its {@code upper} argument whenever it is not
+     * {@code null}; confirm that the {@code children[i] == child} assertion below can hold on that path.
+     *
+     * @param i Index.
+     * @param create Create child if needed.
+     * @return Child collocation, or {@code null} if it does not exist and {@code create} is {@code false}.
+     */
+    private CollocationModel child(int i, boolean create) {
+        CollocationModel child = children[i];
+
+        if (child == null && create) {
+            TableFilter f = childFilters[i];
+
+            if (f.getTable().isView()) {
+                if (f.getIndex() == null) {
+                    // If we don't have view index yet, then we just creating 
empty model and it must be filled later.
+                    child = createChildModel(this, i, null, true, validate);
+                }
+                else
+                    child = buildCollocationModel(this, i, getSubQuery(f), 
null, validate);
+            }
+            else
+                child = createChildModel(this, i, null, false, validate);
+
+            assert child != null;
+            assert children[i] == child;
+        }
+
+        return child;
+    }
+
+    /**
+     * Extracts the query behind the view index of the given table filter.
+     *
+     * @param f Table filter.
+     * @return Sub-query.
+     */
+    private static Query getSubQuery(TableFilter f) {
+        ViewIndex viewIdx = (ViewIndex)f.getIndex();
+
+        return viewIdx.getQuery();
+    }
+
+    /**
+     * Returns the unions list of this model. On the first call the list is created
+     * with this model as its first element.
+     *
+     * @return Unions list.
+     */
+    private List<CollocationModel> getOrCreateUnions() {
+        if (unions != null)
+            return unions;
+
+        unions = new ArrayList<>(4);
+
+        unions.add(this);
+
+        return unions;
+    }
+
+    /**
+     * Resolves the collocation model of the query described by the given sub-query info and returns
+     * the child model for the requested filter.
+     * <p>
+     * The method first walks up the sub-query info chain until the root query is reached; the root model is
+     * taken from the query context and is created and stored there if it is missing. If no filters are passed,
+     * the model resolved so far is returned as is. Otherwise, when the model is already bound to another select,
+     * the select of the first filter is looked up among the unions of the model (a new union member is created
+     * if nothing was found), the filters are assigned to the resulting model and its child is returned.
+     *
+     * @param qctx Query context.
+     * @param info Sub-query info.
+     * @param filters Filters.
+     * @param filter Filter.
+     * @param validate Query validation flag.
+     * @return Collocation.
+     */
+    public static CollocationModel buildCollocationModel(GridH2QueryContext 
qctx, SubQueryInfo info,
+        TableFilter[] filters, int filter, boolean validate) {
+        CollocationModel cm;
+
+        if (info != null) {
+            // Go up until we reach the root query.
+            cm = buildCollocationModel(qctx, info.getUpper(), 
info.getFilters(), info.getFilter(), validate);
+        }
+        else {
+            // We are at the root query.
+            cm = qctx.queryCollocationModel();
+
+            if (cm == null) {
+                cm = createChildModel(null, -1, null, true, validate);
+
+                qctx.queryCollocationModel(cm);
+            }
+        }
+
+        if (filters == null)
+            return cm;
+
+        assert cm.view;
+
+        Select select = filters[0].getSelect();
+
+        // Handle union. We have to rely on fact that select will be the same 
on uppermost select.
+        // For sub-queries we will drop collocation models, so that they will 
be recalculated anyways.
+        if (cm.select != null && cm.select != select) {
+            List<CollocationModel> unions = cm.getOrCreateUnions();
+
+            // Try to find this select in existing unions.
+            // Start with 1 because at 0 it always will be c.
+            for (int i = 1; i < unions.size(); i++) {
+                CollocationModel u = unions.get(i);
+
+                if (u.select == select) {
+                    cm = u;
+
+                    break;
+                }
+            }
+
+            // Nothing was found, need to create new child in union.
+            if (cm.select != select)
+                cm = createChildModel(cm.upper, cm.filter, unions, true, 
validate);
+        }
+
+        cm.childFilters(filters);
+
+        return cm.child(filter, true);
+    }
+
+    /**
+     * @param qry Query.
+     * @return {@code true} If the query is collocated.
+     */
+    public static boolean isCollocated(Query qry) {
+        CollocationModel mdl = buildCollocationModel(null, -1, qry, null, 
true);
+
+        Type type = mdl.type(true);
+
+        if (!type.isCollocated() && mdl.multiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
+            throw new CacheException("Failed to execute query: for distributed 
join " +
+                "all REPLICATED caches must be at the end of the joined tables 
list.");
+
+        return type.isCollocated();
+    }
+
+    /**
+     * Builds the collocation model for the given query together with the models of its sub-queries.
+     * <p>
+     * For a union both sides are built recursively and share the same unions list. For a plain select a view
+     * model is created over the chain of its table filters; after that a nested model is built for every filter
+     * over a view and a leaf model is created for every filter over a {@code GridH2Table}. Filters over other
+     * kinds of tables get no model here.
+     *
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param qry Query.
+     * @param unions Unions.
+     * @param validate Query validation flag.
+     * @return Built model: {@code upper} if it is not {@code null}, otherwise the model created for the query
+     *      (for a union, the result obtained for its left side).
+     */
+    private static CollocationModel buildCollocationModel(CollocationModel 
upper,
+        int filter,
+        Query qry,
+        List<CollocationModel> unions,
+        boolean validate) {
+        if (qry.isUnion()) {
+            if (unions == null)
+                unions = new ArrayList<>();
+
+            SelectUnion union = (SelectUnion)qry;
+
+            CollocationModel left = buildCollocationModel(upper, filter, 
union.getLeft(), unions, validate);
+            CollocationModel right = buildCollocationModel(upper, filter, 
union.getRight(), unions, validate);
+
+            assert left != null;
+            assert right != null;
+
+            return upper != null ? upper : left;
+        }
+
+        Select select = (Select)qry;
+
+        List<TableFilter> list = new ArrayList<>();
+
+        for (TableFilter f = select.getTopTableFilter(); f != null; f = 
f.getJoin())
+            list.add(f);
+
+        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+
+        CollocationModel cm = createChildModel(upper, filter, unions, true, 
validate);
+
+        cm.childFilters(filters);
+
+        for (int i = 0; i < filters.length; i++) {
+            TableFilter f = filters[i];
+
+            if (f.getTable().isView())
+                buildCollocationModel(cm, i, getSubQuery(f), null, validate);
+            else if (f.getTable() instanceof GridH2Table)
+                createChildModel(cm, i, null, false, validate);
+        }
+
+        return upper != null ? upper : cm;
+    }
+
+    /**
+     * Creates the error reported when a distributed join involves a cache with a custom affinity mapper.
+     *
+     * @param cacheName Cache name.
+     * @return Error.
+     */
+    private static CacheException customAffinityError(String cacheName) {
+        String msg = "Failed to prepare distributed join query: can not use distributed joins for cache " +
+            "with custom AffinityKeyMapper configured. " +
+            "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']';
+
+        return new CacheException(msg);
+    }
+
+    /**
+     * Collocation type: a combination of the "partitioned" and "collocated" flags.
+     */
+    private enum Type {
+        /** Partitioned and collocated. */
+        PARTITIONED_COLLOCATED(true, true),
+
+        /** Partitioned, but not collocated. */
+        PARTITIONED_NOT_COLLOCATED(true, false),
+
+        /** Replicated (not partitioned, always collocated). */
+        REPLICATED(false, true);
+
+        /** Partitioned flag. */
+        private final boolean partitioned;
+
+        /** Collocated flag. */
+        private final boolean collocated;
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         */
+        Type(boolean partitioned, boolean collocated) {
+            this.partitioned = partitioned;
+            this.collocated = collocated;
+        }
+
+        /**
+         * @return {@code true} If partitioned.
+         */
+        public boolean isPartitioned() {
+            return partitioned;
+        }
+
+        /**
+         * @return {@code true} If collocated.
+         */
+        public boolean isCollocated() {
+            return collocated;
+        }
+
+        /**
+         * Resolves the type by its flags. The "not partitioned and not collocated" combination is not allowed.
+         *
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         * @return Type.
+         */
+        static Type of(boolean partitioned, boolean collocated) {
+            if (!collocated) {
+                assert partitioned;
+
+                return PARTITIONED_NOT_COLLOCATED;
+            }
+
+            if (partitioned)
+                return PARTITIONED_COLLOCATED;
+
+            return REPLICATED;
+        }
+    }
+
+    /**
+     * Affinity of a table relative to previous joined tables.
+     */
+    private enum Affinity {
+        /** No equality condition on the affinity key column or on a key column was found. */
+        NONE,
+
+        /** An equality condition on the affinity key column or on a key column was found. */
+        HAS_AFFINITY_CONDITION,
+
+        /** The table is joined by an affinity column of a previously joined partitioned collocated table. */
+        COLLOCATED_JOIN
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java
new file mode 100644
index 0000000..5a174e8
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapts an {@link H2Cursor} to the {@link Iterator} interface by reading one row ahead.
+ */
+public final class CursorIteratorWrapper implements Iterator<GridH2Row> {
+    /** Underlying cursor. */
+    private final H2Cursor cursor;
+
+    /** Next element, {@code null} when the cursor is exhausted. */
+    private GridH2Row next;
+
+    /**
+     * @param cursor Cursor.
+     */
+    public CursorIteratorWrapper(H2Cursor cursor) {
+        assert cursor != null;
+
+        this.cursor = cursor;
+
+        // Prefetch the first row so that hasNext() can answer without touching the cursor.
+        if (cursor.next())
+            next = (GridH2Row)cursor.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return next != null;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws NoSuchElementException If the cursor has no more rows.
+     */
+    @Override public GridH2Row next() {
+        GridH2Row res = next;
+
+        // Honor the Iterator contract and never advance an already exhausted cursor.
+        if (res == null)
+            throw new NoSuchElementException();
+
+        next = cursor.next() ? (GridH2Row)cursor.get() : null;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        throw new UnsupportedOperationException("operation is not supported");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
new file mode 100644
index 0000000..7c958da
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Modes in which distributed joins can be executed for a query.
+ */
+public enum DistributedJoinMode {
+    /** Distributed joins are disabled, local joins are performed instead. */
+    OFF,
+
+    /**
+     * Distributed joins are enabled within the local node only.
+     * <p>
+     * NOTE: This mode is used with segmented indices for local SQL queries, where the join has to be
+     * distributed across local index segments while range queries to other nodes must be prevented.
+     */
+    LOCAL_ONLY,
+
+    /** Distributed joins are enabled. */
+    ON;
+
+    /**
+     * Resolves the join mode from the query flags.
+     *
+     * @param isLocal Query local flag.
+     * @param distributedJoins Query distributed joins flag.
+     * @return DistributedJoinMode for the query.
+     */
+    public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
+        if (!distributedJoins)
+            return OFF;
+
+        if (isLocal)
+            return LOCAL_ONLY;
+
+        return ON;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
new file mode 100644
index 0000000..02d2e44
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+import org.h2.util.DoneFuture;
+import org.h2.value.Value;
+import org.h2.value.ValueNull;
+
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
+import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
+
+/**
+ * Index lookup batch.
+ */
+public class DistributedLookupBatch implements IndexLookupBatch {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** Cache context. */
+    private final GridCacheContext<?,?> cctx;
+
+    /** Unicast ({@code true}) or broadcast ({@code false}) query, as reported by {@link #getPlanSQL()}. */
+    private final boolean ucast;
+
+    /** Affinity column ID (may be {@code COL_NOT_EXISTS}). */
+    private final int affColId;
+
+    /** Query context, obtained on the first {@code addSearchRows} call of a query run and dropped on reset. */
+    private GridH2QueryContext qctx;
+
+    /** ID of the current batch lookup, {@code 0} after reset. */
+    private int batchLookupId;
+
+    /** Range streams of the current lookup phase by segment key. */
+    private Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
+
+    /** Lazily calculated segments used when a range has to be broadcast. */
+    private List<SegmentKey> broadcastSegments;
+
+    /** Result cursors wrapped into completed futures, one per accepted search range. */
+    private List<Future<Cursor>> res = Collections.emptyList();
+
+    /** Set when the bounds list of at least one stream has reached the page size. */
+    private boolean batchFull;
+
+    /** Whether {@link #find()} was called for the current lookup phase. */
+    private boolean findCalled;
+
+    /**
+     * @param idx Index.
+     * @param cctx Cache Cache context.
+     * @param ucast Unicast or broadcast query.
+     * @param affColId Affinity column ID.
+     */
+    public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> 
cctx, boolean ucast, int affColId) {
+        this.idx = idx;
+        this.cctx = cctx;
+        this.ucast = ucast;
+        this.affColId = affColId;
+    }
+
+    /**
+     * Tries to extract a single affinity key shared by both bounds of the search range.
+     * <p>
+     * The affinity column values of the bounds are checked first. If they do not give an answer and the affinity
+     * column is not a key column, the affinity key is derived from the primary key values of the bounds.
+     *
+     * @param firstRow First row.
+     * @param lastRow Last row.
+     * @return Affinity key, {@code GridH2IndexBase.EXPLICIT_NULL} if the affinity column value or a primary key
+     *      value is an explicit SQL {@code NULL}, or {@code null} if there is no affinity column, a bound is
+     *      missing or a single affinity key can not be determined.
+     * @throws CacheException If an affinity key can not be obtained for a primary key.
+     */
+    private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
+        if (affColId == COL_NOT_EXISTS)
+            return null;
+
+        if (firstRow == null || lastRow == null)
+            return null;
+
+        Value affKeyFirst = firstRow.getValue(affColId);
+        Value affKeyLast = lastRow.getValue(affColId);
+
+        if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
+            return affKeyFirst == ValueNull.INSTANCE ? 
GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject();
+
+        if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
+            return null;
+
+        // Try to extract affinity key from primary key.
+        Value pkFirst = firstRow.getValue(KEY_COL);
+        Value pkLast = lastRow.getValue(KEY_COL);
+
+        if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
+            return GridH2IndexBase.EXPLICIT_NULL;
+
+        if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
+            return null;
+
+        Object pkAffKeyFirst = 
cctx.affinity().affinityKey(pkFirst.getObject());
+        Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
+
+        if (pkAffKeyFirst == null || pkAffKeyLast == null)
+            throw new CacheException("Cache key without affinity key.");
+
+        if (pkAffKeyFirst.equals(pkAffKeyLast))
+            return pkAffKeyFirst;
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On the first call of a query run the current query context is captured and the result list is allocated;
+     * on the first call after {@link #find()} the streams and results of the previous lookup phase are dropped.
+     * In both cases a new batch lookup ID and a new range streams map are started.
+     * <p>
+     * The range is sent to a single segment when one affinity key can be extracted from the bounds and is
+     * broadcast to all the broadcast segments otherwise. The range is rejected ({@code false} is returned) when
+     * the affinity key is an explicit {@code NULL} or when a local query has no segments to go to. For an
+     * accepted range the bounds are appended to the request of every target stream (streams are created as
+     * needed) and a cursor over these streams is appended to the result. The batch is marked as full once the
+     * bounds list of any stream reaches the page size of the query context.
+     */
+    @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
+    @Override public boolean addSearchRows(SearchRow firstRow, SearchRow 
lastRow) {
+        if (qctx == null || findCalled) {
+            if (qctx == null) {
+                // It is the first call after query begin (may be after reuse),
+                // reinitialize query context and result.
+                qctx = GridH2QueryContext.get();
+                res = new ArrayList<>();
+
+                assert qctx != null;
+                assert !findCalled;
+            }
+            else {
+                // Cleanup after the previous lookup phase.
+                assert batchLookupId != 0;
+
+                findCalled = false;
+                qctx.putStreams(batchLookupId, null);
+                res.clear();
+            }
+
+            // Reinitialize for the next lookup phase.
+            batchLookupId = qctx.nextBatchLookupId();
+            rangeStreams = new HashMap<>();
+        }
+
+        Object affKey = getAffinityKey(firstRow, lastRow);
+
+        boolean locQry = localQuery();
+
+        List<SegmentKey> segmentKeys;
+
+        if (affKey != null) {
+            // Affinity key is provided.
+            if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is 
explicit null, we will not find anything.
+                return false;
+
+            segmentKeys = F.asList(rangeSegment(affKey, locQry));
+        }
+        else {
+            // Affinity key is not provided or is not the same in upper and 
lower bounds, we have to broadcast.
+            if (broadcastSegments == null)
+                broadcastSegments = broadcastSegments(locQry);
+
+            segmentKeys = broadcastSegments;
+        }
+
+        if (locQry && segmentKeys.isEmpty())
+            return false; // Nothing to do
+
+        assert !F.isEmpty(segmentKeys) : segmentKeys;
+
+        final int rangeId = res.size();
+
+        // Create messages.
+        GridH2RowMessage first = idx.toSearchRowMessage(firstRow);
+        GridH2RowMessage last = idx.toSearchRowMessage(lastRow);
+
+        // Range containing upper and lower bounds.
+        GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
+
+        // Add range to every message of every participating node.
+        for (int i = 0; i < segmentKeys.size(); i++) {
+            SegmentKey segmentKey = segmentKeys.get(i);
+            assert segmentKey != null;
+
+            RangeStream stream = rangeStreams.get(segmentKey);
+
+            List<GridH2RowRangeBounds> bounds;
+
+            if (stream == null) {
+                stream = new RangeStream(cctx.kernalContext(), idx, qctx, 
segmentKey.node());
+
+                stream.request(createRequest(qctx, batchLookupId, 
segmentKey.segmentId()));
+                stream.request().bounds(bounds = new ArrayList<>());
+
+                rangeStreams.put(segmentKey, stream);
+            }
+            else
+                bounds = stream.request().bounds();
+
+            bounds.add(rangeBounds);
+
+            // If at least one node will have a full batch then we are ok.
+            if (bounds.size() >= qctx.pageSize())
+                batchFull = true;
+        }
+
+        Cursor cur;
+
+        if (segmentKeys.size() == 1)
+            cur = new UnicastCursor(rangeId, 
rangeStreams.get(F.first(segmentKeys)));
+        else
+            cur = new BroadcastCursor(idx, rangeId, segmentKeys, rangeStreams);
+
+        res.add(new DoneFuture<>(cur));
+
+        return true;
+    }
+
+    /**
+     * Compares two values using the compare mode of the index database. Two {@code null} references
+     * are equal, a {@code null} reference is never equal to a value.
+     *
+     * @param v1 First value.
+     * @param v2 Second value.
+     * @return {@code true} If they equal.
+     */
+    private boolean equal(Value v1, Value v2) {
+        if (v1 == v2)
+            return true;
+
+        if (v1 == null || v2 == null)
+            return false;
+
+        return v1.compareTypeSafe(v2, idx.getDatabase().getCompareMode()) == 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The flag is raised by {@code addSearchRows} and is cleared by {@link #find()} and by reset after a query.
+     */
+    @Override public boolean isBatchFull() {
+        return batchFull;
+    }
+
+    /**
+     * Checks the distributed join mode of the current query context.
+     *
+     * @return {@code True} if local query execution is enforced.
+     */
+    private boolean localQuery() {
+        assert qctx != null : "Missing query context: " + this;
+
+        boolean locOnly = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+        return locOnly;
+    }
+
+    /**
+     * Registers the range streams of the current lookup phase in the query context and starts
+     * every one of them. Does nothing when there are no streams.
+     */
+    private void startStreams() {
+        if (!rangeStreams.isEmpty()) {
+            qctx.putStreams(batchLookupId, rangeStreams);
+
+            // Start streaming.
+            for (RangeStream rangeStream : rangeStreams.values())
+                rangeStream.start();
+
+            return;
+        }
+
+        assert res.isEmpty();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Clears the batch-full flag, marks the current lookup phase as finished, starts the range streams
+     * and returns the internal result list itself (not a copy).
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    @Override public List<Future<Cursor>> find() {
+        batchFull = false;
+        findCalled = true;
+
+        startStreams();
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Nothing is done before a query or when no query context was captured. Otherwise the streams of the
+     * current batch lookup are removed from the query context and all the per-query state of this batch
+     * (query context, lookup ID, streams, broadcast segments, flags and result) is returned to its initial
+     * values.
+     */
+    @Override public void reset(boolean beforeQry) {
+        if (beforeQry || qctx == null) // Query context can be null if 
addSearchRows was never called.
+            return;
+
+        assert batchLookupId != 0;
+
+        // Do cleanup after the query run.
+        qctx.putStreams(batchLookupId, null);
+        qctx = null; // The same query can be reused multiple times for 
different query contexts.
+        batchLookupId = 0;
+
+        rangeStreams = Collections.emptyMap();
+        broadcastSegments = null;
+        batchFull = false;
+        findCalled = false;
+        res = Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPlanSQL() {
+        if (ucast)
+            return "unicast";
+
+        return "broadcast";
+    }
+
+    /**
+     * Resolves the index segment (node plus segment ID) responsible for the given affinity key.
+     * <p>
+     * For a local query only the local node is acceptable: {@code null} is returned when the explicit partitions
+     * map assigns the partition to another node or when the primary check for the local node fails. For other
+     * queries the node is taken from the explicit partitions map if it is set and from the affinity for the
+     * topology version of the query context otherwise; a retry exception is thrown if the node was not found.
+     * <p>
+     * NOTE(review): in the local branch the partition number (not the key) is passed to
+     * {@code primaryByKey(...)} - confirm that this is intended.
+     *
+     * @param affKeyObj Affinity key.
+     * @param isLocalQry Local query flag.
+     * @return Segment key for Affinity key, or {@code null} if a local query can not be served by the local node.
+     */
+    public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
+        assert affKeyObj != null && affKeyObj != GridH2IndexBase.EXPLICIT_NULL 
: affKeyObj;
+
+        ClusterNode node;
+
+        int partition = cctx.affinity().partition(affKeyObj);
+
+        if (isLocalQry) {
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                if(!cctx.localNodeId().equals(nodeId))
+                    return null; // Prevent remote index call for local 
queries.
+            }
+
+            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, 
qctx.topologyVersion()))
+                return null;
+
+            node = cctx.localNode();
+        }
+        else{
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                node = cctx.discovery().node(nodeId);
+            }
+            else // Get primary node for current topology version.
+                node = cctx.affinity().primaryByKey(affKeyObj, 
qctx.topologyVersion());
+
+            if (node == null) // Node was not found, probably topology changed 
and we need to retry the whole query.
+                throw H2Utils.retryException("Failed to get primary node by 
key for range segment.");
+        }
+
+        return new SegmentKey(node, idx.segmentForPartition(partition));
+    }
+
+    /**
+     * Builds segment keys for every index segment on every node the lookup has to be broadcast to.
+     *
+     * @param isLocalQry Local query flag.
+     * @return Collection of segment keys for broadcasting; empty if the query is local and the explicit
+     *      partitions map does not contain the local node.
+     */
+    public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
+        Map<UUID, int[]> partMap = qctx.partitionsMap();
+
+        List<ClusterNode> targets;
+
+        if (!isLocalQry) {
+            if (partMap != null) {
+                // Explicit partitions map: broadcast exactly to the nodes listed in it.
+                targets = new ArrayList<>(partMap.size());
+
+                for (UUID nodeId : partMap.keySet()) {
+                    ClusterNode target = cctx.kernalContext().discovery().node(nodeId);
+
+                    if (target == null)
+                        throw H2Utils.retryException("Failed to get node by ID during broadcast [" +
+                            "nodeId=" + nodeId + ']');
+
+                    targets.add(target);
+                }
+            }
+            else
+                targets = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+
+            if (F.isEmpty(targets))
+                throw H2Utils.retryException("Failed to collect affinity nodes during broadcast [" +
+                    "cacheName=" + cctx.name() + ']');
+        }
+        else {
+            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+                return Collections.emptyList(); // Prevent remote index call for local queries.
+
+            targets = Collections.singletonList(cctx.localNode());
+        }
+
+        int segCnt = idx.segmentsCount();
+
+        // One key per (node, segment) pair.
+        List<SegmentKey> keys = new ArrayList<>(targets.size() * segCnt);
+
+        for (ClusterNode target : targets) {
+            for (int seg = 0; seg < segCnt; seg++)
+                keys.add(new SegmentKey(target, seg));
+        }
+
+        return keys;
+    }
+
+    /**
+     * Creates an index range request for the given target segment and batch lookup. The origin node ID,
+     * query ID and origin segment are copied from the query context.
+     *
+     * @param qctx Query context.
+     * @param batchLookupId Batch lookup ID.
+     * @param segmentId Segment ID.
+     * @return Index range request.
+     */
+    public static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
+        GridH2IndexRangeRequest rangeReq = new GridH2IndexRangeRequest();
+
+        // Where the query comes from.
+        rangeReq.originNodeId(qctx.originNodeId());
+        rangeReq.queryId(qctx.queryId());
+        rangeReq.originSegmentId(qctx.segment());
+
+        // What is being looked up.
+        rangeReq.segment(segmentId);
+        rangeReq.batchLookupId(batchLookupId);
+
+        return rangeReq;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
new file mode 100644
index 0000000..8cce83d
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * Source of row ranges for a single index segment: walks over the requested bounds and serves
+ * the rows found for each of them in pages of a limited size.
+ */
+public class RangeSource {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** Iterator over the requested bounds. */
+    private Iterator<GridH2RowRangeBounds> boundsIter;
+
+    /** ID of the range taken from the most recently processed bounds, {@code -1} before the first one. */
+    private int curRangeId = -1;
+
+    /** Index segment. */
+    private final int segment;
+
+    /** Filter passed to the index lookup. */
+    private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter;
+
+    /** Iterator over the rows of the current range. */
+    private Iterator<GridH2Row> iter = emptyIterator();
+
+    /**
+     * @param idx Index.
+     * @param bounds Bounds.
+     * @param segment Segment.
+     * @param filter Filter.
+     */
+    public RangeSource(
+        GridH2IndexBase idx,
+        Iterable<GridH2RowRangeBounds> bounds,
+        int segment,
+        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter
+    ) {
+        this.idx = idx;
+        this.segment = segment;
+        this.filter = filter;
+
+        boundsIter = bounds.iterator();
+    }
+
+    /**
+     * @return {@code true} If there are more rows in this source.
+     */
+    public boolean hasMoreRows() throws IgniteCheckedException {
+        if (boundsIter.hasNext())
+            return true;
+
+        return iter.hasNext();
+    }
+
+    /**
+     * Produces the next range: either the next page of the current range or the first page
+     * of the range for the next bounds.
+     *
+     * @param maxRows Max allowed rows.
+     * @return Range, or {@code null} if both the current rows and the bounds are exhausted.
+     */
+    public GridH2RowRange next(int maxRows) {
+        assert maxRows > 0 : maxRows;
+
+        while (true) {
+            if (iter.hasNext())
+                return fetchPage(maxRows);
+
+            iter = emptyIterator();
+
+            if (!boundsIter.hasNext()) {
+                boundsIter = emptyIterator();
+
+                return null;
+            }
+
+            GridH2RowRangeBounds nextBounds = boundsIter.next();
+
+            curRangeId = nextBounds.rangeId();
+
+            iter = idx.findForSegment(nextBounds, segment, filter);
+
+            if (!iter.hasNext()) {
+                // We have to return empty range here.
+                GridH2RowRange emptyRange = new GridH2RowRange();
+
+                emptyRange.rangeId(curRangeId);
+
+                return emptyRange;
+            }
+        }
+    }
+
+    /**
+     * Reads up to {@code maxRows} rows from the current row iterator, which must have at least one row.
+     *
+     * @param maxRows Max allowed rows.
+     * @return Range with the fetched rows, marked partial if the iterator still has rows left.
+     */
+    private GridH2RowRange fetchPage(int maxRows) {
+        List<GridH2RowMessage> pageRows = new ArrayList<>();
+
+        GridH2RowRange page = new GridH2RowRange();
+
+        page.rangeId(curRangeId);
+        page.rows(pageRows);
+
+        // At least one row is always taken, then the page is filled up to the limit.
+        pageRows.add(H2Utils.toRowMessage(iter.next()));
+
+        while (pageRows.size() < maxRows && iter.hasNext())
+            pageRows.add(H2Utils.toRowMessage(iter.next()));
+
+        if (iter.hasNext())
+            page.setPartial();
+        else
+            iter = emptyIterator();
+
+        return page;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
new file mode 100644
index 0000000..0684089
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.value.Value;
+
+import javax.cache.CacheException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
+import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.h2.result.Row.MEMORY_CALCULATE;
+
+/**
+ * Per node range stream. Sends index range requests to a single node, queues the responses
+ * and exposes the received rows range by range.
+ */
+public class RangeStream {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** Query context. */
+    private final GridH2QueryContext qctx;
+
+    /** Node the requests are sent to. */
+    private final ClusterNode node;
+
+    /** Current request; set to {@code null} once no ranges remain. */
+    private GridH2IndexRangeRequest req;
+
+    /** Number of ranges still expected from the node. */
+    private int remainingRanges;
+
+    /** Queue of responses that were received but not processed yet. */
+    private final BlockingQueue<GridH2IndexRangeResponse> respQueue = new 
LinkedBlockingQueue<>();
+
+    /** Iterator over the ranges of the last processed response. */
+    private Iterator<GridH2RowRange> ranges = emptyIterator();
+
+    /** Cursor over the rows of the current range. */
+    private Cursor cursor = GridH2Cursor.EMPTY;
+
+    /** ID of the range the cursor belongs to, {@code -1} until the first range is read. */
+    private int cursorRangeId = -1;
+
+    /**
+     * @param ctx Kernal context.
+     * @param idx Index.
+     * @param qctx Query context.
+     * @param node Node.
+     */
+    public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, 
GridH2QueryContext qctx, ClusterNode node) {
+        this.ctx = ctx;
+        this.idx = idx;
+        this.node = node;
+        this.qctx = qctx;
+    }
+
+    /**
+     * Start streaming: the number of expected ranges is taken from the bounds of the current request,
+     * then the request is sent to the node. The request must have been set with non-empty bounds.
+     */
+    public void start() {
+        remainingRanges = req.bounds().size();
+
+        assert remainingRanges > 0;
+
+        idx.send(singletonList(node), req);
+    }
+
+    /**
+     * Adds the response to the queue polled by {@link #awaitForResponse()}.
+     *
+     * @param msg Response.
+     */
+    public void onResponse(GridH2IndexRangeResponse msg) {
+        respQueue.add(msg);
+    }
+
+    /**
+     * @param req Current request.
+     */
+    public void request(GridH2IndexRangeRequest req) {
+        this.req = req;
+    }
+
+    /**
+     * @return Current request.
+     */
+    public GridH2IndexRangeRequest request() {
+        return req;
+    }
+
+    /**
+     * Waits for the next response from the node, polling the response queue in 500 ms steps.
+     * On {@code STATUS_OK} the remaining ranges counter is updated and, if ranges remain, the next page
+     * is requested before returning. On {@code STATUS_NOT_FOUND} for the first request (the one that still
+     * carries bounds) the request is re-sent after a delay, for at most 30 seconds.
+     * A retry exception is thrown if the query is cleared, the local node is stopping, the thread is
+     * interrupted while polling, the remote node is no longer alive, or {@code STATUS_NOT_FOUND} cannot
+     * be retried. {@code STATUS_ERROR} results in a {@code CacheException}.
+     *
+     * @return Response.
+     */
+    private GridH2IndexRangeResponse awaitForResponse() {
+        assert remainingRanges > 0;
+
+        final long start = U.currentTimeMillis();
+
+        for (int attempt = 0;; attempt++) {
+            if (qctx.isCleared())
+                throw H2Utils.retryException("Query is cancelled.");
+
+            if (ctx.isStopping())
+                throw H2Utils.retryException("Local node is stopping.");
+
+            GridH2IndexRangeResponse res;
+
+            try {
+                res = respQueue.poll(500, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException ignored) {
+                throw H2Utils.retryException("Interrupted while waiting for 
reply.");
+            }
+
+            if (res != null) {
+                switch (res.status()) {
+                    case STATUS_OK:
+                        List<GridH2RowRange> ranges0 = res.ranges();
+
+                        remainingRanges -= ranges0.size();
+
+                        if (ranges0.get(ranges0.size() - 1).isPartial())
+                            remainingRanges++; // A partial last range is counted as still remaining.
+
+                        if (remainingRanges > 0) {
+                            if (req.bounds() != null) // Replace the first request with a follow-up one created without bounds.
+                                req = 
DistributedLookupBatch.createRequest(qctx, req.batchLookupId(), req.segment());
+
+                            // Prefetch next page.
+                            idx.send(singletonList(node), req);
+                        }
+                        else
+                            req = null;
+
+                        return res;
+
+                    case STATUS_NOT_FOUND:
+                        if (req == null || req.bounds() == null) // We have 
already received the first response.
+                            throw H2Utils.retryException("Failure on remote 
node.");
+
+                        if (U.currentTimeMillis() - start > 30_000)
+                            throw H2Utils.retryException("Timeout reached.");
+
+                        try {
+                            U.sleep(20 * attempt); // Delay grows with the number of loop iterations done so far.
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            throw new 
IgniteInterruptedException(e.getMessage());
+                        }
+
+                        // Retry to send the request once more after some time.
+                        idx.send(singletonList(node), req);
+
+                        break;
+
+                    case STATUS_ERROR:
+                        throw new CacheException(res.error());
+
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+
+            if (!ctx.discovery().alive(node))
+                throw H2Utils.retryException("Node has left topology: " + 
node.id());
+        }
+    }
+
+    /**
+     * Moves to the next row of the requested range, reading further ranges from the received responses
+     * and waiting for new responses when needed.
+     *
+     * @param rangeId Requested range ID.
+     * @return {@code true} If next row for the requested range was found, {@code false} if the requested
+     *      range ID is less than the ID of the current range or no ranges remain.
+     */
+    public boolean next(final int rangeId) {
+        for (;;) {
+            if (rangeId == cursorRangeId) {
+                if (cursor.next())
+                    return true;
+            }
+            else if (rangeId < cursorRangeId)
+                return false;
+
+            cursor = GridH2Cursor.EMPTY;
+
+            while (!ranges.hasNext()) {
+                if (remainingRanges == 0) {
+                    ranges = emptyIterator();
+
+                    return false;
+                }
+
+                ranges = awaitForResponse().ranges().iterator();
+            }
+
+            GridH2RowRange range = ranges.next();
+
+            cursorRangeId = range.rangeId();
+
+            if (!F.isEmpty(range.rows())) {
+                final Iterator<GridH2RowMessage> it = range.rows().iterator();
+
+                if (it.hasNext()) {
+                    cursor = new GridH2Cursor(new Iterator<Row>() {
+                        @Override public boolean hasNext() {
+                            return it.hasNext();
+                        }
+
+                        @Override public Row next() {
+                            // Lazily convert messages into real rows.
+                            return toRow(it.next());
+                        }
+
+                        @Override public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+    /**
+     * Converts a row message into a row created by the database of the index. A checked exception
+     * raised while reading a value is rethrown wrapped into a {@code CacheException}.
+     *
+     * @param msg Message.
+     * @return Row, or {@code null} if the message is {@code null}.
+     */
+    private Row toRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        List<GridH2ValueMessage> vals = msg.values();
+
+        assert !F.isEmpty(vals) : vals;
+
+        Value[] vals0 = new Value[vals.size()];
+
+        for (int i = 0; i < vals0.length; i++) {
+            try {
+                vals0[i] = vals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return idx.getDatabase().createRow(vals0, MEMORY_CALCULATE);
+    }
+
+    /**
+     * @param rangeId Requested range ID, must be equal to the ID of the current range.
+     * @return Current row.
+     */
+    public Row get(int rangeId) {
+        assert rangeId == cursorRangeId;
+
+        return cursor.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java
new file mode 100644
index 0000000..9bdbab4
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Segment key: identifies an index segment on a particular node. Two keys are equal when they have
+ * the same segment ID and their nodes have equal IDs.
+ */
+public class SegmentKey {
+    /** Node. */
+    private final ClusterNode node;
+
+    /** Segment ID. */
+    private final int segmentId;
+
+    /**
+     * Constructor.
+     *
+     * @param node Node, must not be {@code null}.
+     * @param segmentId Segment ID.
+     */
+    public SegmentKey(ClusterNode node, int segmentId) {
+        assert node != null;
+
+        this.node = node;
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * @return Node.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /**
+     * @return Segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SegmentKey key = (SegmentKey)o;
+
+        // Nodes are compared by ID, not by object identity or by the node's own equals().
+        return segmentId == key.segmentId && node.id().equals(key.node.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // Hash the node ID rather than the node object, so that the hash is derived from exactly
+        // the state that equals() compares and equal keys always get equal hash codes.
+        int res = node.id().hashCode();
+
+        res = 31 * res + segmentId;
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java
new file mode 100644
index 0000000..9cf7629
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import java.util.UUID;
+
+/**
+ * Key for source: combination of the owner node ID, the index segment ID and the batch lookup ID.
+ */
+public class SourceKey {
+    /** Owner node ID. */
+    private final UUID ownerId;
+
+    /** Index segment ID. */
+    private final int segmentId;
+
+    /** Batch lookup ID. */
+    private final int batchLookupId;
+
+    /**
+     * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
+     * @param batchLookupId Batch lookup ID.
+     */
+    public SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
+        this.ownerId = ownerId;
+        this.segmentId = segmentId;
+        this.batchLookupId = batchLookupId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (!(o instanceof SourceKey))
+            return false;
+
+        SourceKey that = (SourceKey)o;
+
+        // Cheap primitive comparisons go first, the owner ID is compared last.
+        if (batchLookupId != that.batchLookupId)
+            return false;
+
+        if (segmentId != that.segmentId)
+            return false;
+
+        return ownerId.equals(that.ownerId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (31 * ownerId.hashCode() + segmentId) + batchLookupId;
+    }
+}

Reply via email to