AMashenkov commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r633556166



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -169,103 +97,30 @@ protected 
GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) {
+        return reducer.onPage(nodeId, last);
     }
 
     /** {@inheritDoc} */
     @Override protected void loadAllPages() throws 
IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        reducer.loadAll();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread 
to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override protected Reducer<R> reducer() {
+        return reducer;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         return super.onCancelled();
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         super.onTimeout();
     }

Review comment:
       Do we need all these callbacks in QueryFutureAdapter just to delegate 
calls to reducer
   and having reducer() at same time?
   I see a pattern you already use: reducer().onPage() and similar.
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
UnsortedDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream> streamCmp;
+
+    /**
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(GridCacheQueryFutureAdapter 
fut, long reqId,
+        CacheQueryPageRequester fetcher, Collection<ClusterNode> nodes, 
Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, fetcher, nodes);
+
+        streamsMap = new ConcurrentHashMap<>(nodes.size());
+        streams = (NodePageStream[])Array.newInstance(NodePageStream.class, 
nodes.size());
+
+        int i = 0;
+
+        for (ClusterNode node : nodes) {
+            streams[i] = new NodePageStream(node.id());
+            streamsMap.put(node.id(), streams[i++]);
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head, o2.head);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            // Skip if stream has head: on previous next() head extracts from 
stream[streamOff], others streams do
+            // have head or it is the first run.
+            if (s.head() != null)
+                break;
+
+            if (!s.hasNext()) {
+                // Nullify obsolete stream, move left bound.
+                streamsMap.remove(s.nodeId);
+                streams[i] = null;
+                streamOff++;
+            }
+            else {
+                // Prefetch head value.
+                s.next();
+            }
+        }
+
+        if (finished())
+            throw new NoSuchElementException("No next element. Please, be sure 
to invoke hasNext() before next().");
+
+        if (first) {
+            first = false;
+
+            Arrays.sort(streams, streamCmp);
+        } else
+            bubbleUp(streams, streamOff, streamCmp);
+
+        return streams[streamOff].get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            if (streams[i].hasNext())
+                return true;
+
+            // Nullify obsolete stream, move left bound.
+            streamsMap.remove(streams[i].nodeId);
+            streams[i] = null;
+            streamOff++;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadPage() {
+        assert !Thread.holdsLock(queueLock());
+
+        List<UUID> nodes;
+
+        synchronized (queueLock()) {

Review comment:
       You use the same lock to synchronize access to streams[] and all queues 
in all NodeStrems objects.
   What is the purpose to use a shared lock?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
UnsortedDistributedCacheQueryReducer<R> {

Review comment:
       MergeSorted reducer uses per-node Stream, while Unsorted uses single 
shared Stream and they have different logic of merging results.
   Why do you extend Unsorted reducer instead of Abstract one here?
   Some common fields can be safely pulled up into the Abstract reduceer.
   
   Also, I'd move all these classes hierarchy to the package  'reducer' and 
made PageStream/NodeStream to top-level classes, with the package visibility. 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/UnsortedDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distributed query, fetch pages from remote nodes. All pages go 
in single page stream so no ordering is provided.
+ */
+class UnsortedDistributedCacheQueryReducer<R> extends 
AbstractCacheQueryReducer<R> implements DistributedCacheQueryReducer<R> {
+    /**
+     * Whether it is allowed to send cache query result requests to nodes.
+     * It is set to {@code false} if a query finished or failed.
+     */
+    protected volatile boolean loadAllowed;
+
+    /** Query request ID. */
+    protected final long reqId;
+
+    /**
+     * Dynamic collection of nodes that run this query. If a node finishes 
this query then remove it from this colleciton.
+     */
+    protected final Collection<UUID> subgrid = new HashSet<>();
+
+    /**
+     * List of nodes that respons with cache query result pages. This 
collection should be cleaned before sending new
+     * cache query request.
+     */
+    protected final Collection<UUID> rcvd = new HashSet<>();
+
+    /** Requester of cache query result pages. */
+    protected final CacheQueryPageRequester pageRequester;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Count down this latch when every node responses on initial cache query 
request. */
+    private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+    /** Single page stream. */
+    private final PageStream pageStream;
+
+    /** Query future. */
+    protected final GridCacheQueryFutureAdapter fut;
+
+    /**
+     * @param reqId Cache query request ID.
+     * @param pageRequester Provides a functionality to request pages from 
remote nodes.
+     * @param nodes Collection of nodes this query applies to.
+     */
+    UnsortedDistributedCacheQueryReducer(GridCacheQueryFutureAdapter fut, long 
reqId, CacheQueryPageRequester pageRequester,
+        Collection<ClusterNode> nodes) {
+        super(fut);
+
+        this.reqId = reqId;
+        this.pageRequester = pageRequester;
+
+        synchronized (queueLock()) {
+            for (ClusterNode node : nodes)
+                subgrid.add(node.id());
+        }
+
+        cctx = fut.cctx;
+
+        pageStream = new PageStream();
+
+        this.fut = fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        return pageStream.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        return pageStream.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addPage(@Nullable UUID nodeId, Collection<R> data) {
+        pageStream.addPage(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onLastPage() {
+        super.onLastPage();
+
+        loadAllowed = false;
+
+        firstPageLatch.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
+        Collection<ClusterNode> nodes;
+
+        synchronized (queueLock()) {
+            nodes = F.retain(allNodes, true,
+                new P1<ClusterNode>() {
+                    @Override public boolean apply(ClusterNode node) {
+                        return !cctx.localNodeId().equals(node.id()) && 
subgrid.contains(node.id());
+                    }
+                }
+            );
+
+            rcvd.clear();
+            subgrid.clear();
+        }
+
+        pageRequester.cancelQueryRequest(reqId, nodes, fut.fields());
+
+        pageStream.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadPage() {
+        assert !Thread.holdsLock(queueLock());
+
+        Collection<UUID> nodes = null;
+
+        synchronized (queueLock()) {
+            // Loads only queue is empty to avoid memory consumption on 
additional pages.
+            if (!pageStream.queue.isEmpty())
+                return;
+
+            if (loadAllowed && rcvd.containsAll(subgrid)) {
+                rcvd.clear();
+
+                nodes = new ArrayList<>(subgrid);
+            }
+        }
+
+        if (nodes != null)
+            pageRequester.requestPages(reqId, fut, nodes, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadAll() throws IgniteInterruptedCheckedException {
+        assert !Thread.holdsLock(queueLock());
+
+        U.await(firstPageLatch);
+
+        Collection<UUID> nodes = null;
+
+        synchronized (queueLock()) {
+            if (loadAllowed && !subgrid.isEmpty())
+                nodes = new ArrayList<>(subgrid);
+        }
+
+        if (nodes != null)
+            pageRequester.requestPages(reqId, fut, nodes, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(@Nullable UUID nodeId, boolean last) {

Review comment:
       onPage with `boolean last` and onLastPage() looks confusing.
   Looks like an architectural lack.
   What is the difference between these methods?  

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {

Review comment:
       Let's have separate classes/interfaces hierarchy for local and 
distributed queries.
   Because they have different contracts for similar methods. E.g. onLastPage() 
and onPage(nodeId, true);
   Having both methods looks ambiguous.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Callback that invoked after getting a page from remote node. Checks 
whether it is the last page for query or not.
+     *
+     * @param nodeId Node ID of remote page.
+     * @param last Whether page is last for specified node.
+     * @return Whether page is last for a query.
+     */
+    public boolean onPage(@Nullable UUID nodeId, boolean last);
+
+    /**
+     * Loads full cache query result pages from remote nodes. It can be done 
for speedup operation if user invokes
+     * get() on {@link GridCacheQueryFutureAdapter} instead of using it as 
iterator.
+     *
+     * @throws IgniteInterruptedCheckedException If thread is interrupted.
+     */
+    public void loadAll() throws IgniteInterruptedCheckedException;
+
+    /**
+     * Callback to handle node left.
+     *
+     * @param nodeId Node ID that left a cluster.
+     * @return {@code true} if specified node runs this query.
+     */
+    public boolean onNodeLeft(UUID nodeId);
+
+    /** Blocks while reducer doesn't get first result item for this query. */
+    public void awaitFirstItem() throws InterruptedException;

Review comment:
       ```suggestion
       /** 
       * Blocks current thread until reducer will be ready to return the very 
first result item for the query. 
       */
       public void awaitInitialization() throws InterruptedException;
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Callback that invoked after getting a page from remote node. Checks 
whether it is the last page for query or not.
+     *
+     * @param nodeId Node ID of remote page.
+     * @param last Whether page is last for specified node.
+     * @return Whether page is last for a query.
+     */
+    public boolean onPage(@Nullable UUID nodeId, boolean last);
+
+    /**
+     * Loads full cache query result pages from remote nodes. It can be done 
for speedup operation if user invokes
+     * get() on {@link GridCacheQueryFutureAdapter} instead of using it as 
iterator.
+     *
+     * @throws IgniteInterruptedCheckedException If thread is interrupted.
+     */
+    public void loadAll() throws IgniteInterruptedCheckedException;
+
+    /**
+     * Callback to handle node left.
+     *
+     * @param nodeId Node ID that left a cluster.
+     * @return {@code true} if specified node runs this query.

Review comment:
       It is unclear, what result will be returned if the node has gone after 
the last page was received and how it should be interpreted.
   Or what if the node has gone, but we already received sufficient data for 
finishing the query, 
   e.g. query has limits, but the user app consumes data slowly. 
   
   I think Reducer should only check internal invariants and mark node as 
failed.
   And caller should recheck 'if the reducer is alive' right after calling this 
method.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Callback that invoked after getting a page from remote node. Checks 
whether it is the last page for query or not.
+     *
+     * @param nodeId Node ID of remote page.
+     * @param last Whether page is last for specified node.
+     * @return Whether page is last for a query.

Review comment:
       It looks like the method is called when we got a new page and notify the 
Reducer.
   What the purpose to get a flag to the caller back?
   Why reducer can't apply additional logic by itself or call a listener (like 
in true OOP) instead of return magic value.
   
   What the purpose of the `last` parameter? What does `false` value mean and 
what is the impact to return value?
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Callback that invoked after getting a page from remote node. Checks 
whether it is the last page for query or not.
+     *
+     * @param nodeId Node ID of remote page.
+     * @param last Whether page is last for specified node.
+     * @return Whether page is last for a query.
+     */
+    public boolean onPage(@Nullable UUID nodeId, boolean last);
+
+    /**
+     * Loads full cache query result pages from remote nodes. It can be done 
for speedup operation if user invokes
+     * get() on {@link GridCacheQueryFutureAdapter} instead of using it as 
iterator.
+     *
+     * @throws IgniteInterruptedCheckedException If thread is interrupted.
+     */
+    public void loadAll() throws IgniteInterruptedCheckedException;

Review comment:
       Ok. One can forcibly load all the pages, but where will the data be 
returned or saved?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Callback that invoked after getting a page from remote node. Checks 
whether it is the last page for query or not.
+     *
+     * @param nodeId Node ID of remote page.
+     * @param last Whether page is last for specified node.
+     * @return Whether page is last for a query.
+     */
+    public boolean onPage(@Nullable UUID nodeId, boolean last);
+
+    /**
+     * Loads full cache query result pages from remote nodes. It can be done 
for speedup operation if user invokes
+     * get() on {@link GridCacheQueryFutureAdapter} instead of using it as 
iterator.
+     *
+     * @throws IgniteInterruptedCheckedException If thread is interrupted.
+     */
+    public void loadAll() throws IgniteInterruptedCheckedException;
+
+    /**
+     * Callback to handle node left.
+     *
+     * @param nodeId Node ID that left a cluster.
+     * @return {@code true} if specified node runs this query.
+     */
+    public boolean onNodeLeft(UUID nodeId);
+
+    /** Blocks while reducer doesn't get first result item for this query. */
+    public void awaitFirstItem() throws InterruptedException;
+
+    /** Callback that invokes when this query is cancelled. */
+    public void cancel();

Review comment:
       ```suggestion
       /** 
       * Callback is invoked on the query cancellation. 
       */
       public void onCancel();
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryReducer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for reducing results of cache query. Query 
results are delivered with function
+ * {@link #addPage(UUID, Collection)}. Note that this reducer deeply interacts 
with corresponding query future
+ * {@link GridCacheQueryFutureAdapter}, so they used the same lock object. It 
guards reducer pages operations
+ * and the future status. Custom reduce logic is applied within {@link 
#next()} and {@link #hasNext()}.
+ *
+ * <T> is a type of cache query result item.
+ */
+public interface CacheQueryReducer<T> {
+    /**
+     * @return Next item.
+     */
+    public T next() throws IgniteCheckedException;
+
+    /**
+     * @return {@code true} if there is a next item, otherwise {@code false}.
+     */
+    public boolean hasNext() throws IgniteCheckedException;
+
+    /**
+     * Offer query result page for reduce. Note that the data collection may 
contain extension of type T.
+     * In such cases it stores additional payload for custom reducer logic.
+     *
+     * @param nodeId Node ID that sent this page. {@code null} means local 
node or error page.
+     * @param data Page data rows.
+     */
+    public void addPage(@Nullable UUID nodeId, Collection<T> data);
+
+    /**
+     * Callback that invokes after reducer get last query result page.
+     * Also invokes for failed queries to let reducer know that there won't be 
new pages.

Review comment:
       Why there is no onCancel() or onError() callbacks that will cause 
Exception throwing in `next` method?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
UnsortedDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream> streamCmp;
+
+    /**
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(GridCacheQueryFutureAdapter 
fut, long reqId,
+        CacheQueryPageRequester fetcher, Collection<ClusterNode> nodes, 
Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, fetcher, nodes);
+
+        synchronized (queueLock()) {
+            streamsMap = new ConcurrentHashMap<>(nodes.size());
+            streams = 
(NodePageStream[])Array.newInstance(NodePageStream.class, nodes.size());
+
+            int i = 0;
+
+            for (ClusterNode node : nodes) {
+                streams[i] = new NodePageStream(node.id());
+                streamsMap.put(node.id(), streams[i++]);
+            }
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head, o2.head);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            // Skip if stream has head: on previous next() head extracts from 
stream[streamOff], others streams do
+            // have head or it is the first run.
+            if (s.head() != null)
+                break;
+
+            if (!s.hasNext()) {
+                // Nullify obsolete stream, move left bound.
+                streamsMap.remove(s.nodeId);
+                streams[i] = null;
+                streamOff++;
+            }
+            else {
+                // Prefetch head value.
+                s.next();
+            }
+        }
+
+        if (finished())
+            throw new NoSuchElementException("No next element. Please, be sure 
to invoke hasNext() before next().");
+
+        if (first) {
+            first = false;
+
+            Arrays.sort(streams, streamCmp);
+        } else
+            bubbleUp(streams, streamOff, streamCmp);
+
+        return streams[streamOff].get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            if (streams[i].hasNext())
+                return true;
+
+            // Nullify obsolete stream, move left bound.
+            streamsMap.remove(streams[i].nodeId);
+            streams[i] = null;
+            streamOff++;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadPage() {
+        assert !Thread.holdsLock(queueLock());
+
+        List<UUID> nodes;
+
+        synchronized (queueLock()) {
+            if (!loadAllowed)
+                return;
+
+            nodes = new ArrayList<>();
+
+            for (int i = streamOff; i < streams.length; i++) {
+                UUID nodeId = streams[i].nodeId;
+
+                // Try to contain 2 pages for every stream to avoid waits. A 
node has to be present in collections:
+                // 1. rcvd - as requested and already handled before, so 
currently no parallel request to this node.
+                // 2. subgrid - as this node still has pages to request.
+                if (streams[i].queue.size() < 1 && rcvd.remove(nodeId) && 
subgrid.contains(nodeId))
+                    nodes.add(nodeId);
+            }
+        }
+
+        if (nodes != null)
+            pageRequester.requestPages(reqId, fut, nodes, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(@Nullable UUID nodeId, boolean last) {
+        boolean qryLast = super.onPage(nodeId, last);
+
+        if (nodeId == null)
+            nodeId = cctx.localNodeId();
+
+        if (last && streamsMap.containsKey(nodeId)) {
+            streamsMap.get(nodeId).allPagesReady = true;
+
+            streamsMap.remove(nodeId);
+        }
+
+        return qryLast;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addPage(@Nullable UUID nodeId, Collection<R> data) {
+        assert Thread.holdsLock(queueLock());
+
+        // Local node.
+        if (nodeId == null) {
+            nodeId = cctx.localNodeId();
+
+            // If nodeId is NULL and query doesn't execute on local node, then 
it is error, notify all streams.
+            if (!streamsMap.containsKey(nodeId)) {
+                assert data.isEmpty();
+
+                for (PageStream stream: streamsMap.values())
+                    stream.addPage(data);

Review comment:
       So, why we have a magic constant NULL instead of a separate callback for 
this with error as a param?
   It is totally unclear, why UnsortedReducer doesn't have a similar null 
check...
   
   Also, passing an empty page looks like a magic constant as well.
   Do we need to notify other streams? Do we know at this point, that 
prefetched result (failed) will ever be needed? 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/UnsortedDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distributed query, fetch pages from remote nodes. All pages go 
in single page stream so no ordering is provided.
+ */
+class UnsortedDistributedCacheQueryReducer<R> extends 
AbstractCacheQueryReducer<R> implements DistributedCacheQueryReducer<R> {
+    /**
+     * Whether it is allowed to send cache query result requests to nodes.
+     * It is set to {@code false} if a query finished or failed.
+     */
+    protected volatile boolean loadAllowed;
+
+    /** Query request ID. */
+    protected final long reqId;
+
+    /**
+     * Dynamic collection of nodes that run this query. If a node finishes 
this query then remove it from this colleciton.
+     */
+    protected final Collection<UUID> subgrid = new HashSet<>();
+
+    /**
+     * List of nodes that respons with cache query result pages. This 
collection should be cleaned before sending new
+     * cache query request.
+     */
+    protected final Collection<UUID> rcvd = new HashSet<>();
+
+    /** Requester of cache query result pages. */
+    protected final CacheQueryPageRequester pageRequester;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Count down this latch when every node responses on initial cache query 
request. */
+    private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+    /** Single page stream. */
+    private final PageStream pageStream;
+
+    /** Query future. */
+    protected final GridCacheQueryFutureAdapter fut;
+
+    /**
+     * @param reqId Cache query request ID.
+     * @param pageRequester Provides a functionality to request pages from 
remote nodes.
+     * @param nodes Collection of nodes this query applies to.
+     */
+    UnsortedDistributedCacheQueryReducer(GridCacheQueryFutureAdapter fut, long 
reqId, CacheQueryPageRequester pageRequester,
+        Collection<ClusterNode> nodes) {
+        super(fut);
+
+        this.reqId = reqId;
+        this.pageRequester = pageRequester;
+
+        synchronized (queueLock()) {
+            for (ClusterNode node : nodes)
+                subgrid.add(node.id());
+        }
+
+        cctx = fut.cctx;
+
+        pageStream = new PageStream();
+
+        this.fut = fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        return pageStream.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        return pageStream.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addPage(@Nullable UUID nodeId, Collection<R> data) {
+        pageStream.addPage(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onLastPage() {
+        super.onLastPage();
+
+        loadAllowed = false;
+
+        firstPageLatch.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
+        Collection<ClusterNode> nodes;
+
+        synchronized (queueLock()) {
+            nodes = F.retain(allNodes, true,
+                new P1<ClusterNode>() {
+                    @Override public boolean apply(ClusterNode node) {
+                        return !cctx.localNodeId().equals(node.id()) && 
subgrid.contains(node.id());
+                    }
+                }
+            );
+
+            rcvd.clear();
+            subgrid.clear();
+        }
+
+        pageRequester.cancelQueryRequest(reqId, nodes, fut.fields());
+
+        pageStream.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void loadPage() {
+        assert !Thread.holdsLock(queueLock());
+
+        Collection<UUID> nodes = null;
+
+        synchronized (queueLock()) {
+            // Loads only queue is empty to avoid memory consumption on 
additional pages.
+            if (!pageStream.queue.isEmpty())
+                return;
+
+            if (loadAllowed && rcvd.containsAll(subgrid)) {
+                rcvd.clear();
+
+                nodes = new ArrayList<>(subgrid);
+            }
+        }
+
+        if (nodes != null)
+            pageRequester.requestPages(reqId, fut, nodes, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadAll() throws IgniteInterruptedCheckedException {
+        assert !Thread.holdsLock(queueLock());
+
+        U.await(firstPageLatch);
+
+        Collection<UUID> nodes = null;
+
+        synchronized (queueLock()) {
+            if (loadAllowed && !subgrid.isEmpty())
+                nodes = new ArrayList<>(subgrid);
+        }
+
+        if (nodes != null)
+            pageRequester.requestPages(reqId, fut, nodes, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onPage(@Nullable UUID nodeId, boolean last) {
+        assert Thread.holdsLock(queueLock());
+
+        if (nodeId == null)
+            nodeId = cctx.localNodeId();
+
+        rcvd.add(nodeId);
+
+        if (!loadAllowed && rcvd.containsAll(subgrid) ) {
+            firstPageLatch.countDown();
+            loadAllowed = true;
+        }
+
+        return last && subgrid.remove(nodeId) && subgrid.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        synchronized (queueLock()) {
+            return subgrid.contains(nodeId);

Review comment:
       You can have 2 separate calls instead.
   fut.onNodeLeft -> rdc.onNodeLeft 
   fut.onNodeLeft -> rdc.isFailed/isFinished

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/MergeSortDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer of distirbuted query that sort result through all nodes. Note that 
it's assumed that every node
+ * returns pre-sorted collection of data.
+ */
+public class MergeSortDistributedCacheQueryReducer<R> extends 
UnsortedDistributedCacheQueryReducer<R> {
+    /** Map of Node ID to stream. Used for inserting new pages. */
+    private final Map<UUID, NodePageStream> streamsMap;
+
+    /** Array of streams. Used for iteration over result pages. */
+    private final NodePageStream[] streams;
+
+    /** If {@code true} then there wasn't call {@link #hasNext()} on this 
reducer. */
+    private boolean first = true;
+
+    /**
+     * Offset of current stream to get next value. This offset only increments 
if a stream is done. The array {@link
+     * #streams} is sorted to put stream with lowest value on this offset.
+     */
+    private int streamOff;
+
+    /** Compares head of streams to get lowest value at the moment. */
+    private final Comparator<NodePageStream> streamCmp;
+
+    /**
+     * @param rowCmp Comparator to sort query results from different nodes.
+     */
+    public MergeSortDistributedCacheQueryReducer(GridCacheQueryFutureAdapter 
fut, long reqId,
+        CacheQueryPageRequester fetcher, Collection<ClusterNode> nodes, 
Comparator<R> rowCmp
+    ) {
+        super(fut, reqId, fetcher, nodes);
+
+        synchronized (queueLock()) {
+            streamsMap = new ConcurrentHashMap<>(nodes.size());
+            streams = 
(NodePageStream[])Array.newInstance(NodePageStream.class, nodes.size());
+
+            int i = 0;
+
+            for (ClusterNode node : nodes) {
+                streams[i] = new NodePageStream(node.id());
+                streamsMap.put(node.id(), streams[i++]);
+            }
+        }
+
+        streamCmp = (o1, o2) -> {
+            if (o1 == o2)
+                return 0;
+
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return rowCmp.compare(o1.head, o2.head);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public R next() throws IgniteCheckedException {
+        for (int i = streamOff; i < streams.length; i++) {
+            NodePageStream s = streams[i];
+
+            // Skip if stream has head: on previous next() head extracts from 
stream[streamOff], others streams do
+            // have head or it is the first run.
+            if (s.head() != null)
+                break;

Review comment:
       After you got the last item from the first page, you will be blocked on 
`next()` call until the second page will be received from the node, otherwise 
you will not be able to resort streams correctly.
   You need to have at least one page from every node to be ready at any time 
for non-blocking reducing.
   
   I wouldn't bet on equal data distribution. Assume, one node will have the 
largest part of the query result,
   then you will end up in synchronous ping-pong with a single node.
   
   Effective prefetching is to request next page right after you start 
processing the previous one.
   If you bother about fetching few additional pages, then you can configure 
twice smaller page, aren't you?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to