Initial split done.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fa48e50
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fa48e50
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fa48e50

Branch: refs/heads/ignite-10759-1
Commit: 9fa48e5024453d4595f143d0040b7c030488d22d
Parents: 5e73890
Author: devozerov <voze...@gridgain.com>
Authored: Fri Dec 21 17:31:06 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Dec 21 17:31:06 2018 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2IndexBase.java           | 289 +----------------
 .../h2/opt/join/DistributedLookupBatch.java     | 310 +++++++++++++++++++
 2 files changed, 319 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9fa48e50/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 802b9d9..8b02d01 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -29,12 +29,11 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.join.BroadcastCursor;
 import 
org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
 import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
-import org.apache.ignite.internal.processors.query.h2.opt.join.UnicastCursor;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -54,7 +53,6 @@ import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
 import org.h2.index.IndexCondition;
 import org.h2.index.IndexLookupBatch;
 import org.h2.index.ViewIndex;
@@ -63,34 +61,27 @@ import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
-import org.h2.util.DoneFuture;
 import org.h2.value.Value;
-import org.h2.value.ValueNull;
 
 import javax.cache.CacheException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Future;
 
 import static java.util.Collections.singletonList;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
-import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 import static org.h2.result.Row.MEMORY_CALCULATE;
 
 /**
@@ -98,7 +89,7 @@ import static org.h2.result.Row.MEMORY_CALCULATE;
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
-    private static final Object EXPLICIT_NULL = new Object();
+    public static final Object EXPLICIT_NULL = new Object();
 
     /** */
     private Object msgTopic;
@@ -111,7 +102,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
     /** */
     private final CIX2<ClusterNode,Message> locNodeHnd = new 
CIX2<ClusterNode,Message>() {
-        @Override public void applyx(ClusterNode clusterNode, Message msg) 
throws IgniteCheckedException {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) {
             onMessage0(clusterNode.id(), msg);
         }
     };
@@ -121,6 +112,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param tbl Table.
      */
+    @SuppressWarnings("MapReplaceableByEnumMap")
     protected final void initDistributedJoinMessaging(GridH2Table tbl) {
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
@@ -521,7 +513,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param isLocalQry Local query flag.
      * @return Collection of nodes for broadcasting.
      */
-    private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, 
GridCacheContext<?, ?> cctx, boolean isLocalQry) {
+    // TODO: Move to distributed batch
+    public List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, 
GridCacheContext<?, ?> cctx, boolean isLocalQry) {
         Map<UUID, int[]> partMap = qctx.partitionsMap();
 
         List<ClusterNode> nodes;
@@ -574,6 +567,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param isLocalQry Local query flag.
      * @return Segment key for Affinity key.
      */
+    // TODO: Move to distributed batch
     public SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, 
GridH2QueryContext qctx, Object affKeyObj,
         boolean isLocalQry) {
         assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
@@ -705,6 +699,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param row Table row.
      * @return Segment ID for given row.
      */
+    @SuppressWarnings("IfMayBeConditional")
     protected int segmentForRow(SearchRow row) {
         assert row != null;
 
@@ -728,272 +723,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * Index lookup batch.
-     */
-    private static class DistributedLookupBatch implements IndexLookupBatch {
-        /** Index. */
-        private final GridH2IndexBase idx;
-
-        /** */
-        final GridCacheContext<?,?> cctx;
-
-        /** */
-        final boolean ucast;
-
-        /** */
-        final int affColId;
-
-        /** */
-        GridH2QueryContext qctx;
-
-        /** */
-        int batchLookupId;
-
-        /** */
-        Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
-
-        /** */
-        List<SegmentKey> broadcastSegments;
-
-        /** */
-        List<Future<Cursor>> res = Collections.emptyList();
-
-        /** */
-        boolean batchFull;
-
-        /** */
-        boolean findCalled;
-
-        /**
-         * @param cctx Cache Cache context.
-         * @param ucast Unicast or broadcast query.
-         * @param affColId Affinity column ID.
-         */
-        public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, 
?> cctx, boolean ucast, int affColId) {
-            this.idx = idx;
-            this.cctx = cctx;
-            this.ucast = ucast;
-            this.affColId = affColId;
-        }
-
-        /**
-         * @param firstRow First row.
-         * @param lastRow Last row.
-         * @return Affinity key or {@code null}.
-         */
-        private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
-            if (affColId == COL_NOT_EXISTS)
-                return null;
-
-            if (firstRow == null || lastRow == null)
-                return null;
-
-            Value affKeyFirst = firstRow.getValue(affColId);
-            Value affKeyLast = lastRow.getValue(affColId);
-
-            if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
-                return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : 
affKeyFirst.getObject();
-
-            if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
-                return null;
-
-            // Try to extract affinity key from primary key.
-            Value pkFirst = firstRow.getValue(KEY_COL);
-            Value pkLast = lastRow.getValue(KEY_COL);
-
-            if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
-                return EXPLICIT_NULL;
-
-            if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
-                return null;
-
-            Object pkAffKeyFirst = 
cctx.affinity().affinityKey(pkFirst.getObject());
-            Object pkAffKeyLast = 
cctx.affinity().affinityKey(pkLast.getObject());
-
-            if (pkAffKeyFirst == null || pkAffKeyLast == null)
-                throw new CacheException("Cache key without affinity key.");
-
-            if (pkAffKeyFirst.equals(pkAffKeyLast))
-                return pkAffKeyFirst;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public boolean addSearchRows(SearchRow firstRow, SearchRow 
lastRow) {
-            if (qctx == null || findCalled) {
-                if (qctx == null) {
-                    // It is the first call after query begin (may be after 
reuse),
-                    // reinitialize query context and result.
-                    qctx = GridH2QueryContext.get();
-                    res = new ArrayList<>();
-
-                    assert qctx != null;
-                    assert !findCalled;
-                }
-                else {
-                    // Cleanup after the previous lookup phase.
-                    assert batchLookupId != 0;
-
-                    findCalled = false;
-                    qctx.putStreams(batchLookupId, null);
-                    res.clear();
-                }
-
-                // Reinitialize for the next lookup phase.
-                batchLookupId = qctx.nextBatchLookupId();
-                rangeStreams = new HashMap<>();
-            }
-
-            Object affKey = getAffinityKey(firstRow, lastRow);
-
-            boolean locQry = localQuery();
-
-            List<SegmentKey> segmentKeys;
-
-            if (affKey != null) {
-                // Affinity key is provided.
-                if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, 
we will not find anything.
-                    return false;
-
-                segmentKeys = F.asList(idx.rangeSegment(cctx, qctx, affKey, 
locQry));
-            }
-            else {
-                // Affinity key is not provided or is not the same in upper 
and lower bounds, we have to broadcast.
-                if (broadcastSegments == null)
-                    broadcastSegments = idx.broadcastSegments(qctx, cctx, 
locQry);
-
-                segmentKeys = broadcastSegments;
-            }
-
-            if (locQry && segmentKeys.isEmpty())
-                return false; // Nothing to do
-
-            assert !F.isEmpty(segmentKeys) : segmentKeys;
-
-            final int rangeId = res.size();
-
-            // Create messages.
-            GridH2RowMessage first = idx.toSearchRowMessage(firstRow);
-            GridH2RowMessage last = idx.toSearchRowMessage(lastRow);
-
-            // Range containing upper and lower bounds.
-            GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, 
last);
-
-            // Add range to every message of every participating node.
-            for (int i = 0; i < segmentKeys.size(); i++) {
-                SegmentKey segmentKey = segmentKeys.get(i);
-                assert segmentKey != null;
-
-                RangeStream stream = rangeStreams.get(segmentKey);
-
-                List<GridH2RowRangeBounds> bounds;
-
-                if (stream == null) {
-                    stream = new RangeStream(cctx.kernalContext(), idx, qctx, 
segmentKey.node());
-
-                    stream.request(createRequest(qctx, batchLookupId, 
segmentKey.segmentId()));
-                    stream.request().bounds(bounds = new ArrayList<>());
-
-                    rangeStreams.put(segmentKey, stream);
-                }
-                else
-                    bounds = stream.request().bounds();
-
-                bounds.add(rangeBounds);
-
-                // If at least one node will have a full batch then we are ok.
-                if (bounds.size() >= qctx.pageSize())
-                    batchFull = true;
-            }
-
-            Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ?
-                new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
-                new BroadcastCursor(idx, rangeId, segmentKeys, rangeStreams));
-
-            res.add(fut);
-
-            return true;
-        }
-
-        /**
-         * @param v1 First value.
-         * @param v2 Second value.
-         * @return {@code true} If they equal.
-         */
-        private boolean equal(Value v1, Value v2) {
-            return v1 == v2 || (v1 != null && v2 != null && 
v1.compareTypeSafe(v2, idx.getDatabase().getCompareMode()) == 0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isBatchFull() {
-            return batchFull;
-        }
-
-        /**
-         * @return {@code True} if local query execution is enforced.
-         */
-        private boolean localQuery() {
-            assert qctx != null : "Missing query context: " + this;
-
-            return qctx.distributedJoinMode() == LOCAL_ONLY;
-        }
-
-        /**
-         *
-         */
-        private void startStreams() {
-            if (rangeStreams.isEmpty()) {
-                assert res.isEmpty();
-
-                return;
-            }
-
-            qctx.putStreams(batchLookupId, rangeStreams);
-
-            // Start streaming.
-            for (RangeStream stream : rangeStreams.values()) {
-                stream.start();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public List<Future<Cursor>> find() {
-            batchFull = false;
-            findCalled = true;
-
-            startStreams();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void reset(boolean beforeQry) {
-            if (beforeQry || qctx == null) // Query context can be null if 
addSearchRows was never called.
-                return;
-
-            assert batchLookupId != 0;
-
-            // Do cleanup after the query run.
-            qctx.putStreams(batchLookupId, null);
-            qctx = null; // The same query can be reused multiple times for 
different query contexts.
-            batchLookupId = 0;
-
-            rangeStreams = Collections.emptyMap();
-            broadcastSegments = null;
-            batchFull = false;
-            findCalled = false;
-            res = Collections.emptyList();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getPlanSQL() {
-            return ucast ? "unicast" : "broadcast";
-        }
-    }
-
-    /**
      * Find rows for the segments (distributed joins).
      *
      * @param bounds Bounds.
@@ -1001,6 +730,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param filter Filter.
      * @return Iterator.
      */
+    @SuppressWarnings("unchecked")
     public Iterator<GridH2Row> findForSegment(GridH2RowRangeBounds bounds, int 
segment,
         BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
         SearchRow first = toSearchRow(bounds.first());
@@ -1040,5 +770,4 @@ public abstract class GridH2IndexBase extends BaseIndex {
         for (int pos = 0; pos < columnIds.length; ++pos)
             columnIds[pos] = columns[pos].getColumnId();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fa48e50/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
new file mode 100644
index 0000000..268beb1
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.util.typedef.F;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+import org.h2.util.DoneFuture;
+import org.h2.value.Value;
+import org.h2.value.ValueNull;
+
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
+import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
+
+/**
+ * Index lookup batch.
+ */
+public class DistributedLookupBatch implements IndexLookupBatch {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** */
+    private final GridCacheContext<?,?> cctx;
+
+    /** */
+    private final boolean ucast;
+
+    /** */
+    private final int affColId;
+
+    /** */
+    private GridH2QueryContext qctx;
+
+    /** */
+    private int batchLookupId;
+
+    /** */
+    private Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
+
+    /** */
+    private List<SegmentKey> broadcastSegments;
+
+    /** */
+    private List<Future<Cursor>> res = Collections.emptyList();
+
+    /** */
+    private boolean batchFull;
+
+    /** */
+    private boolean findCalled;
+
+    /**
+     * @param cctx Cache Cache context.
+     * @param ucast Unicast or broadcast query.
+     * @param affColId Affinity column ID.
+     */
+    public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> 
cctx, boolean ucast, int affColId) {
+        this.idx = idx;
+        this.cctx = cctx;
+        this.ucast = ucast;
+        this.affColId = affColId;
+    }
+
+    /**
+     * @param firstRow First row.
+     * @param lastRow Last row.
+     * @return Affinity key or {@code null}.
+     */
+    private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
+        if (affColId == COL_NOT_EXISTS)
+            return null;
+
+        if (firstRow == null || lastRow == null)
+            return null;
+
+        Value affKeyFirst = firstRow.getValue(affColId);
+        Value affKeyLast = lastRow.getValue(affColId);
+
+        if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
+            return affKeyFirst == ValueNull.INSTANCE ? 
GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject();
+
+        if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
+            return null;
+
+        // Try to extract affinity key from primary key.
+        Value pkFirst = firstRow.getValue(KEY_COL);
+        Value pkLast = lastRow.getValue(KEY_COL);
+
+        if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
+            return GridH2IndexBase.EXPLICIT_NULL;
+
+        if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
+            return null;
+
+        Object pkAffKeyFirst = 
cctx.affinity().affinityKey(pkFirst.getObject());
+        Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
+
+        if (pkAffKeyFirst == null || pkAffKeyLast == null)
+            throw new CacheException("Cache key without affinity key.");
+
+        if (pkAffKeyFirst.equals(pkAffKeyLast))
+            return pkAffKeyFirst;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public boolean addSearchRows(SearchRow firstRow, SearchRow 
lastRow) {
+        if (qctx == null || findCalled) {
+            if (qctx == null) {
+                // It is the first call after query begin (may be after reuse),
+                // reinitialize query context and result.
+                qctx = GridH2QueryContext.get();
+                res = new ArrayList<>();
+
+                assert qctx != null;
+                assert !findCalled;
+            }
+            else {
+                // Cleanup after the previous lookup phase.
+                assert batchLookupId != 0;
+
+                findCalled = false;
+                qctx.putStreams(batchLookupId, null);
+                res.clear();
+            }
+
+            // Reinitialize for the next lookup phase.
+            batchLookupId = qctx.nextBatchLookupId();
+            rangeStreams = new HashMap<>();
+        }
+
+        Object affKey = getAffinityKey(firstRow, lastRow);
+
+        boolean locQry = localQuery();
+
+        List<SegmentKey> segmentKeys;
+
+        if (affKey != null) {
+            // Affinity key is provided.
+            if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is 
explicit null, we will not find anything.
+                return false;
+
+            segmentKeys = F.asList(idx.rangeSegment(cctx, qctx, affKey, 
locQry));
+        }
+        else {
+            // Affinity key is not provided or is not the same in upper and 
lower bounds, we have to broadcast.
+            if (broadcastSegments == null)
+                broadcastSegments = idx.broadcastSegments(qctx, cctx, locQry);
+
+            segmentKeys = broadcastSegments;
+        }
+
+        if (locQry && segmentKeys.isEmpty())
+            return false; // Nothing to do
+
+        assert !F.isEmpty(segmentKeys) : segmentKeys;
+
+        final int rangeId = res.size();
+
+        // Create messages.
+        GridH2RowMessage first = idx.toSearchRowMessage(firstRow);
+        GridH2RowMessage last = idx.toSearchRowMessage(lastRow);
+
+        // Range containing upper and lower bounds.
+        GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
+
+        // Add range to every message of every participating node.
+        for (int i = 0; i < segmentKeys.size(); i++) {
+            SegmentKey segmentKey = segmentKeys.get(i);
+            assert segmentKey != null;
+
+            RangeStream stream = rangeStreams.get(segmentKey);
+
+            List<GridH2RowRangeBounds> bounds;
+
+            if (stream == null) {
+                stream = new RangeStream(cctx.kernalContext(), idx, qctx, 
segmentKey.node());
+
+                stream.request(GridH2IndexBase.createRequest(qctx, 
batchLookupId, segmentKey.segmentId()));
+                stream.request().bounds(bounds = new ArrayList<>());
+
+                rangeStreams.put(segmentKey, stream);
+            }
+            else
+                bounds = stream.request().bounds();
+
+            bounds.add(rangeBounds);
+
+            // If at least one node will have a full batch then we are ok.
+            if (bounds.size() >= qctx.pageSize())
+                batchFull = true;
+        }
+
+        Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+            new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
+            new BroadcastCursor(idx, rangeId, segmentKeys, rangeStreams));
+
+        res.add(fut);
+
+        return true;
+    }
+
+    /**
+     * @param v1 First value.
+     * @param v2 Second value.
+     * @return {@code true} If they equal.
+     */
+    private boolean equal(Value v1, Value v2) {
+        return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, 
idx.getDatabase().getCompareMode()) == 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isBatchFull() {
+        return batchFull;
+    }
+
+    /**
+     * @return {@code True} if local query execution is enforced.
+     */
+    private boolean localQuery() {
+        assert qctx != null : "Missing query context: " + this;
+
+        return qctx.distributedJoinMode() == LOCAL_ONLY;
+    }
+
+    /**
+     *
+     */
+    private void startStreams() {
+        if (rangeStreams.isEmpty()) {
+            assert res.isEmpty();
+
+            return;
+        }
+
+        qctx.putStreams(batchLookupId, rangeStreams);
+
+        // Start streaming.
+        for (RangeStream stream : rangeStreams.values()) {
+            stream.start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Future<Cursor>> find() {
+        batchFull = false;
+        findCalled = true;
+
+        startStreams();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset(boolean beforeQry) {
+        if (beforeQry || qctx == null) // Query context can be null if 
addSearchRows was never called.
+            return;
+
+        assert batchLookupId != 0;
+
+        // Do cleanup after the query run.
+        qctx.putStreams(batchLookupId, null);
+        qctx = null; // The same query can be reused multiple times for 
different query contexts.
+        batchLookupId = 0;
+
+        rangeStreams = Collections.emptyMap();
+        broadcastSegments = null;
+        batchFull = false;
+        findCalled = false;
+        res = Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPlanSQL() {
+        return ucast ? "unicast" : "broadcast";
+    }
+}

Reply via email to