IGNITE-10621: SQL: Collect running query info for all types of queries. This closes #5620. This closes #5663.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/592cc346 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/592cc346 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/592cc346 Branch: refs/heads/ignite-601 Commit: 592cc3466b3b89d3b5c176400536cc91d21d7ab0 Parents: 383d8b2 Author: Yuriy Gerzhedovich <ygerzhedov...@gridgain.com> Authored: Tue Dec 25 15:33:49 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Tue Dec 25 15:33:49 2018 +0300 ---------------------------------------------------------------------- .../thin/JdbcThinStreamingAbstractSelfTest.java | 5 +- .../processors/bulkload/BulkLoadProcessor.java | 25 +- .../processors/query/GridQueryIndexing.java | 9 +- .../processors/query/GridQueryProcessor.java | 6 +- .../processors/query/RunningQueryManager.java | 130 +++++ ...IgniteClientCacheInitializationFailTest.java | 7 +- .../ignite/testframework/GridTestUtils.java | 35 ++ .../cache/query/RegisteredQueryCursor.java | 65 +++ .../query/h2/DmlStatementsProcessor.java | 128 ++-- .../processors/query/h2/IgniteH2Indexing.java | 334 ++++++----- .../query/h2/twostep/DistributedUpdateRun.java | 15 +- .../h2/twostep/GridReduceQueryExecutor.java | 66 +-- .../query/h2/twostep/ReduceQueryRun.java | 25 +- .../processors/query/RunningQueriesTest.java | 578 ++++++++++++++++++- 14 files changed, 1092 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java index 70dc781..a99274f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java @@ -522,10 +522,11 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, - GridQueryCancel cancel) { + GridQueryCancel cancel, boolean registerAsNewQry) { IndexingWithContext.cliCtx = cliCtx; - return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel); + return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel, + registerAsNewQry); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java index ccf3e25..9dba60b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.processors.bulkload; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.internal.processors.query.RunningQueryManager; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.lang.IgniteBiTuple; -import java.util.List; - /** * Bulk load (COPY) command processor used on server to keep various context data and process portions of input * received from the client side. @@ -45,6 +45,12 @@ public class BulkLoadProcessor implements AutoCloseable { /** Becomes true after {@link #close()} method is called. */ private boolean isClosed; + /** Running query manager. */ + private final RunningQueryManager runningQryMgr; + + /** Query id. */ + private final Long qryId; + /** * Creates bulk load processor. * @@ -52,12 +58,16 @@ public class BulkLoadProcessor implements AutoCloseable { * @param dataConverter Converter, which transforms the list of strings parsed from the input stream to the * key+value entry to add to the cache. * @param outputStreamer Streamer that puts actual key/value into the cache. + * @param runningQryMgr Running query manager. + * @param qryId Running query id. */ public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter, - BulkLoadCacheWriter outputStreamer) { + BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, Long qryId) { this.inputParser = inputParser; this.dataConverter = dataConverter; this.outputStreamer = outputStreamer; + this.runningQryMgr = runningQryMgr; + this.qryId = qryId; isClosed = false; } @@ -97,8 +107,13 @@ public class BulkLoadProcessor implements AutoCloseable { if (isClosed) return; - isClosed = true; + try { + isClosed = true; - outputStreamer.close(); + outputStreamer.close(); + } + finally { + runningQryMgr.unregister(qryId); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 2abafab..7ee8069 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -86,10 +86,13 @@ public interface GridQueryIndexing { * @param keepBinary Keep binary flag. * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query. * @param tracker Query tracker. + * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query, + * {@code false} otherwise. * @return Cursor. */ public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel); + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, + GridQueryCancel cancel, boolean registerAsNewQry); /** * Execute an INSERT statement using data streamer as receiver. @@ -125,10 +128,12 @@ public interface GridQueryIndexing { * @param keepBinary Keep binary flag. * @param filter Cache name and key filter. * @param cancel Query cancel. + * @param qryId Running query id. {@code null} in case query is not registered. * @return Cursor. */ public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException; + boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel, + @Nullable Long qryId) throws IgniteCheckedException; /** * Executes text query. http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index bf2f943..c1edbbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -64,8 +64,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; @@ -2169,8 +2169,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public List<FieldsQueryCursor<List<?>>> applyx() { GridQueryCancel cancel = new GridQueryCancel(); - List<FieldsQueryCursor<List<?>>> res = - idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, null, cancel); + List<FieldsQueryCursor<List<?>>> res = idx.querySqlFields(schemaName, qry, cliCtx, + keepBinary, failOnMultipleStmts, null, cancel, true); if (cctx != null) sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx, qryType); http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java new file mode 100644 index 0000000..d86d1f2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Keep information about all running queries. + */ +public class RunningQueryManager { + /** Keep registered user queries. */ + private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>(); + + /** Unique id for queries on single node. */ + private final AtomicLong qryIdGen = new AtomicLong(); + + /** + * Register running query. + * + * @param qry Query text. + * @param qryType Query type. + * @param schemaName Schema name. + * @param loc Local query flag. + * @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise. + * @return Registered RunningQueryInfo. + */ + public GridRunningQueryInfo register(String qry, GridCacheQueryType qryType, String schemaName, + boolean loc, @Nullable GridQueryCancel cancel) { + long qryId = qryIdGen.incrementAndGet(); + + GridRunningQueryInfo run = new GridRunningQueryInfo( + qryId, + qry, + qryType, + schemaName, + System.currentTimeMillis(), + cancel, + loc + ); + + GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run); + + assert preRun == null : "Running query already registered [prev_qry=" + preRun + ", newQry=" + run + ']'; + + return run; + } + + /** + * Unregister running query. + * + * @param runningQryInfo Running query info.. + * @return Unregistered running query info. {@code null} in case running query is not registered. + */ + @Nullable public GridRunningQueryInfo unregister(@Nullable GridRunningQueryInfo runningQryInfo) { + return (runningQryInfo != null) ? unregister(runningQryInfo.id()) : null; + } + + /** + * Unregister running query. + * + * @param qryId Query id. + * @return Unregistered running query info. {@code null} in case running query with give id wasn't found. + */ + @Nullable public GridRunningQueryInfo unregister(Long qryId) { + if (qryId == null) + return null; + + return runs.remove(qryId); + } + + /** + * Return long running user queries. + * + * @param duration Duration of long query. + * @return List of queries which running longer than given duration. + */ + public Collection<GridRunningQueryInfo> longRunningQueries(long duration) { + Collection<GridRunningQueryInfo> res = new ArrayList<>(); + + long curTime = System.currentTimeMillis(); + + for (GridRunningQueryInfo runningQryInfo : runs.values()) { + if (runningQryInfo.longQuery(curTime, duration)) + res.add(runningQryInfo); + } + + return res; + } + + /** + * Cancel query. + * + * @param qryId Query id. + */ + public void cancel(Long qryId) { + GridRunningQueryInfo run = runs.get(qryId); + + if (run != null) + run.cancel(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RunningQueryManager.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index 7051fd8..cc0bee8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.RunningQueryManager; import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; @@ -289,7 +290,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel) { + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, + GridQueryCancel cancel, boolean registerAsNewQry) { return null; } @@ -307,7 +309,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { + boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel, + Long qryId) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 4499104..f666f1e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -34,6 +34,9 @@ import java.net.ServerSocket; import java.nio.file.attribute.PosixFilePermission; import java.security.GeneralSecurityException; import java.security.KeyStore; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -86,6 +89,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; +import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridAbsClosure; @@ -109,6 +114,8 @@ import org.apache.ignite.testframework.junits.GridAbstractTest; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.junit.Assert.assertNotNull; + /** * Utility class for tests. */ @@ -2078,4 +2085,32 @@ public final class GridTestUtils { System.setProperties(props); } } + + /** + * @param node Node to connect to. + * @param params Connection parameters. + * @return Thin JDBC connection to specified node. + */ + public static Connection connect(IgniteEx node, String params) throws SQLException { + Collection<GridPortRecord> recs = node.context().ports().records(); + + GridPortRecord cliLsnrRec = null; + + for (GridPortRecord rec : recs) { + if (rec.clazz() == ClientListenerProcessor.class) { + cliLsnrRec = rec; + + break; + } + } + + assertNotNull(cliLsnrRec); + + String connStr = "jdbc:ignite:thin://127.0.0.1:" + cliLsnrRec.port(); + + if (!F.isEmpty(params)) + connStr += "/?" + params; + + return DriverManager.getConnection(connStr); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java new file mode 100644 index 0000000..3e08c7d --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.query.GridQueryCancel; +import org.apache.ignite.internal.processors.query.RunningQueryManager; + +/** + * Query cursor for registered as running queries. + * + * Running query will be unregistered during close of cursor. + */ +public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> { + /** */ + private final AtomicBoolean unregistered = new AtomicBoolean(false); + + /** */ + private RunningQueryManager runningQryMgr; + + /** */ + private Long qryId; + + /** + * @param iterExec Query executor. + * @param cancel Cancellation closure. + * @param runningQryMgr Running query manager. + * @param qryId Registered running query id. + */ + public RegisteredQueryCursor(Iterable<T> iterExec, GridQueryCancel cancel, RunningQueryManager runningQryMgr, + Long qryId) { + super(iterExec, cancel); + + assert runningQryMgr != null; + assert qryId != null; + + this.runningQryMgr = runningQryMgr; + this.qryId = qryId; + } + + /** {@inheritDoc} */ + @Override public void close() { + if (unregistered.compareAndSet(false, true)) + runningQryMgr.unregister(qryId); + + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index dfd677b..920c0f0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.odbc.SqlStateCode; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; @@ -415,6 +417,7 @@ public class DmlStatementsProcessor { /** * Perform given statement against given data streamer. Only rows based INSERT is supported. * + * @param qry Query. * @param schemaName Schema name. * @param streamer Streamer to feed data to. * @param stmt Statement. @@ -423,74 +426,82 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked"}) - long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) - throws IgniteCheckedException { - idx.checkStatementStreamable(stmt); + long streamUpdateQuery(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, + final Object[] args) throws IgniteCheckedException { + GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().register(qry, + GridCacheQueryType.SQL_FIELDS, schemaName, true, null); - Prepared p = GridSqlQueryParser.prepared(stmt); + try { + idx.checkStatementStreamable(stmt); - assert p != null; + Prepared p = GridSqlQueryParser.prepared(stmt); - final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null); + assert p != null; - assert plan.isLocalSubquery(); + final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null); - final GridCacheContext cctx = plan.cacheContext(); + assert plan.isLocalSubquery(); - QueryCursorImpl<List<?>> cur; + final GridCacheContext cctx = plan.cacheContext(); - final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); + QueryCursorImpl<List<?>> cur; - QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - try { - Iterator<List<?>> it; + final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - if (!F.isEmpty(plan.selectQuery())) { - GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), - plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), - null, false, false, 0, null); + QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + try { + Iterator<List<?>> it; - it = res.iterator(); - } - else - it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + if (!F.isEmpty(plan.selectQuery())) { + GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), + plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, false, false, 0, null); - return new GridQueryCacheObjectsIterator(it, coCtx, cctx.keepBinary()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + it = res.iterator(); + } + else + it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + + return new GridQueryCacheObjectsIterator(it, coCtx, cctx.keepBinary()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } - } - }, null); + }, null); - data.addAll(stepCur.getAll()); + data.addAll(stepCur.getAll()); - cur = new QueryCursorImpl<>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - return data.iterator(); - } - }, null); + cur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return data.iterator(); + } + }, null); - if (plan.rowCount() == 1) { - IgniteBiTuple t = plan.processRow(cur.iterator().next()); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cur.iterator().next()); - streamer.addData(t.getKey(), t.getValue()); + streamer.addData(t.getKey(), t.getValue()); - return 1; - } + return 1; + } - Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); + Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); - for (List<?> row : cur) { - final IgniteBiTuple t = plan.processRow(row); + for (List<?> row : cur) { + final IgniteBiTuple t = plan.processRow(row); - rows.put(t.getKey(), t.getValue()); - } + rows.put(t.getKey(), t.getValue()); + } - streamer.addData(rows); + streamer.addData(rows); - return rows.size(); + return rows.size(); + } + finally { + idx.runningQueryManager().unregister(runningQryInfo); + } } /** @@ -560,7 +571,7 @@ public class DmlStatementsProcessor { .setTimeout((int)timeout, TimeUnit.MILLISECONDS); FieldsQueryCursor<List<?>> cur = idx.querySqlFields(schemaName, newFieldsQry, null, - true, true, mvccTracker(cctx, tx), cancel).get(0); + true, true, mvccTracker(cctx, tx), cancel, false).get(0); it = plan.iteratorForTransaction(connMgr, cur); } @@ -648,7 +659,7 @@ public class DmlStatementsProcessor { .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true, - null, cancel).get(0); + null, cancel, false).get(0); } else if (plan.hasRows()) cur = plan.createRows(fieldsQry.getArgs()); @@ -1168,7 +1179,7 @@ public class DmlStatementsProcessor { .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schema, newFieldsQry, null, true, true, - new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel).get(0); + new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel, false).get(0); } else { final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(), @@ -1193,24 +1204,32 @@ public class DmlStatementsProcessor { /** * Runs a DML statement for which we have internal command executor. * + * @param schemaName Schema name. * @param sql The SQL command text to execute. * @param cmd The command to execute. * @return The cursor returned by the statement. * @throws IgniteSQLException If failed. */ - public FieldsQueryCursor<List<?>> runNativeDmlStatement(String sql, SqlCommand cmd) { + public FieldsQueryCursor<List<?>> runNativeDmlStatement(String schemaName, String sql, SqlCommand cmd) { + GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().register(sql, + GridCacheQueryType.SQL_FIELDS, schemaName, true, null); + try { if (cmd instanceof SqlBulkLoadCommand) - return processBulkLoadCommand((SqlBulkLoadCommand)cmd); + return processBulkLoadCommand((SqlBulkLoadCommand)cmd, runningQryInfo.id()); else throw new IgniteSQLException("Unsupported DML operation: " + sql, IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } catch (IgniteSQLException e) { + idx.runningQueryManager().unregister(runningQryInfo); + throw e; } catch (Exception e) { + idx.runningQueryManager().unregister(runningQryInfo); + throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e); } } @@ -1219,10 +1238,12 @@ public class DmlStatementsProcessor { * Process bulk load COPY command. * * @param cmd The command. + * @param qryId Query id. * @return The context (which is the result of the first request/response). * @throws IgniteCheckedException If something failed. */ - public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd) throws IgniteCheckedException { + public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, + Long qryId) throws IgniteCheckedException { if (cmd.packetSize() == null) cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE); @@ -1245,7 +1266,8 @@ public class DmlStatementsProcessor { BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat()); - BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter); + BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter, + idx.runningQueryManager(), qryId); BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize()); http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 29f39b9..9a2ff90 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -33,10 +33,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -70,9 +67,11 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.QueryTable; +import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.odbc.SqlStateCode; @@ -90,6 +89,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.RunningQueryManager; import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.affinity.PartitionNode; @@ -174,7 +174,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart; -import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE; @@ -232,14 +231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { private GridReduceQueryExecutor rdcQryExec; /** */ - private AtomicLong qryIdGen; - - /** */ private GridSpinBusyLock busyLock; - /** */ - private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>(); - /** Row cache. */ private final H2RowCacheRegistry rowCache = new H2RowCacheRegistry(); @@ -256,6 +249,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { private DdlStatementsProcessor ddlProc; /** */ + private final RunningQueryManager runningQueryMgr = new RunningQueryManager(); + + /** */ private volatile GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache = new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE); @@ -461,16 +457,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName); if (tbl != null && tbl.luceneIndex() != null) { - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, schemaName, - U.currentTimeMillis(), null, true); + GridRunningQueryInfo runningQryInfo = runningQueryManager().register(qry, + TEXT, schemaName, true, null); try { - runs.put(run.id(), run); - return tbl.luceneIndex().query(qry.toUpperCase(), filters); } finally { - runs.remove(run.id()); + runningQueryManager().unregister(runningQryInfo); } } @@ -641,11 +635,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry0, - SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, true); - - runs.putIfAbsent(run.id(), run); - try { ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel); @@ -705,8 +694,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { GridH2QueryContext.clearThreadLocal(); - - runs.remove(run.id()); } } }; @@ -748,7 +735,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException(e); } - return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params); + return dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params); } /** {@inheritDoc} */ @@ -782,7 +769,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { List<Long> res = new ArrayList<>(params.size()); for (int i = 0; i < params.size(); i++) - res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i))); + res.add(dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params.get(i))); return res; } @@ -979,7 +966,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { + final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel, + Long qryId) throws IgniteCheckedException { String sql = qry.getSql(); List<Object> params = F.asList(qry.getArgs()); boolean enforceJoinOrder = qry.isEnforceJoinOrder(), startTx = autoStartTx(qry); @@ -988,17 +976,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, params, filter, enforceJoinOrder, startTx, timeout, cancel); - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { - @SuppressWarnings("NullableProblems") - @Override public Iterator<List<?>> iterator() { - try { - return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + Iterable<List<?>> iter = () -> { + try { + return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } - }, cancel); + }; + + QueryCursorImpl<List<?>> cursor = qryId != null + ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId) + : new QueryCursorImpl<>(iter, cancel); cursor.fieldsMeta(res.metaData()); @@ -1281,15 +1270,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * Executes a query natively. * + * @param schemaName Schema name. * @param qry Query. * @param cmd Parsed command corresponding to query. * @param cliCtx Client context, or {@code null} if not applicable. * @return Result cursors. */ - private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(SqlFieldsQuery qry, SqlCommand cmd, - @Nullable SqlClientContext cliCtx) { + private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry, + SqlCommand cmd, @Nullable SqlClientContext cliCtx) { + Long qryId = null; + // Execute. try { + if (cmd instanceof SqlBulkLoadCommand) + return Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, qry.getSql(), cmd)); + + //Always registry new running query for native commands except COPY. Currently such operations don't support cancellation. + qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true); + if (cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || cmd instanceof SqlAlterTableCommand @@ -1297,8 +1295,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { || cmd instanceof SqlAlterUserCommand || cmd instanceof SqlDropUserCommand) return Collections.singletonList(ddlProc.runDdlStatement(qry.getSql(), cmd)); - else if (cmd instanceof SqlBulkLoadCommand) - return Collections.singletonList(dmlProc.runNativeDmlStatement(qry.getSql(), cmd)); else if (cmd instanceof SqlSetStreamingCommand) { if (cliCtx == null) throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver."); @@ -1320,6 +1316,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + ", err=" + e.getMessage() + ']', e); } + finally { + runningQueryMgr.unregister(qryId); + } } /** @@ -1444,7 +1443,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @SuppressWarnings({"StringEquality", "unchecked"}) @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, - GridQueryCancel cancel) { + GridQueryCancel cancel, boolean registerAsNewQry) { boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry); try { @@ -1458,7 +1457,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } if (nativeCmd != null) - return queryDistributedSqlFieldsNative(qry, nativeCmd, cliCtx); + return queryDistributedSqlFieldsNative(schemaName, qry, nativeCmd, cliCtx); List<FieldsQueryCursor<List<?>>> res; @@ -1477,7 +1476,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { List<GridQueryFieldMetadata> meta = cachedQry.meta(); res = Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary, - startTx, tracker, cancel)); + startTx, tracker, cancel, registerAsNewQry)); + if (!twoStepQry.explain()) twoStepCache.putIfAbsent(cachedQryKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy())); @@ -1500,7 +1500,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { IgniteQueryErrorCode.UNSUPPORTED_OPERATION); return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, null, - keepBinary, startTx, tracker, cancel); + keepBinary, startTx, tracker, cancel, registerAsNewQry); } } } @@ -1532,7 +1532,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { firstArg += prepared.getParameters().size(); res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, meta, keepBinary, startTx, tracker, - cancel)); + cancel, registerAsNewQry)); // We cannot cache two-step query for multiple statements query except the last statement if (parseRes.twoStepQuery() != null && parseRes.twoStepQueryKey() != null && @@ -1568,12 +1568,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param startTx Start transaction flag. * @param tracker MVCC tracker. * @param cancel Query cancel state holder. + * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query, * @return Query result. */ @SuppressWarnings("unchecked") private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared, SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary, - boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel) { + boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) { String sqlQry = qry.getSql(); boolean loc = qry.isLocal(); @@ -1581,55 +1582,56 @@ public class IgniteH2Indexing implements GridQueryIndexing { IndexingQueryFilter filter = (loc ? backupFilter(null, qry.getPartitions()) : null); if (!prepared.isQuery()) { - if (DmlStatementsProcessor.isDmlStatement(prepared)) { - try { - Connection conn = connMgr.connectionForThread(schemaName); - - if (!loc) - return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel); - else { - final GridQueryFieldsResult updRes = - dmlProc.updateSqlFieldsLocal(schemaName, conn, prepared, qry, filter, cancel); + Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry); - return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() { - @SuppressWarnings("NullableProblems") - @Override public Iterator<List<?>> iterator() { - try { - return new GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(), - true); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + try { + if (DmlStatementsProcessor.isDmlStatement(prepared)) { + try { + Connection conn = connMgr.connectionForThread(schemaName); + + if (!loc) + return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel); + else { + final GridQueryFieldsResult updRes = + dmlProc.updateSqlFieldsLocal(schemaName, conn, prepared, qry, filter, cancel); + + return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() { + @SuppressWarnings("NullableProblems") + @Override public Iterator<List<?>> iterator() { + try { + return new GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(), + true); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } - } - }, cancel)); + }, cancel)); + } + } + catch (IgniteCheckedException e) { + throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry + + ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); } } - catch (IgniteCheckedException e) { - throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); - } - } - - if (DdlStatementsProcessor.isDdlStatement(prepared)) { - if (loc) - throw new IgniteSQLException("DDL statements are not supported for LOCAL caches", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared)); - } + if (DdlStatementsProcessor.isDdlStatement(prepared)) { + if (loc) + throw new IgniteSQLException("DDL statements are not supported for LOCAL caches", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - if (prepared instanceof NoOperation) { - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl( - Collections.singletonList(Collections.singletonList(0L)), null, false); + return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared)); + } - resCur.fieldsMeta(UPDATE_RESULT_META); + if (prepared instanceof NoOperation) + return Collections.singletonList(H2Utils.zeroCursor()); - return Collections.singletonList(resCur); + throw new IgniteSQLException("Unsupported DDL/DML operation: " + prepared.getClass().getName(), + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + } + finally { + runningQueryMgr.unregister(qryId); } - - throw new IgniteSQLException("Unsupported DDL/DML operation: " + prepared.getClass().getName(), - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } if (twoStepQry != null) { @@ -1642,20 +1644,45 @@ public class IgniteH2Indexing implements GridQueryIndexing { checkSecurity(twoStepQry.cacheIds()); return Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary, - startTx, tracker, cancel)); + startTx, tracker, cancel, registerAsNewQry)); + } // We've encountered a local query, let's just run it. + Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry); + try { - return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel)); + return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel, qryId)); } catch (IgniteCheckedException e) { + runningQueryMgr.unregister(qryId); + throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry + ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); } } /** + * @param schemaName Schema name. + * @param cancel Query cancel state holder. + * @param qry Query. + * @param loc {@code true} for local query. + * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query, + * @return Id of registered query or {@code null} if query wasn't registered. + */ + private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc, + boolean registerAsNewQry) { + if (registerAsNewQry) { + GridRunningQueryInfo runningQryInfo = runningQueryMgr.register(qry, + GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel); + + return runningQryInfo.id(); + } + + return null; + } + + /** * Check security access for caches. * * @param cacheIds Cache IDs. @@ -1939,11 +1966,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param startTx Start transaction flag. * @param mvccTracker Query tracker. * @param cancel Cancel handler. + * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query, * @return Cursor representing distributed query result. */ private FieldsQueryCursor<List<?>> doRunDistributedQuery(String schemaName, SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary, - boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel) { + boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) { if (log.isDebugEnabled()) log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry); @@ -1952,55 +1980,81 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (cancel == null) cancel = new GridQueryCancel(); - // TODO: Use intersection (https://issues.apache.org/jira/browse/IGNITE-10567) - int partitions[] = qry.getPartitions(); + Long qryId = registerRunningQuery(schemaName, cancel, qry.getSql(), qry.isLocal(), registerAsNewQry); - if (partitions == null && twoStepQry.derivedPartitions() != null) { - try { - PartitionNode partTree = twoStepQry.derivedPartitions().tree(); + boolean cursorCreated = false; - Collection<Integer> partitions0 = partTree.apply(qry.getArgs()); + try { + // TODO: Use intersection (https://issues.apache.org/jira/browse/IGNITE-10567) + int partitions[] = qry.getPartitions(); - if (F.isEmpty(partitions0)) - partitions = new int[0]; - else { - partitions = new int[partitions0.size()]; + if (partitions == null && twoStepQry.derivedPartitions() != null) { + try { + PartitionNode partTree = twoStepQry.derivedPartitions().tree(); - int i = 0; + Collection<Integer> partitions0 = partTree.apply(qry.getArgs()); - for (Integer part : partitions0) - partitions[i++] = part; - } + if (F.isEmpty(partitions0)) + partitions = new int[0]; + else { + partitions = new int[partitions0.size()]; - if (partitions.length == 0) //here we know that result of requested query is empty - return new QueryCursorImpl<List<?>>(new Iterable<List<?>>(){ - @Override public Iterator<List<?>> iterator() { - return new Iterator<List<?>>(){ + int i = 0; - @Override public boolean hasNext() { - return false; - } + for (Integer part : partitions0) + partitions[i++] = part; + } - @Override public List<?> next() { - return null; - } - }; - } - }); - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to calculate derived partitions: [qry=" + qry.getSql() + ", params=" + - Arrays.deepToString(qry.getArgs()) + "]", e); + if (partitions.length == 0) { //here we know that result of requested query is empty + return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return new Iterator<List<?>>() { + @Override public boolean hasNext() { + return false; + } + + @Override public List<?> next() { + return null; + } + }; + } + }); + } + } + catch (IgniteCheckedException e) { + throw new CacheException("Failed to calculate derived partitions: [qry=" + qry.getSql() + + ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); + } } - } - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - runQueryTwoStep(schemaName, twoStepQry, keepBinary, qry.isEnforceJoinOrder(), startTx, qry.getTimeout(), - cancel, qry.getArgs(), partitions, qry.isLazy(), mvccTracker), cancel); + Iterable<List<?>> iter = runQueryTwoStep( + schemaName, + twoStepQry, + keepBinary, + qry.isEnforceJoinOrder(), + startTx, + qry.getTimeout(), + cancel, + qry.getArgs(), + partitions, + qry.isLazy(), + mvccTracker + ); - cursor.fieldsMeta(meta); + QueryCursorImpl<List<?>> cursor = registerAsNewQry + ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId) + : new QueryCursorImpl<>(iter, cancel); - return cursor; + cursor.fieldsMeta(meta); + + cursorCreated = true; + + return cursor; + } + finally { + if (!cursorCreated) + runningQueryMgr.unregister(qryId); + } } /** @@ -2256,6 +2310,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { return rdcQryExec; } + /** + * Return Running query manager. + * + * @return Running query manager. + */ + public RunningQueryManager runningQueryManager() { + return runningQueryMgr; + } + /** {@inheritDoc} */ @SuppressWarnings({"deprecation"}) @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException { @@ -2264,8 +2327,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { this.busyLock = busyLock; - qryIdGen = new AtomicLong(); - if (SysProperties.serializeJavaObject) { U.warn(log, "Serialization of Java objects in H2 was enabled."); @@ -2285,7 +2346,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { marshaller = ctx.config().getMarshaller(); mapQryExec = new GridMapQueryExecutor(busyLock); - rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock); + rdcQryExec = new GridReduceQueryExecutor(busyLock); mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); @@ -2556,25 +2617,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) { - Collection<GridRunningQueryInfo> res = new ArrayList<>(); - - res.addAll(runs.values()); - res.addAll(rdcQryExec.longRunningQueries(duration)); - - return res; + return runningQueryMgr.longRunningQueries(duration); } /** {@inheritDoc} */ @Override public void cancelQueries(Collection<Long> queries) { if (!F.isEmpty(queries)) { - for (Long qryId : queries) { - GridRunningQueryInfo run = runs.get(qryId); - - if (run != null) - run.cancel(); - } - - rdcQryExec.cancelQueries(queries); + for (Long qryId : queries) + runningQueryMgr.cancel(qryId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java index a783b8a..9e7b9ae 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.UUID; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -45,9 +44,6 @@ class DistributedUpdateRun { /** Accumulates error keys. */ private HashSet<Object> errorKeys; - /** Query info. */ - private final GridRunningQueryInfo qry; - /** Result future. */ private final GridFutureAdapter<UpdateResult> fut = new GridFutureAdapter<>(); @@ -55,23 +51,14 @@ class DistributedUpdateRun { * Constructor. * * @param nodeCount Number of nodes to await results from. - * @param qry Query info. */ - DistributedUpdateRun(int nodeCount, GridRunningQueryInfo qry) { + DistributedUpdateRun(int nodeCount) { this.nodeCount = nodeCount; - this.qry = qry; rspNodes = new HashSet<>(nodeCount); } /** - * @return Query info. - */ - GridRunningQueryInfo queryInfo() { - return qry; - } - - /** * @return Result future. */ GridFutureAdapter<UpdateResult> future() { http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 20ee1b4..7009bd5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -61,12 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSe import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator; import org.apache.ignite.internal.processors.query.h2.H2Utils; @@ -136,7 +134,7 @@ public class GridReduceQueryExecutor { private IgniteLogger log; /** */ - private final AtomicLong qryIdGen; + private final AtomicLong qryIdGen = new AtomicLong(); /** */ private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap<>(); @@ -166,11 +164,9 @@ public class GridReduceQueryExecutor { /** * Constructor. * - * @param qryIdGen Query ID generator. * @param busyLock Busy lock. */ - public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) { - this.qryIdGen = qryIdGen; + public GridReduceQueryExecutor(GridSpinBusyLock busyLock) { this.busyLock = busyLock; } @@ -435,8 +431,6 @@ public class GridReduceQueryExecutor { } } - long qryReqId = qryIdGen.incrementAndGet(); - List<Integer> cacheIds = qry.cacheIds(); boolean mvccEnabled = mvccEnabled(ctx); @@ -480,9 +474,10 @@ public class GridReduceQueryExecutor { } } - final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName, - h2.connections().connectionForThread(schemaName), qry.mapQueries().size(), qry.pageSize(), - U.currentTimeMillis(), sfuFut, cancel); + long qryReqId = qryIdGen.incrementAndGet(); + + final ReduceQueryRun r = new ReduceQueryRun(h2.connections().connectionForThread(schemaName), + qry.mapQueries().size(), qry.pageSize(), sfuFut); Collection<ClusterNode> nodes; @@ -877,9 +872,6 @@ public class GridReduceQueryExecutor { ReducePartitionMapResult nodesParts = mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId); - final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS, - schemaName, U.currentTimeMillis(), cancel, false); - Collection<ClusterNode> nodes = nodesParts.nodes(); if (nodes == null) @@ -904,7 +896,7 @@ public class GridReduceQueryExecutor { } } - final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size(), qryInfo); + final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size()); int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0; @@ -1325,50 +1317,6 @@ public class GridReduceQueryExecutor { } /** - * Collect queries that already running more than specified duration. - * - * @param duration Duration to check. - * @return Collection of IDs and statements of long running queries. - */ - public Collection<GridRunningQueryInfo> longRunningQueries(long duration) { - Collection<GridRunningQueryInfo> res = new ArrayList<>(); - - long curTime = U.currentTimeMillis(); - - for (ReduceQueryRun run : runs.values()) { - if (run.queryInfo().longQuery(curTime, duration)) - res.add(run.queryInfo()); - } - - for (DistributedUpdateRun upd: updRuns.values()) { - if (upd.queryInfo().longQuery(curTime, duration)) - res.add(upd.queryInfo()); - } - - return res; - } - - /** - * Cancel specified queries. - * - * @param queries Queries IDs to cancel. - */ - public void cancelQueries(Collection<Long> queries) { - for (Long qryId : queries) { - ReduceQueryRun run = runs.get(qryId); - - if (run != null) - run.queryInfo().cancel(); - else { - DistributedUpdateRun upd = updRuns.get(qryId); - - if (upd != null) - upd.queryInfo().cancel(); - } - } - } - - /** * @param qryTimeout Query timeout. * @return Query retry timeout. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/592cc346/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java index 7ddd653..b488bc3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java @@ -27,22 +27,15 @@ import javax.cache.CacheException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; -import org.apache.ignite.internal.processors.query.GridQueryCancel; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.util.typedef.F; import org.h2.jdbc.JdbcConnection; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; - /** * Query run. */ class ReduceQueryRun { /** */ - private final GridRunningQueryInfo qry; - - /** */ private final List<GridMergeIndex> idxs; /** */ @@ -62,20 +55,13 @@ class ReduceQueryRun { /** * Constructor. - * @param id Query ID. - * @param qry Query text. - * @param schemaName Schema name. * @param conn Connection. * @param idxsCnt Number of indexes. * @param pageSize Page size. - * @param startTime Start time. * @param selectForUpdateFut Future controlling {@code SELECT FOR UPDATE} query execution. - * @param cancel Query cancel handler. */ - ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime, - GridNearTxSelectForUpdateFuture selectForUpdateFut, GridQueryCancel cancel) { - this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, - false); + ReduceQueryRun(Connection conn, int idxsCnt, int pageSize, + GridNearTxSelectForUpdateFuture selectForUpdateFut) { this.conn = (JdbcConnection)conn; @@ -143,13 +129,6 @@ class ReduceQueryRun { } /** - * @return Query info. - */ - GridRunningQueryInfo queryInfo() { - return qry; - } - - /** * @return Page size. */ int pageSize() {