AMashenkov commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r655438939



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryReducer.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * This class is responsible for reducing results of cache query. Query 
results are delivered with function
+ * {@link #onPage(UUID, Collection, boolean)}. Note that this reducer deeply 
interacts with corresponding query future
+ * {@link GridCacheQueryFutureAdapter}, so they used the same lock object. It 
guards reducer pages operations
+ * and the future status. Custom reduce logic is applied within {@link 
#next()} and {@link #hasNext()}.
+ *
+ * <T> is a type of cache query result item.
+ */
+public interface CacheQueryReducer<T> {
+    /**
+     * @return Next item.
+     */
+    public T next() throws IgniteCheckedException;
+
+    /**
+     * @return {@code true} if there is a next item, otherwise {@code false}.
+     */
+    public boolean hasNext() throws IgniteCheckedException;
+
+    /**
+     * Offer query result page for reduce. Note that the data collection may 
contain extension of type T.
+     * In such cases data item contains additional payload for custom reducer 
logic.
+     *
+     * @param nodeId Node ID that sent this page.
+     * @param data Page data rows.
+     * @param last Whether this page is last for specified {@code nodeId}.
+     * @return {@code true} if this page is final page for query and no more 
pages are waited, otherwise {@code false}.

Review comment:
       ```suggestion
        * Callback that is invoked on receiving a new page.
        *
        * @param nodeId Node ID that sent this page.
        * @param data Page data rows.
        * @param last Whether this page is last for specified {@code nodeId}.
        * @return {@code true} if this page is the final page for the query and no more 
pages are expected, {@code false} otherwise.
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryReducer.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * This class is responsible for reducing results of cache query. Query 
results are delivered with function
+ * {@link #onPage(UUID, Collection, boolean)}. Note that this reducer deeply 
interacts with corresponding query future
+ * {@link GridCacheQueryFutureAdapter}, so they used the same lock object. It 
guards reducer pages operations
+ * and the future status. Custom reduce logic is applied within {@link 
#next()} and {@link #hasNext()}.

Review comment:
       ```suggestion
    * This class is responsible for reducing results of cache query. 
    *
    * Query results are delivered via callback {@link #onPage(UUID, Collection, 
boolean)}.
    * @see GridCacheQueryFutureAdapter
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryReducer.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * This class is responsible for reducing results of cache query. Query 
results are delivered with function
+ * {@link #onPage(UUID, Collection, boolean)}. Note that this reducer deeply 
interacts with corresponding query future
+ * {@link GridCacheQueryFutureAdapter}, so they used the same lock object. It 
guards reducer pages operations
+ * and the future status. Custom reduce logic is applied within {@link 
#next()} and {@link #hasNext()}.

Review comment:
       Actually, the interface doesn't imply any coupling with the query future.
   If you think this is worth mentioning, then an abstract class or a 
concrete implementation would be a better place.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/DistributedCacheQueryReducer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Reducer for distributed cache query.
+ */
+public interface DistributedCacheQueryReducer<T> extends CacheQueryReducer<T> {
+    /**
+     * Checks whether cache query still runs on specified node. If a query 
finished (send all pages) on this node, then
+     * this method has to return {@code false}.
+     *
+     * @param nodeId Node ID.
+     * @return {@code true} if specified node runs this query.

Review comment:
       ```suggestion
        * Checks whether node with provided {@code nodeId} is a map node for 
the query.
        * Note: if all the pages were already received from this node, then the method will 
return {@code false}.
        *
        * @param nodeId Node ID.
        * @return {@code true} if specified node is a map node for the query, 
{@code false} otherwise.
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -69,205 +50,43 @@ protected 
GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
         GridCacheQueryManager<K, V> mgr = ctx.queries();
 
         assert mgr != null;
-
-        synchronized (this) {
-            for (ClusterNode node : nodes)
-                subgrid.add(node.id());
-        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
-        final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
-
-        assert qryMgr != null;
-
-        try {
-            Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
-            Collection<ClusterNode> nodes;
-
-            synchronized (this) {
-                nodes = F.retain(allNodes, true,
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode node) {
-                            return !cctx.localNodeId().equals(node.id()) && 
subgrid.contains(node.id());
-                        }
-                    }
-                );
-
-                subgrid.clear();
-            }
-
-            final GridCacheQueryRequest req = new 
GridCacheQueryRequest(cctx.cacheId(),
-                reqId,
-                fields(),
-                qryMgr.queryTopologyVersion(),
-                cctx.deploymentEnabled());
-
-            // Process cancel query directly (without sending) for local node,
-            cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
-                @Override public Object call() {
-                    qryMgr.processQueryRequest(cctx.localNodeId(), req);
-
-                    return null;
-                }
-            });
-
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes) {
-                    try {
-                        cctx.io().send(node, req, cctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send cancel request, node 
failed: " + node);
-                        }
-                        else
-                            U.error(log, "Failed to send cancel request 
[node=" + node + ']', e);
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send cancel request (will cancel query in 
any case).", e);
-        }
+    @Override protected void cancelQuery() {
+        reducer.onCancel();
 
-        qryMgr.onQueryFutureCanceled(reqId);
+        cctx.queries().onQueryFutureCanceled(reqId);
 
         clear();
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+    /** Fail if a node runs this query left cluster. */

Review comment:
       ```suggestion
       /** {@inheritDoc} */
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -101,12 +104,30 @@
     /** Event listener. */
     private GridLocalEventListener lsnr;
 
+    /** Requester of cache query result pages. */
+    private CacheQueryPageRequester pageRequester;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         super.start0();
 
         assert cctx.config().getCacheMode() != LOCAL;
 
+        pageRequester = new CacheQueryPageRequester(cctx) {
+            /** {@inheritDoc} */

Review comment:
       ```suggestion
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -523,57 +544,31 @@ else if (!cancelled.contains(res.requestId()))
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean 
qry, final Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, false);
+    }
+
+    /** */
+    private CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final 
Collection<ClusterNode> nodes, boolean fields) {
         assert cctx.config().getCacheMode() != LOCAL;
 
         if (log.isDebugEnabled())
             log.debug("Executing distributed query: " + qry);
 
         long reqId = cctx.io().nextIoId();
 
-        final GridCacheDistributedQueryFuture<K, V, ?> fut =
-            new GridCacheDistributedQueryFuture<>(cctx, reqId, qry, nodes);
+        final GridCacheDistributedQueryFuture fut = fields ?
+            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry)
+            : new GridCacheDistributedQueryFuture(cctx, reqId, qry);

Review comment:
       ```suggestion
               new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry) :
               new GridCacheDistributedQueryFuture(cctx, reqId, qry);
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
##########
@@ -146,40 +137,50 @@ public GridCacheQueryBean query() {
     /**
      * @return If fields query.
      */
-    boolean fields() {
+    public boolean fields() {
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        cctx.time().removeTimeoutObject(this);
-
-        return super.onDone(res, err);
-    }
-
     /** {@inheritDoc} */
     @Override public R next() {
         try {
-            R next = unmaskNull(internalIterator().next());
+            if (!limitDisabled && cnt == capacity)
+                return null;
 
-            cnt.decrementAndGet();
+            checkError();
+
+            R next = null;
+
+            if (reducer().hasNext()) {
+                next = unmaskNull(reducer().next());
+
+                if (!limitDisabled) {
+                    cnt++;
+
+                    // Exceed limit, stop page loading and cancel queries.
+                    if (cnt == capacity)
+                        cancel();
+                }
+            }
+
+            checkError();
 
             return next;
         }
-        catch (NoSuchElementException ignored) {
-            return null;
-        }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
         }
     }
 
+    /** @return Cache query results reducer. */

Review comment:
       ```suggestion
       /** 
       * @return Cache query results reducer. 
       */
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
##########
@@ -146,40 +137,50 @@ public GridCacheQueryBean query() {
     /**
      * @return If fields query.
      */
-    boolean fields() {
+    public boolean fields() {
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        cctx.time().removeTimeoutObject(this);
-
-        return super.onDone(res, err);
-    }
-
     /** {@inheritDoc} */
     @Override public R next() {
         try {
-            R next = unmaskNull(internalIterator().next());
+            if (!limitDisabled && cnt == capacity)
+                return null;
 
-            cnt.decrementAndGet();
+            checkError();
+
+            R next = null;
+
+            if (reducer().hasNext()) {
+                next = unmaskNull(reducer().next());
+
+                if (!limitDisabled) {
+                    cnt++;
+
+                    // Exceed limit, stop page loading and cancel queries.
+                    if (cnt == capacity)
+                        cancel();
+                }
+            }
+
+            checkError();
 
             return next;
         }
-        catch (NoSuchElementException ignored) {
-            return null;
-        }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
         }
     }
 
+    /** @return Cache query results reducer. */
+    protected abstract CacheQueryReducer<R> reducer();
+
     /**
-     * Waits for the first page to be received from remote node(s), if any.
+     * Waits for the first item to be received from remote node(s), if any.
      *
      * @throws IgniteCheckedException If query execution failed with an error.
      */
-    public abstract void awaitFirstPage() throws IgniteCheckedException;
+    public abstract void awaitFirstItem() throws IgniteCheckedException;

Review comment:
       Maybe awaitFirstItemAvailable() would be a better name.
   Actually, we wait here not for the first item/page from a remote node, but for the 
first item available after the reduction is applied.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
##########
@@ -306,10 +221,12 @@ else if (capacity > 0) {
     }
 
     /**
+     * Entrypoint for handling query result page from remote node.
+     *
      * @param nodeId Sender node.
      * @param data Page data.
      * @param err Error (if was).
-     * @param finished Finished or not.
+     * @param finished Whether it is the last page for sender node.

Review comment:
       Maybe 'lastPage' would be a better name for this flag?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private R head;
+
+    /** */
+    protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long 
timeoutTime,
+        UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) {
+        super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), 
reqPages);
+
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Head of stream, that is last item returned with {@link #next()}.
+     */
+    public R head() {

Review comment:
       ```suggestion
       /**
       * Peek the stream head for the next item.
       *
       * Note: requires {@link #hasNext()} to be called first.
       * @return The item that will be returned by {@link #next()}.
       */
       R head() {
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {

Review comment:
       Let's make the class package-private if possible.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private R head;
+
+    /** */
+    protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long 
timeoutTime,
+        UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) {
+        super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), 
reqPages);
+
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Head of stream, that is last item returned with {@link #next()}.
+     */
+    public R head() {
+        return head;
+    }
+
+    /**
+     * @return {@code true} If this stream has next row, {@code false} 
otherwise.
+     */

Review comment:
       ```suggestion
       /** {@inheritDoc} */
   ```

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Stream over single node.
+ */
+public class NodePageStream<R> extends PageStream<R> {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private R head;
+
+    /** */
+    protected NodePageStream(GridCacheQueryAdapter qry, Object queueLock, long 
timeoutTime,
+        UUID nodeId, BiConsumer<Collection<UUID>, Boolean> reqPages) {
+        super(qry, queueLock, timeoutTime, new HashSet<>(F.asList(nodeId)), 
reqPages);
+
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Head of stream, that is last item returned with {@link #next()}.
+     */
+    public R head() {
+        return head;
+    }
+
+    /**
+     * @return {@code true} If this stream has next row, {@code false} 
otherwise.
+     */
+    @Override public boolean hasNext() throws IgniteCheckedException {
+        if (head != null)
+            return true;
+        else {
+            if (!super.hasNext())
+                return false;
+            else
+                return (head = super.next()) != null;
+        }
+    }
+
+    /**
+     * @return Next item from this stream.
+     */

Review comment:
       ```suggestion
       /** {@inheritDoc} */
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to