IGNITE-4436 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fec2f49 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fec2f49 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fec2f49 Branch: refs/heads/ignite-4436-2 Commit: 7fec2f49ae38326cb8d7d49703083614bd128a75 Parents: 40c9f50 Author: Alexey Kuznetsov <akuznet...@apache.org> Authored: Fri Feb 3 18:02:02 2017 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Fri Feb 3 18:02:02 2017 +0700 ---------------------------------------------------------------------- .../internal/processors/query/GridQuery.java | 66 ------------- .../processors/query/GridQueryIndexing.java | 4 +- .../processors/query/GridQueryProcessor.java | 4 +- .../processors/query/GridRunningQueryInfo.java | 98 ++++++++++++++++++++ .../internal/visor/VisorMultiNodeTask.java | 2 +- .../visor/query/VisorCancelQueriesTask.java | 17 ++-- .../query/VisorCollectCurrentQueriesTask.java | 6 +- .../ignite/internal/visor/query/VisorQuery.java | 7 +- .../cache/query/GridCacheTwoStepQuery.java | 18 +++- .../processors/query/h2/IgniteH2Indexing.java | 78 +++++++++++++--- .../query/h2/sql/GridSqlQuerySplitter.java | 4 +- .../h2/twostep/GridReduceQueryExecutor.java | 38 +++----- .../cache/GridCacheCrossCacheQuerySelfTest.java | 10 +- 13 files changed, 220 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java deleted file mode 100644 index ff7c9da..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query; - -import java.util.UUID; - -/** - * Query descriptor. - */ -public class GridQuery { - /** */ - private UUID id; - - /** */ - private String qry; - - /** */ - private String cache; - - /** - * @param id Query ID. - * @param qry Query text. - * @param cache Cache where query was executed. - */ - public GridQuery(UUID id, String qry, String cache) { - this.id = id; - this.qry = qry; - this.cache = cache; - } - - /** - * @return Id. - */ - public UUID id() { - return id; - } - - /** - * @return Query. - */ - public String query() { - return qry; - } - - /** - * @return Cache. - */ - public String cache() { - return cache; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index e368063..323038b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -246,14 +246,14 @@ public interface GridQueryIndexing { * @param duration Duration to check. * @return Collection of long running queries. */ - public Collection<GridQuery> runningQueries(long duration); + public Collection<GridRunningQueryInfo> runningQueries(long duration); /** * Cancel specified queries. * * @param queries Queries ID's to cancel. */ - public void cancelQueries(Set<UUID> queries); + public void cancelQueries(Set<Long> queries); /** * Cancels all executing queries. http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 1e5c5d8..c14a8a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -942,7 +942,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param duration Duration to check. * @return Collection of long running queries. */ - public Collection<GridQuery> runningQueries(long duration) { + public Collection<GridRunningQueryInfo> runningQueries(long duration) { if (moduleEnabled()) return idx.runningQueries(duration); @@ -954,7 +954,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * * @param queries Queries ID's to cancel. */ - public void cancelQueries(Set<UUID> queries) { + public void cancelQueries(Set<Long> queries) { if (moduleEnabled()) idx.cancelQueries(queries); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java new file mode 100644 index 0000000..ea37d15 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +/** + * Query descriptor. + */ +public class GridRunningQueryInfo { + /** */ + private final long id; + + /** */ + private final String qry; + + /** */ + private final String cache; + + /** */ + private final long startTime; + + /** */ + private final GridQueryCancel cancel; + + /** + * @param id Query ID. + * @param qry Query text. + * @param cache Cache where query was executed. + * @param startTime Query start time. + * @param cancel Query cancel. + */ + public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) { + this.id = id; + this.qry = qry; + this.cache = cache; + this.startTime = startTime; + this.cancel = cancel; + } + + /** + * @return Query ID. + */ + public Long id() { + return id; + } + + /** + * @return Query text. + */ + public String query() { + return qry; + } + + /** + * @return Cache where query was executed. + */ + public String cache() { + return cache; + } + + /** + * @return Query start time. + */ + public long startTime() { + return startTime; + } + + /** + * @param curTime Current time. + * @param duration Duration of long query. + * @return {@code true} if this query should be considered as long running query. + */ + public boolean longQuery(long curTime, long duration) { + return curTime - startTime > duration; + } + + /** + * Cancel query. + */ + public void cancel() { + if (cancel != null) + cancel.cancel(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java index 57f1346..ece1a17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java @@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTa logFinish(ignite.log(), getClass(), start); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java index 88d7eec..b40a082 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.visor.query; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.jetbrains.annotations.Nullable; @@ -31,12 +33,12 @@ import org.jetbrains.annotations.Nullable; * Task to cancel queries. */ @GridInternal -public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void, Void> { +public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Long>>, Void, Void> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override protected VisorCancelQueriesJob job(Set<UUID> arg) { + @Override protected VisorCancelQueriesJob job(Map<UUID, Set<Long>> arg) { return new VisorCancelQueriesJob(arg, debug); } @@ -48,20 +50,23 @@ public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void, /** * Job to cancel queries on node. */ - private static class VisorCancelQueriesJob extends VisorJob<Set<UUID>, Void> { + private static class VisorCancelQueriesJob extends VisorJob<Map<UUID, Set<Long>>, Void> { /** * Create job with specified argument. * * @param arg Job argument. * @param debug Flag indicating whether debug information should be printed into node log. */ - protected VisorCancelQueriesJob(@Nullable Set<UUID> arg, boolean debug) { + protected VisorCancelQueriesJob(@Nullable Map<UUID, Set<Long>> arg, boolean debug) { super(arg, debug); } /** {@inheritDoc} */ - @Override protected Void run(@Nullable Set<UUID> queries) throws IgniteException { - ignite.context().query().cancelQueries(queries); + @Override protected Void run(@Nullable Map<UUID, Set<Long>> arg) throws IgniteException { + Set<Long> queries = arg.get(ignite.localNode().id()); + + if (!F.isEmpty(queries)) + ignite.context().query().cancelQueries(queries); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java index 1638da3..0dc0ec5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.processors.query.GridQuery; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; @@ -74,11 +74,11 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map /** {@inheritDoc} */ @Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException { - Collection<GridQuery> queries = ignite.context().query().runningQueries(duration); + Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration); Collection<VisorQuery> res = new ArrayList<>(queries.size()); - for (GridQuery qry : queries) + for (GridRunningQueryInfo qry : queries) res.add(new VisorQuery(qry.id(), qry.query(), qry.cache())); return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java index 518091c..e9beff9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.visor.query; import java.io.Serializable; -import java.util.UUID; /** * Arguments for {@link VisorQueryTask}. @@ -28,7 +27,7 @@ public class VisorQuery implements Serializable { private static final long serialVersionUID = 0L; /** */ - private UUID id; + private Long id; /** Query text. */ private String qry; @@ -41,7 +40,7 @@ public class VisorQuery implements Serializable { * @param qry Query text. * @param cache Cache where query was executed. */ - public VisorQuery(UUID id, String qry, String cache) { + public VisorQuery(Long id, String qry, String cache) { this.id = id; this.qry = qry; this.cache = cache; @@ -50,7 +49,7 @@ public class VisorQuery implements Serializable { /** * @return Query ID. */ - public UUID id() { + public Long id() { return id; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 8dcba2f..f53936f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery { private boolean explain; /** */ + private String originalSql; + + /** */ private Collection<String> spaces; /** */ @@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery { private List<Integer> extraCaches; /** + * @param originalSql Original query SQL. * @param schemas Schema names in query. * @param tbls Tables in query. */ - public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) { + public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) { + this.originalSql = originalSql; this.schemas = schemas; this.tbls = tbls; } @@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery { } /** + * @return Original query SQL. + */ + public String originalSql() { + return originalSql; + } + + /** * @return Spaces. */ public Collection<String> spaces() { @@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery { public GridCacheTwoStepQuery copy(Object[] args) { assert !explain; - GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls); + GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls); cp.caches = caches; cp.extraCaches = extraCaches; @@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery { @Override public String toString() { return S.toString(GridCacheTwoStepQuery.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index cc281cf..aad524f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -53,6 +53,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -82,7 +83,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; -import org.apache.ignite.internal.processors.query.GridQuery; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -286,9 +287,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { private final Map<String, String> space2schema = new ConcurrentHashMap8<>(); /** */ + private AtomicLong qryIdGen; + + /** */ private GridSpinBusyLock busyLock; /** */ + private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>(); + + /** */ private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() { @Nullable @Override public ConnectionWrapper get() { ConnectionWrapper c = super.get(); @@ -832,6 +839,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel); + + runs.putIfAbsent(run.id(), run); + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel); @@ -839,6 +850,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { GridH2QueryContext.clearThreadLocal(); + + runs.remove(run.id()); } } }; @@ -1088,6 +1101,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false)); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), spaceName, qry, U.currentTimeMillis(), null); + + runs.put(run.id(), run); + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null); @@ -1095,6 +1112,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { GridH2QueryContext.clearThreadLocal(); + + runs.remove(run.id()); } } @@ -1735,6 +1754,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { this.busyLock = busyLock; + qryIdGen = new AtomicLong(); + if (SysProperties.serializeJavaObject) { U.warn(log, "Serialization of Java objects in H2 was enabled."); @@ -1785,7 +1806,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { marshaller = ctx.config().getMarshaller(); mapQryExec = new GridMapQueryExecutor(busyLock); - rdcQryExec = new GridReduceQueryExecutor(busyLock); + rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock); mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); @@ -2239,6 +2260,23 @@ public class IgniteH2Indexing implements GridQueryIndexing { return cols; } + + /** {@inheritDoc} */ + @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) { + return rdcQryExec.longRunningQueries(duration); + } + + /** {@inheritDoc} */ + @Override public void cancelQueries(Set<Long> queries) { + rdcQryExec.cancelQueries(queries); + } + + /** {@inheritDoc} */ + @Override public void cancelAllQueries() { + for (Connection conn : conns) + U.close(conn, log); + } + /** * Wrapper to store connection and flag is schema set or not. */ @@ -3148,19 +3186,31 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - /** {@inheritDoc} */ - @Override public Collection<GridQuery> runningQueries(long duration) { - return rdcQryExec.longRunningQueries(duration); - } + /** + * Query run. + */ + private static class QueryRun { + /** */ + private final GridRunningQueryInfo qry; - /** {@inheritDoc} */ - @Override public void cancelQueries(Set<UUID> queries) { - rdcQryExec.cancelQueries(queries); - } + /** */ + private final long startTime; - /** {@inheritDoc} */ - @Override public void cancelAllQueries() { - for (Connection conn : conns) - U.close(conn, log); + /** */ + private final GridQueryCancel cancel; + + /** + * + * @param id + * @param qry + * @param cache + * @param startTime + * @param cancel + */ + public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) { + this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel); + this.startTime = startTime; + this.cancel = cancel; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 09952cf..e164315 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -174,7 +174,7 @@ public class GridSqlQuerySplitter { qry = collectAllTables(qry, schemas, tbls); // Build resulting two step query. - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls); + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls); // Map query will be direct reference to the original query AST. // Thus all the modifications will be performed on the original AST, so we should be careful when @@ -958,4 +958,4 @@ public class GridSqlQuerySplitter { private static GridSqlFunction function(GridSqlFunctionType type) { return new GridSqlFunction(type); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 39c494d..6f96b8d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -62,7 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; -import org.apache.ignite.internal.processors.query.GridQuery; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -121,7 +121,7 @@ public class GridReduceQueryExecutor { private IgniteLogger log; /** */ - private final AtomicLong reqIdGen = new AtomicLong(); + private final AtomicLong qryIdGen; /** */ private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>(); @@ -168,9 +168,11 @@ public class GridReduceQueryExecutor { }; /** + * @param qryIdGen Query ID generator. * @param busyLock Busy lock. */ - public GridReduceQueryExecutor(GridSpinBusyLock busyLock) { + public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) { + this.qryIdGen = qryIdGen; this.busyLock = busyLock; } @@ -494,13 +496,11 @@ public class GridReduceQueryExecutor { } } - final long qryReqId = reqIdGen.incrementAndGet(); + final long qryReqId = qryIdGen.incrementAndGet(); final String space = cctx.name(); - final QueryRun r = new QueryRun(UUID.randomUUID(), - F.isEmpty(qry.mapQueries()) ? "" : qry.mapQueries().get(0).query(), - F.first(qry.schemas()), + final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space, h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(), System.currentTimeMillis(), cancel); @@ -1313,13 +1313,13 @@ public class GridReduceQueryExecutor { * @param duration Duration to check. * @return Collection of IDs and statements of long running queries. */ - public Collection<GridQuery> longRunningQueries(long duration) { - Collection<GridQuery> res = new ArrayList<>(); + public Collection<GridRunningQueryInfo> longRunningQueries(long duration) { + Collection<GridRunningQueryInfo> res = new ArrayList<>(); long curTime = U.currentTimeMillis(); for (QueryRun run : runs.values()) { - if (curTime - run.startTime > duration) + if (run.qry.longQuery(curTime, duration)) res.add(run.qry); } @@ -1331,10 +1331,10 @@ public class GridReduceQueryExecutor { * * @param queries Queries IDs to cancel. */ - public void cancelQueries(Set<UUID> queries) { + public void cancelQueries(Set<Long> queries) { for (QueryRun run : runs.values()) { if (queries.contains(run.qry.id())) - run.cancel.cancel(); + run.qry.cancel(); } } @@ -1343,7 +1343,7 @@ public class GridReduceQueryExecutor { */ private static class QueryRun { /** */ - private final GridQuery qry; + private final GridRunningQueryInfo qry; /** */ private final List<GridMergeIndex> idxs; @@ -1357,12 +1357,6 @@ public class GridReduceQueryExecutor { /** */ private final int pageSize; - /** */ - private final long startTime; - - /** */ - private final GridQueryCancel cancel; - /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */ private final AtomicReference<Object> state = new AtomicReference<>(); @@ -1376,13 +1370,11 @@ public class GridReduceQueryExecutor { * @param startTime Start time. * @param cancel Query cancel handler. */ - private QueryRun(UUID id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) { - this.qry = new GridQuery(id, qry, cache); + private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) { + this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel); this.conn = (JdbcConnection)conn; this.idxs = new ArrayList<>(idxsCnt); this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE; - this.startTime = startTime; - this.cancel = cancel; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 98376d7..d6a766d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -30,7 +30,6 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; @@ -38,8 +37,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.query.GridQuery; -import org.apache.ignite.internal.processors.query.GridQueryIndexing; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -272,7 +270,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query(); - Collection<GridQuery> queries = qryProc.runningQueries(500); + Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500); assertEquals(1, queries.size()); @@ -306,11 +304,11 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query(); - Collection<GridQuery> queries = queryProc.runningQueries(500); + Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500); assertEquals(1, queries.size()); - for (GridQuery query : queries) + for (GridRunningQueryInfo query : queries) queryProc.cancelQueries(Collections.singleton(query.id())); Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources.