IGNITE-10621: SQL: Collect running query info for all types of queries. This 
closes #5620. This closes #5663.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/592cc346
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/592cc346
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/592cc346

Branch: refs/heads/ignite-601
Commit: 592cc3466b3b89d3b5c176400536cc91d21d7ab0
Parents: 383d8b2
Author: Yuriy Gerzhedovich <ygerzhedov...@gridgain.com>
Authored: Tue Dec 25 15:33:49 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Tue Dec 25 15:33:49 2018 +0300

----------------------------------------------------------------------
 .../thin/JdbcThinStreamingAbstractSelfTest.java |   5 +-
 .../processors/bulkload/BulkLoadProcessor.java  |  25 +-
 .../processors/query/GridQueryIndexing.java     |   9 +-
 .../processors/query/GridQueryProcessor.java    |   6 +-
 .../processors/query/RunningQueryManager.java   | 130 +++++
 ...IgniteClientCacheInitializationFailTest.java |   7 +-
 .../ignite/testframework/GridTestUtils.java     |  35 ++
 .../cache/query/RegisteredQueryCursor.java      |  65 +++
 .../query/h2/DmlStatementsProcessor.java        | 128 ++--
 .../processors/query/h2/IgniteH2Indexing.java   | 334 ++++++-----
 .../query/h2/twostep/DistributedUpdateRun.java  |  15 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  66 +--
 .../query/h2/twostep/ReduceQueryRun.java        |  25 +-
 .../processors/query/RunningQueriesTest.java    | 578 ++++++++++++++++++-
 14 files changed, 1092 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 70dc781..a99274f 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -522,10 +522,11 @@ public abstract class JdbcThinStreamingAbstractSelfTest 
extends JdbcStreamingSel
         /** {@inheritDoc} */
         @Override public List<FieldsQueryCursor<List<?>>> 
querySqlFields(String schemaName, SqlFieldsQuery qry,
             @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker,
-            GridQueryCancel cancel) {
+            GridQueryCancel cancel, boolean registerAsNewQry) {
             IndexingWithContext.cliCtx = cliCtx;
 
-            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, 
failOnMultipleStmts, tracker, cancel);
+            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, 
failOnMultipleStmts, tracker, cancel,
+                registerAsNewQry);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
index ccf3e25..9dba60b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.processors.bulkload;
 
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.lang.IgniteBiTuple;
 
-import java.util.List;
-
 /**
  * Bulk load (COPY) command processor used on server to keep various context 
data and process portions of input
  * received from the client side.
@@ -45,6 +45,12 @@ public class BulkLoadProcessor implements AutoCloseable {
     /** Becomes true after {@link #close()} method is called. */
     private boolean isClosed;
 
+    /** Running query manager. */
+    private final RunningQueryManager runningQryMgr;
+
+    /** Query id. */
+    private final Long qryId;
+
     /**
      * Creates bulk load processor.
      *
@@ -52,12 +58,16 @@ public class BulkLoadProcessor implements AutoCloseable {
      * @param dataConverter Converter, which transforms the list of strings 
parsed from the input stream to the
      *     key+value entry to add to the cache.
      * @param outputStreamer Streamer that puts actual key/value into the 
cache.
+     * @param runningQryMgr Running query manager.
+     * @param qryId Running query id.
      */
     public BulkLoadProcessor(BulkLoadParser inputParser, 
IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
-        BulkLoadCacheWriter outputStreamer) {
+        BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, 
Long qryId) {
         this.inputParser = inputParser;
         this.dataConverter = dataConverter;
         this.outputStreamer = outputStreamer;
+        this.runningQryMgr = runningQryMgr;
+        this.qryId = qryId;
         isClosed = false;
     }
 
@@ -97,8 +107,13 @@ public class BulkLoadProcessor implements AutoCloseable {
         if (isClosed)
             return;
 
-        isClosed = true;
+        try {
+            isClosed = true;
 
-        outputStreamer.close();
+            outputStreamer.close();
+        }
+        finally {
+            runningQryMgr.unregister(qryId);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 2abafab..7ee8069 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -86,10 +86,13 @@ public interface GridQueryIndexing {
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts Whether an exception should be thrown for 
multiple statements query.
      * @param tracker Query tracker.
+     * @param registerAsNewQry {@code true} in case it's a new query which 
should be registered as a running query,
+     * {@code false} otherwise.
      * @return Cursor.
      */
     public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, 
SqlFieldsQuery qry,
-        SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel);
+        SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker,
+        GridQueryCancel cancel, boolean registerAsNewQry);
 
     /**
      * Execute an INSERT statement using data streamer as receiver.
@@ -125,10 +128,12 @@ public interface GridQueryIndexing {
      * @param keepBinary Keep binary flag.
      * @param filter Cache name and key filter.
      * @param cancel Query cancel.
+     * @param qryId Running query id. {@code null} in case query is not 
registered.
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, 
SqlFieldsQuery qry,
-        boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel 
cancel) throws IgniteCheckedException;
+        boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
+        @Nullable Long qryId) throws IgniteCheckedException;
 
     /**
      * Executes text query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index bf2f943..c1edbbb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -64,8 +64,8 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -2169,8 +2169,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                     @Override public List<FieldsQueryCursor<List<?>>> applyx() 
{
                         GridQueryCancel cancel = new GridQueryCancel();
 
-                        List<FieldsQueryCursor<List<?>>> res =
-                            idx.querySqlFields(schemaName, qry, cliCtx, 
keepBinary, failOnMultipleStmts, null, cancel);
+                        List<FieldsQueryCursor<List<?>>> res = 
idx.querySqlFields(schemaName, qry, cliCtx,
+                            keepBinary, failOnMultipleStmts, null, cancel, 
true);
 
                         if (cctx != null)
                             sendQueryExecutedEvent(qry.getSql(), 
qry.getArgs(), cctx, qryType);

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
new file mode 100644
index 0000000..d86d1f2
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Keep information about all running queries.
+ */
+public class RunningQueryManager {
+    /** Keep registered user queries. */
+    private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new 
ConcurrentHashMap<>();
+
+    /** Unique id for queries on single node. */
+    private final AtomicLong qryIdGen = new AtomicLong();
+
+    /**
+     * Register running query.
+     *
+     * @param qry Query text.
+     * @param qryType Query type.
+     * @param schemaName Schema name.
+     * @param loc Local query flag.
+     * @param cancel Query cancel. Should be passed in case query is 
cancelable, or {@code null} otherwise.
+     * @return Registered RunningQueryInfo.
+     */
+    public GridRunningQueryInfo register(String qry, GridCacheQueryType 
qryType, String schemaName,
+        boolean loc, @Nullable GridQueryCancel cancel) {
+        long qryId = qryIdGen.incrementAndGet();
+
+        GridRunningQueryInfo run = new GridRunningQueryInfo(
+            qryId,
+            qry,
+            qryType,
+            schemaName,
+            System.currentTimeMillis(),
+            cancel,
+            loc
+        );
+
+        GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run);
+
+        assert preRun == null : "Running query already registered [prev_qry=" 
+ preRun + ", newQry=" + run + ']';
+
+        return run;
+    }
+
+    /**
+     * Unregister running query.
+     *
+     * @param runningQryInfo Running query info.
+     * @return Unregistered running query info. {@code null} in case running 
query is not registered.
+     */
+    @Nullable public GridRunningQueryInfo unregister(@Nullable 
GridRunningQueryInfo runningQryInfo) {
+        return (runningQryInfo != null) ? unregister(runningQryInfo.id()) : 
null;
+    }
+
+    /**
+     * Unregister running query.
+     *
+     * @param qryId Query id.
+     * @return Unregistered running query info. {@code null} in case running 
query with given id wasn't found.
+     */
+    @Nullable public GridRunningQueryInfo unregister(Long qryId) {
+        if (qryId == null)
+            return null;
+
+        return runs.remove(qryId);
+    }
+
+    /**
+     * Return long running user queries.
+     *
+     * @param duration Duration of long query.
+     * @return List of queries which are running longer than given duration.
+     */
+    public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+        Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+        long curTime = System.currentTimeMillis();
+
+        for (GridRunningQueryInfo runningQryInfo : runs.values()) {
+            if (runningQryInfo.longQuery(curTime, duration))
+                res.add(runningQryInfo);
+        }
+
+        return res;
+    }
+
+    /**
+     * Cancel query.
+     *
+     * @param qryId Query id.
+     */
+    public void cancel(Long qryId) {
+        GridRunningQueryInfo run = runs.get(qryId);
+
+        if (run != null)
+            run.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RunningQueryManager.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 7051fd8..cc0bee8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
@@ -289,7 +290,8 @@ public class IgniteClientCacheInitializationFailTest 
extends GridCommonAbstractT
 
         /** {@inheritDoc} */
         @Override public List<FieldsQueryCursor<List<?>>> 
querySqlFields(String schemaName, SqlFieldsQuery qry,
-            SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel) {
+            SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker,
+            GridQueryCancel cancel, boolean registerAsNewQry) {
             return null;
         }
 
@@ -307,7 +309,8 @@ public class IgniteClientCacheInitializationFailTest 
extends GridCommonAbstractT
 
         /** {@inheritDoc} */
         @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String 
schemaName, SqlFieldsQuery qry,
-            boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel 
cancel) throws IgniteCheckedException {
+            boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel 
cancel,
+            Long qryId) throws IgniteCheckedException {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 4499104..f666f1e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -34,6 +34,9 @@ import java.net.ServerSocket;
 import java.nio.file.attribute.PosixFilePermission;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -86,6 +89,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsClosure;
@@ -109,6 +114,8 @@ import 
org.apache.ignite.testframework.junits.GridAbstractTest;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Utility class for tests.
  */
@@ -2078,4 +2085,32 @@ public final class GridTestUtils {
             System.setProperties(props);
         }
     }
+
+    /**
+     * @param node Node to connect to.
+     * @param params Connection parameters.
+     * @return Thin JDBC connection to specified node.
+     */
+    public static Connection connect(IgniteEx node, String params) throws 
SQLException {
+        Collection<GridPortRecord> recs = node.context().ports().records();
+
+        GridPortRecord cliLsnrRec = null;
+
+        for (GridPortRecord rec : recs) {
+            if (rec.clazz() == ClientListenerProcessor.class) {
+                cliLsnrRec = rec;
+
+                break;
+            }
+        }
+
+        assertNotNull(cliLsnrRec);
+
+        String connStr = "jdbc:ignite:thin://127.0.0.1:" + cliLsnrRec.port();
+
+        if (!F.isEmpty(params))
+            connStr += "/?" + params;
+
+        return DriverManager.getConnection(connStr);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
new file mode 100644
index 0000000..3e08c7d
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
+
+/**
+ * Query cursor for queries registered as running.
+ *
+ * Running query will be unregistered during close of cursor.
+ */
+public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
+    /** */
+    private final AtomicBoolean unregistered = new AtomicBoolean(false);
+
+    /** */
+    private RunningQueryManager runningQryMgr;
+
+    /** */
+    private Long qryId;
+
+    /**
+     * @param iterExec Query executor.
+     * @param cancel Cancellation closure.
+     * @param runningQryMgr Running query manager.
+     * @param qryId Registered running query id.
+     */
+    public RegisteredQueryCursor(Iterable<T> iterExec, GridQueryCancel cancel, 
RunningQueryManager runningQryMgr,
+        Long qryId) {
+        super(iterExec, cancel);
+
+        assert runningQryMgr != null;
+        assert qryId != null;
+
+        this.runningQryMgr = runningQryMgr;
+        this.qryId = qryId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (unregistered.compareAndSet(false, true))
+            runningQryMgr.unregister(qryId);
+
+        super.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index dfd677b..920c0f0 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -58,6 +58,7 @@ import 
org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -66,6 +67,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import 
org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
@@ -415,6 +417,7 @@ public class DmlStatementsProcessor {
     /**
      * Perform given statement against given data streamer. Only rows based 
INSERT is supported.
      *
+     * @param qry Query.
      * @param schemaName Schema name.
      * @param streamer Streamer to feed data to.
      * @param stmt Statement.
@@ -423,74 +426,82 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked"})
-    long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, 
PreparedStatement stmt, final Object[] args)
-        throws IgniteCheckedException {
-        idx.checkStatementStreamable(stmt);
+    long streamUpdateQuery(String qry, String schemaName, IgniteDataStreamer 
streamer, PreparedStatement stmt,
+        final Object[] args) throws IgniteCheckedException {
+        GridRunningQueryInfo runningQryInfo = 
idx.runningQueryManager().register(qry,
+            GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
 
-        Prepared p = GridSqlQueryParser.prepared(stmt);
+        try {
+            idx.checkStatementStreamable(stmt);
 
-        assert p != null;
+            Prepared p = GridSqlQueryParser.prepared(stmt);
 
-        final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, 
true, null);
+            assert p != null;
 
-        assert plan.isLocalSubquery();
+            final UpdatePlan plan = getPlanForStatement(schemaName, null, p, 
null, true, null);
 
-        final GridCacheContext cctx = plan.cacheContext();
+            assert plan.isLocalSubquery();
 
-        QueryCursorImpl<List<?>> cur;
+            final GridCacheContext cctx = plan.cacheContext();
 
-        final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
+            QueryCursorImpl<List<?>> cur;
 
-        QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new 
Iterable<List<?>>() {
-            @Override public Iterator<List<?>> iterator() {
-                try {
-                    Iterator<List<?>> it;
+            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
 
-                    if (!F.isEmpty(plan.selectQuery())) {
-                        GridQueryFieldsResult res = 
idx.queryLocalSqlFields(idx.schema(cctx.name()),
-                            plan.selectQuery(), F.asList(U.firstNotNull(args, 
X.EMPTY_OBJECT_ARRAY)),
-                            null, false, false, 0, null);
+            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new 
Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        Iterator<List<?>> it;
 
-                        it = res.iterator();
-                    }
-                    else
-                        it = plan.createRows(U.firstNotNull(args, 
X.EMPTY_OBJECT_ARRAY)).iterator();
+                        if (!F.isEmpty(plan.selectQuery())) {
+                            GridQueryFieldsResult res = 
idx.queryLocalSqlFields(idx.schema(cctx.name()),
+                                plan.selectQuery(), 
F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
+                                null, false, false, 0, null);
 
-                    return new GridQueryCacheObjectsIterator(it, coCtx, 
cctx.keepBinary());
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
+                            it = res.iterator();
+                        }
+                        else
+                            it = plan.createRows(U.firstNotNull(args, 
X.EMPTY_OBJECT_ARRAY)).iterator();
+
+                        return new GridQueryCacheObjectsIterator(it, coCtx, 
cctx.keepBinary());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
                 }
-            }
-        }, null);
+            }, null);
 
-        data.addAll(stepCur.getAll());
+            data.addAll(stepCur.getAll());
 
-        cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-            @Override public Iterator<List<?>> iterator() {
-                return data.iterator();
-            }
-        }, null);
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    return data.iterator();
+                }
+            }, null);
 
-        if (plan.rowCount() == 1) {
-            IgniteBiTuple t = plan.processRow(cur.iterator().next());
+            if (plan.rowCount() == 1) {
+                IgniteBiTuple t = plan.processRow(cur.iterator().next());
 
-            streamer.addData(t.getKey(), t.getValue());
+                streamer.addData(t.getKey(), t.getValue());
 
-            return 1;
-        }
+                return 1;
+            }
 
-        Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
 
-        for (List<?> row : cur) {
-            final IgniteBiTuple t = plan.processRow(row);
+            for (List<?> row : cur) {
+                final IgniteBiTuple t = plan.processRow(row);
 
-            rows.put(t.getKey(), t.getValue());
-        }
+                rows.put(t.getKey(), t.getValue());
+            }
 
-        streamer.addData(rows);
+            streamer.addData(rows);
 
-        return rows.size();
+            return rows.size();
+        }
+        finally {
+            idx.runningQueryManager().unregister(runningQryInfo);
+        }
     }
 
     /**
@@ -560,7 +571,7 @@ public class DmlStatementsProcessor {
                             .setTimeout((int)timeout, TimeUnit.MILLISECONDS);
 
                         FieldsQueryCursor<List<?>> cur = 
idx.querySqlFields(schemaName, newFieldsQry, null,
-                            true, true, mvccTracker(cctx, tx), cancel).get(0);
+                            true, true, mvccTracker(cctx, tx), cancel, 
false).get(0);
 
                         it = plan.iteratorForTransaction(connMgr, cur);
                     }
@@ -648,7 +659,7 @@ public class DmlStatementsProcessor {
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
             cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, 
newFieldsQry, null, true, true,
-                null, cancel).get(0);
+                null, cancel, false).get(0);
         }
         else if (plan.hasRows())
             cur = plan.createRows(fieldsQry.getArgs());
@@ -1168,7 +1179,7 @@ public class DmlStatementsProcessor {
                 .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
             cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schema, 
newFieldsQry, null, true, true,
-                new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel).get(0);
+                new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel, 
false).get(0);
         }
         else {
             final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, 
plan.selectQuery(),
@@ -1193,24 +1204,32 @@ public class DmlStatementsProcessor {
     /**
      * Runs a DML statement for which we have internal command executor.
      *
+     * @param schemaName Schema name.
      * @param sql The SQL command text to execute.
      * @param cmd The command to execute.
      * @return The cursor returned by the statement.
      * @throws IgniteSQLException If failed.
      */
-    public FieldsQueryCursor<List<?>> runNativeDmlStatement(String sql, 
SqlCommand cmd) {
+    public FieldsQueryCursor<List<?>> runNativeDmlStatement(String schemaName, 
String sql, SqlCommand cmd) {
+        GridRunningQueryInfo runningQryInfo = 
idx.runningQueryManager().register(sql,
+            GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+
         try {
             if (cmd instanceof SqlBulkLoadCommand)
-                return processBulkLoadCommand((SqlBulkLoadCommand)cmd);
+                return processBulkLoadCommand((SqlBulkLoadCommand)cmd, 
runningQryInfo.id());
             else
                 throw new IgniteSQLException("Unsupported DML operation: " + 
sql,
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
         }
         catch (IgniteSQLException e) {
+            idx.runningQueryManager().unregister(runningQryInfo);
+
             throw e;
         }
         catch (Exception e) {
+            idx.runningQueryManager().unregister(runningQryInfo);
+
             throw new IgniteSQLException("Unexpected DML operation failure: " 
+ e.getMessage(), e);
         }
     }
@@ -1219,10 +1238,12 @@ public class DmlStatementsProcessor {
      * Process bulk load COPY command.
      *
      * @param cmd The command.
+     * @param qryId Query id.
      * @return The context (which is the result of the first request/response).
      * @throws IgniteCheckedException If something failed.
      */
-    public FieldsQueryCursor<List<?>> 
processBulkLoadCommand(SqlBulkLoadCommand cmd) throws IgniteCheckedException {
+    public FieldsQueryCursor<List<?>> 
processBulkLoadCommand(SqlBulkLoadCommand cmd,
+        Long qryId) throws IgniteCheckedException {
         if (cmd.packetSize() == null)
             cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
 
@@ -1245,7 +1266,8 @@ public class DmlStatementsProcessor {
 
         BulkLoadParser inputParser = 
BulkLoadParser.createParser(cmd.inputFormat());
 
-        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, 
dataConverter, outputWriter);
+        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, 
dataConverter, outputWriter,
+            idx.runningQueryManager(), qryId);
 
         BulkLoadAckClientParameters params = new 
BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 29f39b9..9a2ff90 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -33,10 +33,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -70,9 +67,11 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
+import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -90,6 +89,7 @@ import 
org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import org.apache.ignite.internal.processors.query.h2.affinity.PartitionNode;
@@ -174,7 +174,6 @@ import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
@@ -232,14 +231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
     private GridReduceQueryExecutor rdcQryExec;
 
     /** */
-    private AtomicLong qryIdGen;
-
-    /** */
     private GridSpinBusyLock busyLock;
 
-    /** */
-    private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new 
ConcurrentHashMap<>();
-
     /** Row cache. */
     private final H2RowCacheRegistry rowCache = new H2RowCacheRegistry();
 
@@ -256,6 +249,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private DdlStatementsProcessor ddlProc;
 
     /** */
+    private final RunningQueryManager runningQueryMgr = new 
RunningQueryManager();
+
+    /** */
     private volatile 
GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, 
H2TwoStepCachedQuery> twoStepCache =
         new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
 
@@ -461,16 +457,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, 
typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            GridRunningQueryInfo run = new 
GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, schemaName,
-                U.currentTimeMillis(), null, true);
+            GridRunningQueryInfo runningQryInfo = 
runningQueryManager().register(qry,
+                TEXT, schemaName, true, null);
 
             try {
-                runs.put(run.id(), run);
-
                 return tbl.luceneIndex().query(qry.toUpperCase(), filters);
             }
             finally {
-                runs.remove(run.id());
+                runningQueryManager().unregister(runningQryInfo);
             }
         }
 
@@ -641,11 +635,6 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
 
                     GridH2QueryContext.set(ctx);
 
-                    GridRunningQueryInfo run = new 
GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry0,
-                        SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, 
true);
-
-                    runs.putIfAbsent(run.id(), run);
-
                     try {
                         ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, 
qry0, params, timeout0, cancel);
 
@@ -705,8 +694,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     }
                     finally {
                         GridH2QueryContext.clearThreadLocal();
-
-                        runs.remove(run.id());
                     }
                 }
             };
@@ -748,7 +735,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteSQLException(e);
         }
 
-        return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params);
+        return dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, 
params);
     }
 
     /** {@inheritDoc} */
@@ -782,7 +769,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         List<Long> res = new ArrayList<>(params.size());
 
         for (int i = 0; i < params.size(); i++)
-            res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, 
params.get(i)));
+            res.add(dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, 
params.get(i)));
 
         return res;
     }
@@ -979,7 +966,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String 
schemaName, SqlFieldsQuery qry,
-        final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel 
cancel) throws IgniteCheckedException {
+        final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel 
cancel,
+        Long qryId) throws IgniteCheckedException {
         String sql = qry.getSql();
         List<Object> params = F.asList(qry.getArgs());
         boolean enforceJoinOrder = qry.isEnforceJoinOrder(), startTx = 
autoStartTx(qry);
@@ -988,17 +976,18 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, 
params, filter,
             enforceJoinOrder, startTx, timeout, cancel);
 
-        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new 
Iterable<List<?>>() {
-            @SuppressWarnings("NullableProblems")
-            @Override public Iterator<List<?>> iterator() {
-                try {
-                    return new GridQueryCacheObjectsIterator(res.iterator(), 
objectContext(), keepBinary);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
+        Iterable<List<?>> iter = () -> {
+            try {
+                return new GridQueryCacheObjectsIterator(res.iterator(), 
objectContext(), keepBinary);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
             }
-        }, cancel);
+        };
+
+        QueryCursorImpl<List<?>> cursor = qryId != null
+            ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), 
qryId)
+            : new QueryCursorImpl<>(iter, cancel);
 
         cursor.fieldsMeta(res.metaData());
 
@@ -1281,15 +1270,24 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     /**
      * Executes a query natively.
      *
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param cmd Parsed command corresponding to query.
      * @param cliCtx Client context, or {@code null} if not applicable.
      * @return Result cursors.
      */
-    private List<FieldsQueryCursor<List<?>>> 
queryDistributedSqlFieldsNative(SqlFieldsQuery qry, SqlCommand cmd,
-        @Nullable SqlClientContext cliCtx) {
+    private List<FieldsQueryCursor<List<?>>> 
queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
+        SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
+        Long qryId = null;
+
         // Execute.
         try {
+            if (cmd instanceof SqlBulkLoadCommand)
+                return 
Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, 
qry.getSql(), cmd));
+
+            // Always register a new running query for native commands except 
COPY. Currently such operations don't support cancellation.
+            qryId = registerRunningQuery(schemaName, null, qry.getSql(), 
qry.isLocal(), true);
+
             if (cmd instanceof SqlCreateIndexCommand
                 || cmd instanceof SqlDropIndexCommand
                 || cmd instanceof SqlAlterTableCommand
@@ -1297,8 +1295,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 || cmd instanceof SqlAlterUserCommand
                 || cmd instanceof SqlDropUserCommand)
                 return 
Collections.singletonList(ddlProc.runDdlStatement(qry.getSql(), cmd));
-            else if (cmd instanceof SqlBulkLoadCommand)
-                return 
Collections.singletonList(dmlProc.runNativeDmlStatement(qry.getSql(), cmd));
             else if (cmd instanceof SqlSetStreamingCommand) {
                 if (cliCtx == null)
                     throw new IgniteSQLException("SET STREAMING command can 
only be executed from JDBC or ODBC driver.");
@@ -1320,6 +1316,9 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             throw new IgniteSQLException("Failed to execute DDL statement 
[stmt=" + qry.getSql() +
                 ", err=" + e.getMessage() + ']', e);
         }
+        finally {
+            runningQueryMgr.unregister(qryId);
+        }
     }
 
     /**
@@ -1444,7 +1443,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     @SuppressWarnings({"StringEquality", "unchecked"})
     @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String 
schemaName, SqlFieldsQuery qry,
         @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean 
failOnMultipleStmts, MvccQueryTracker tracker,
-        GridQueryCancel cancel) {
+        GridQueryCancel cancel, boolean registerAsNewQry) {
         boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry);
 
         try {
@@ -1458,7 +1457,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             }
 
             if (nativeCmd != null)
-                return queryDistributedSqlFieldsNative(qry, nativeCmd, cliCtx);
+                return queryDistributedSqlFieldsNative(schemaName, qry, 
nativeCmd, cliCtx);
 
             List<FieldsQueryCursor<List<?>>> res;
 
@@ -1477,7 +1476,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     List<GridQueryFieldMetadata> meta = cachedQry.meta();
 
                     res = 
Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, 
meta, keepBinary,
-                        startTx, tracker, cancel));
+                        startTx, tracker, cancel, registerAsNewQry));
+
 
                     if (!twoStepQry.explain())
                         twoStepCache.putIfAbsent(cachedQryKey, new 
H2TwoStepCachedQuery(meta, twoStepQry.copy()));
@@ -1500,7 +1500,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
                         return 
(List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, 
null, null,
-                            keepBinary, startTx, tracker, cancel);
+                            keepBinary, startTx, tracker, cancel, 
registerAsNewQry);
                     }
                 }
             }
@@ -1532,7 +1532,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 firstArg += prepared.getParameters().size();
 
                 res.addAll(doRunPrepared(schemaName, prepared, newQry, 
twoStepQry, meta, keepBinary, startTx, tracker,
-                    cancel));
+                    cancel, registerAsNewQry));
 
                 // We cannot cache two-step query for multiple statements 
query except the last statement
                 if (parseRes.twoStepQuery() != null && 
parseRes.twoStepQueryKey() != null &&
@@ -1568,12 +1568,13 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @param startTx Start transaction flag.
      * @param tracker MVCC tracker.
      * @param cancel Query cancel state holder.
+     * @param registerAsNewQry {@code true} if this is a new query that 
should be registered as a running query.
      * @return Query result.
      */
     @SuppressWarnings("unchecked")
     private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String 
schemaName, Prepared prepared,
         SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, 
List<GridQueryFieldMetadata> meta, boolean keepBinary,
-        boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel) {
+        boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel, 
boolean registerAsNewQry) {
         String sqlQry = qry.getSql();
 
         boolean loc = qry.isLocal();
@@ -1581,55 +1582,56 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         IndexingQueryFilter filter = (loc ? backupFilter(null, 
qry.getPartitions()) : null);
 
         if (!prepared.isQuery()) {
-            if (DmlStatementsProcessor.isDmlStatement(prepared)) {
-                try {
-                    Connection conn = connMgr.connectionForThread(schemaName);
-
-                    if (!loc)
-                        return dmlProc.updateSqlFieldsDistributed(schemaName, 
conn, prepared, qry, cancel);
-                    else {
-                        final GridQueryFieldsResult updRes =
-                            dmlProc.updateSqlFieldsLocal(schemaName, conn, 
prepared, qry, filter, cancel);
+            Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, 
registerAsNewQry);
 
-                        return Collections.singletonList(new 
QueryCursorImpl<>(new Iterable<List<?>>() {
-                            @SuppressWarnings("NullableProblems")
-                            @Override public Iterator<List<?>> iterator() {
-                                try {
-                                    return new 
GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(),
-                                        true);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    throw new IgniteException(e);
+            try {
+                if (DmlStatementsProcessor.isDmlStatement(prepared)) {
+                    try {
+                        Connection conn = 
connMgr.connectionForThread(schemaName);
+
+                        if (!loc)
+                            return 
dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel);
+                        else {
+                            final GridQueryFieldsResult updRes =
+                                dmlProc.updateSqlFieldsLocal(schemaName, conn, 
prepared, qry, filter, cancel);
+
+                            return Collections.singletonList(new 
QueryCursorImpl<>(new Iterable<List<?>>() {
+                                @SuppressWarnings("NullableProblems")
+                                @Override public Iterator<List<?>> iterator() {
+                                    try {
+                                        return new 
GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(),
+                                            true);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        throw new IgniteException(e);
+                                    }
                                 }
-                            }
-                        }, cancel));
+                            }, cancel));
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteSQLException("Failed to execute DML 
statement [stmt=" + sqlQry +
+                            ", params=" + Arrays.deepToString(qry.getArgs()) + 
"]", e);
                     }
                 }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteSQLException("Failed to execute DML 
statement [stmt=" + sqlQry +
-                        ", params=" + Arrays.deepToString(qry.getArgs()) + 
"]", e);
-                }
-            }
-
-            if (DdlStatementsProcessor.isDdlStatement(prepared)) {
-                if (loc)
-                    throw new IgniteSQLException("DDL statements are not 
supported for LOCAL caches",
-                        IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-                return 
Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared));
-            }
+                if (DdlStatementsProcessor.isDdlStatement(prepared)) {
+                    if (loc)
+                        throw new IgniteSQLException("DDL statements are not 
supported for LOCAL caches",
+                            IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-            if (prepared instanceof NoOperation) {
-                QueryCursorImpl<List<?>> resCur = 
(QueryCursorImpl<List<?>>)new QueryCursorImpl(
-                    Collections.singletonList(Collections.singletonList(0L)), 
null, false);
+                    return 
Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared));
+                }
 
-                resCur.fieldsMeta(UPDATE_RESULT_META);
+                if (prepared instanceof NoOperation)
+                    return Collections.singletonList(H2Utils.zeroCursor());
 
-                return Collections.singletonList(resCur);
+                throw new IgniteSQLException("Unsupported DDL/DML operation: " 
+ prepared.getClass().getName(),
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            }
+            finally {
+                runningQueryMgr.unregister(qryId);
             }
-
-            throw new IgniteSQLException("Unsupported DDL/DML operation: " + 
prepared.getClass().getName(),
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
         }
 
         if (twoStepQry != null) {
@@ -1642,20 +1644,45 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 checkSecurity(twoStepQry.cacheIds());
 
             return Collections.singletonList(doRunDistributedQuery(schemaName, 
qry, twoStepQry, meta, keepBinary,
-                startTx, tracker, cancel));
+                startTx, tracker, cancel, registerAsNewQry));
+
         }
 
         // We've encountered a local query, let's just run it.
+        Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, 
registerAsNewQry);
+
         try {
-            return Collections.singletonList(queryLocalSqlFields(schemaName, 
qry, keepBinary, filter, cancel));
+            return Collections.singletonList(queryLocalSqlFields(schemaName, 
qry, keepBinary, filter, cancel, qryId));
         }
         catch (IgniteCheckedException e) {
+            runningQueryMgr.unregister(qryId);
+
             throw new IgniteSQLException("Failed to execute local statement 
[stmt=" + sqlQry +
                 ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
         }
     }
 
     /**
+     * @param schemaName Schema name.
+     * @param cancel Query cancel state holder.
+     * @param qry Query.
+     * @param loc {@code true} for local query.
+     * @param registerAsNewQry {@code true} if this is a new query that 
should be registered as a running query.
+     * @return Id of registered query or {@code null} if query wasn't 
registered.
+     */
+    private Long registerRunningQuery(String schemaName, GridQueryCancel 
cancel, String qry, boolean loc,
+        boolean registerAsNewQry) {
+        if (registerAsNewQry) {
+            GridRunningQueryInfo runningQryInfo = runningQueryMgr.register(qry,
+                GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
+
+            return runningQryInfo.id();
+        }
+
+        return null;
+    }
+
+    /**
      * Check security access for caches.
      *
      * @param cacheIds Cache IDs.
@@ -1939,11 +1966,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @param startTx Start transaction flag.
      * @param mvccTracker Query tracker.
      * @param cancel Cancel handler.
+     * @param registerAsNewQry {@code true} if this is a new query that 
should be registered as a running query.
      * @return Cursor representing distributed query result.
      */
     private FieldsQueryCursor<List<?>> doRunDistributedQuery(String 
schemaName, SqlFieldsQuery qry,
         GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, 
boolean keepBinary,
-        boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel) 
{
+        boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, 
boolean registerAsNewQry) {
         if (log.isDebugEnabled())
             log.debug("Parsed query: `" + qry.getSql() + "` into two step 
query: " + twoStepQry);
 
@@ -1952,55 +1980,81 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         if (cancel == null)
             cancel = new GridQueryCancel();
 
-        // TODO: Use intersection 
(https://issues.apache.org/jira/browse/IGNITE-10567)
-        int partitions[] = qry.getPartitions();
+        Long qryId = registerRunningQuery(schemaName, cancel, qry.getSql(), 
qry.isLocal(), registerAsNewQry);
 
-        if (partitions == null && twoStepQry.derivedPartitions() != null) {
-            try {
-                PartitionNode partTree = twoStepQry.derivedPartitions().tree();
+        boolean cursorCreated = false;
 
-                Collection<Integer> partitions0 = 
partTree.apply(qry.getArgs());
+        try {
+            // TODO: Use intersection 
(https://issues.apache.org/jira/browse/IGNITE-10567)
+            int partitions[] = qry.getPartitions();
 
-                if (F.isEmpty(partitions0))
-                    partitions = new int[0];
-                else {
-                    partitions = new int[partitions0.size()];
+            if (partitions == null && twoStepQry.derivedPartitions() != null) {
+                try {
+                    PartitionNode partTree = 
twoStepQry.derivedPartitions().tree();
 
-                    int i = 0;
+                    Collection<Integer> partitions0 = 
partTree.apply(qry.getArgs());
 
-                    for (Integer part : partitions0)
-                        partitions[i++] = part;
-                }
+                    if (F.isEmpty(partitions0))
+                        partitions = new int[0];
+                    else {
+                        partitions = new int[partitions0.size()];
 
-                if (partitions.length == 0) //here we know that result of 
requested query is empty
-                    return new QueryCursorImpl<List<?>>(new 
Iterable<List<?>>(){
-                        @Override public Iterator<List<?>> iterator() {
-                            return new Iterator<List<?>>(){
+                        int i = 0;
 
-                                @Override public boolean hasNext() {
-                                    return false;
-                                }
+                        for (Integer part : partitions0)
+                            partitions[i++] = part;
+                    }
 
-                                @Override public List<?> next() {
-                                    return null;
-                                }
-                            };
-                        }
-                    });
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException("Failed to calculate derived 
partitions: [qry=" + qry.getSql() + ", params=" +
-                    Arrays.deepToString(qry.getArgs()) + "]", e);
+                    if (partitions.length == 0) { //here we know that result 
of requested query is empty
+                        return new QueryCursorImpl<List<?>>(new 
Iterable<List<?>>() {
+                            @Override public Iterator<List<?>> iterator() {
+                                return new Iterator<List<?>>() {
+                                    @Override public boolean hasNext() {
+                                        return false;
+                                    }
+
+                                    @Override public List<?> next() {
+                                        return null;
+                                    }
+                                };
+                            }
+                        });
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException("Failed to calculate derived 
partitions: [qry=" + qry.getSql() +
+                        ", params=" + Arrays.deepToString(qry.getArgs()) + 
"]", e);
+                }
             }
-        }
 
-        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-            runQueryTwoStep(schemaName, twoStepQry, keepBinary, 
qry.isEnforceJoinOrder(), startTx, qry.getTimeout(),
-                cancel, qry.getArgs(), partitions, qry.isLazy(), mvccTracker), 
cancel);
+            Iterable<List<?>> iter = runQueryTwoStep(
+                schemaName,
+                twoStepQry,
+                keepBinary,
+                qry.isEnforceJoinOrder(),
+                startTx,
+                qry.getTimeout(),
+                cancel,
+                qry.getArgs(),
+                partitions,
+                qry.isLazy(),
+                mvccTracker
+            );
 
-        cursor.fieldsMeta(meta);
+            QueryCursorImpl<List<?>> cursor = registerAsNewQry
+                ? new RegisteredQueryCursor<>(iter, cancel, 
runningQueryManager(), qryId)
+                : new QueryCursorImpl<>(iter, cancel);
 
-        return cursor;
+            cursor.fieldsMeta(meta);
+
+            cursorCreated = true;
+
+            return cursor;
+        }
+        finally {
+            if (!cursorCreated)
+                runningQueryMgr.unregister(qryId);
+        }
     }
 
     /**
@@ -2256,6 +2310,15 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         return rdcQryExec;
     }
 
+    /**
+     * Returns the running query manager.
+     *
+     * @return Running query manager.
+     */
+    public RunningQueryManager runningQueryManager() {
+        return runningQueryMgr;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"deprecation"})
     @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
@@ -2264,8 +2327,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         this.busyLock = busyLock;
 
-        qryIdGen = new AtomicLong();
-
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
 
@@ -2285,7 +2346,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         marshaller = ctx.config().getMarshaller();
 
         mapQryExec = new GridMapQueryExecutor(busyLock);
-        rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock);
+        rdcQryExec = new GridReduceQueryExecutor(busyLock);
 
         mapQryExec.start(ctx, this);
         rdcQryExec.start(ctx, this);
@@ -2556,25 +2617,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
-        Collection<GridRunningQueryInfo> res = new ArrayList<>();
-
-        res.addAll(runs.values());
-        res.addAll(rdcQryExec.longRunningQueries(duration));
-
-        return res;
+        return runningQueryMgr.longRunningQueries(duration);
     }
 
     /** {@inheritDoc} */
     @Override public void cancelQueries(Collection<Long> queries) {
         if (!F.isEmpty(queries)) {
-            for (Long qryId : queries) {
-                GridRunningQueryInfo run = runs.get(qryId);
-
-                if (run != null)
-                    run.cancel();
-            }
-
-            rdcQryExec.cancelQueries(queries);
+            for (Long qryId : queries)
+                runningQueryMgr.cancel(qryId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
index a783b8a..9e7b9ae 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.UUID;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -45,9 +44,6 @@ class DistributedUpdateRun {
     /** Accumulates error keys. */
     private HashSet<Object> errorKeys;
 
-    /** Query info. */
-    private final GridRunningQueryInfo qry;
-
     /** Result future. */
     private final GridFutureAdapter<UpdateResult> fut = new GridFutureAdapter<>();
 
@@ -55,23 +51,14 @@ class DistributedUpdateRun {
      * Constructor.
      *
      * @param nodeCount Number of nodes to await results from.
-     * @param qry Query info.
      */
-    DistributedUpdateRun(int nodeCount, GridRunningQueryInfo qry) {
+    DistributedUpdateRun(int nodeCount) {
         this.nodeCount = nodeCount;
-        this.qry = qry;
 
         rspNodes = new HashSet<>(nodeCount);
     }
 
     /**
-     * @return Query info.
-     */
-    GridRunningQueryInfo queryInfo() {
-        return qry;
-    }
-
-    /**
      * @return Result future.
      */
     GridFutureAdapter<UpdateResult> future() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 20ee1b4..7009bd5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -61,12 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSe
 import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
@@ -136,7 +134,7 @@ public class GridReduceQueryExecutor {
     private IgniteLogger log;
 
     /** */
-    private final AtomicLong qryIdGen;
+    private final AtomicLong qryIdGen = new AtomicLong();
 
     /** */
     private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap<>();
@@ -166,11 +164,9 @@ public class GridReduceQueryExecutor {
     /**
      * Constructor.
      *
-     * @param qryIdGen Query ID generator.
      * @param busyLock Busy lock.
      */
-    public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) {
-        this.qryIdGen = qryIdGen;
+    public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
         this.busyLock = busyLock;
     }
 
@@ -435,8 +431,6 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            long qryReqId = qryIdGen.incrementAndGet();
-
             List<Integer> cacheIds = qry.cacheIds();
 
             boolean mvccEnabled = mvccEnabled(ctx);
@@ -480,9 +474,10 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
-                h2.connections().connectionForThread(schemaName), qry.mapQueries().size(), qry.pageSize(),
-                U.currentTimeMillis(), sfuFut, cancel);
+            long qryReqId = qryIdGen.incrementAndGet();
+
+            final ReduceQueryRun r = new ReduceQueryRun(h2.connections().connectionForThread(schemaName),
+                qry.mapQueries().size(), qry.pageSize(), sfuFut);
 
             Collection<ClusterNode> nodes;
 
@@ -877,9 +872,6 @@ public class GridReduceQueryExecutor {
         ReducePartitionMapResult nodesParts =
             mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId);
 
-        final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS,
-            schemaName, U.currentTimeMillis(), cancel, false);
-
         Collection<ClusterNode> nodes = nodesParts.nodes();
 
         if (nodes == null)
@@ -904,7 +896,7 @@ public class GridReduceQueryExecutor {
             }
         }
 
-        final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size(), qryInfo);
+        final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size());
 
         int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
 
@@ -1325,50 +1317,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * Collect queries that already running more than specified duration.
-     *
-     * @param duration Duration to check.
-     * @return Collection of IDs and statements of long running queries.
-     */
-    public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
-        Collection<GridRunningQueryInfo> res = new ArrayList<>();
-
-        long curTime = U.currentTimeMillis();
-
-        for (ReduceQueryRun run : runs.values()) {
-            if (run.queryInfo().longQuery(curTime, duration))
-                res.add(run.queryInfo());
-        }
-
-        for (DistributedUpdateRun upd: updRuns.values()) {
-            if (upd.queryInfo().longQuery(curTime, duration))
-                res.add(upd.queryInfo());
-        }
-
-        return res;
-    }
-
-    /**
-     * Cancel specified queries.
-     *
-     * @param queries Queries IDs to cancel.
-     */
-    public void cancelQueries(Collection<Long> queries) {
-        for (Long qryId : queries) {
-            ReduceQueryRun run = runs.get(qryId);
-
-            if (run != null)
-                run.queryInfo().cancel();
-            else {
-                DistributedUpdateRun upd = updRuns.get(qryId);
-
-                if (upd != null)
-                    upd.queryInfo().cancel();
-            }
-        }
-    }
-
-    /**
      * @param qryTimeout Query timeout.
      * @return Query retry timeout.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index 7ddd653..b488bc3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -27,22 +27,15 @@ import javax.cache.CacheException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.jdbc.JdbcConnection;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
-
 /**
  * Query run.
  */
 class ReduceQueryRun {
     /** */
-    private final GridRunningQueryInfo qry;
-
-    /** */
     private final List<GridMergeIndex> idxs;
 
     /** */
@@ -62,20 +55,13 @@ class ReduceQueryRun {
 
     /**
      * Constructor.
-     * @param id Query ID.
-     * @param qry Query text.
-     * @param schemaName Schema name.
      * @param conn Connection.
      * @param idxsCnt Number of indexes.
      * @param pageSize Page size.
-     * @param startTime Start time.
      * @param selectForUpdateFut Future controlling {@code SELECT FOR UPDATE} query execution.
-     * @param cancel Query cancel handler.
      */
-    ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
-        GridNearTxSelectForUpdateFuture selectForUpdateFut, GridQueryCancel cancel) {
-        this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel,
-            false);
+    ReduceQueryRun(Connection conn, int idxsCnt, int pageSize,
+        GridNearTxSelectForUpdateFuture selectForUpdateFut) {
 
         this.conn = (JdbcConnection)conn;
 
@@ -143,13 +129,6 @@ class ReduceQueryRun {
     }
 
     /**
-     * @return Query info.
-     */
-    GridRunningQueryInfo queryInfo() {
-        return qry;
-    }
-
-    /**
      * @return Page size.
      */
     int pageSize() {

Reply via email to