IGNITE-3443 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2dcbe38b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2dcbe38b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2dcbe38b Branch: refs/heads/ignite-3443 Commit: 2dcbe38b7625a3488043a460849866d4a3480e38 Parents: 920005d Author: Alexey Kuznetsov <akuznet...@apache.org> Authored: Thu Jul 7 21:01:30 2016 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Thu Jul 7 21:01:30 2016 +0700 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 6 ++-- .../cache/query/GridCacheQueryAdapter.java | 4 +-- .../query/GridCacheQueryFutureAdapter.java | 2 +- .../cache/query/GridCacheQueryManager.java | 14 ++++---- .../processors/query/GridQueryProcessor.java | 34 ++++++++++---------- 5 files changed, 31 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2dcbe38b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 12ec8b8..84e5a49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -467,7 +467,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx, + final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery("ScanQuery", ctx, new IgniteOutClosureX<GridCloseableIterator<Entry<K,V>>>() { @Override public GridCloseableIterator<Entry<K,V>> applyx() throws IgniteCheckedException { final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery(); @@ -503,7 +503,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(ctx, + fut = ctx.kernalContext().query().executeQuery("FullTextQuery", ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { return qry.execute(); @@ -516,7 +516,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(ctx, + fut = ctx.kernalContext().query().executeQuery("SpiQuery", ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { return qry.execute(((SpiQuery)filter).getArgs()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2dcbe38b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 90e14f4..6a93056 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -396,8 +396,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param startTime Start time. * @param duration Duration. */ - public void onCompleted(Object res, Throwable err, long startTime, long duration) { - GridQueryProcessor.onCompleted(cctx, res, err, startTime, duration, log); + public void onCompleted(String qry, Object res, Throwable err, long startTime, long duration) { + GridQueryProcessor.onCompleted(qry, cctx, res, err, startTime, duration, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2dcbe38b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index db519f5..779e7da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -155,7 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda @Override public boolean onDone(Collection<R> res, Throwable err) { cctx.time().removeTimeoutObject(this); - qry.query().onCompleted(res, err, startTime(), duration()); + qry.query().onCompleted("onDone [clause=" + qry.query().clause() + ", type=" + qry.query().type() + "]", res, err, startTime(), duration()); return super.onDone(res, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dcbe38b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index d376df5..dec3273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1160,9 +1160,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); - if (filter != null || locNode) { + if (filter != null || locNode) val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary); - } if (filter != null && !filter.apply(key, val)) continue; @@ -1723,6 +1722,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to process query request (grid is stopping)."); final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + final String namex = cctx.namex(); boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled; @@ -1736,7 +1736,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); final IgniteBiPredicate filter = qry.scanFilter(); - final String namex = cctx.namex(); final ClusterNode locNode = cctx.localNode(); final UUID subjId = qry.subjectId(); @@ -1761,7 +1760,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (updStatisticsIfNeeded) { needUpdStatistics = false; - cctx.queries().onCompleted(U.currentTimeMillis() - startTime, false); + cctx.queries().onCompleted("SCAN cache " + namex, U.currentTimeMillis() - startTime, false); } final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); @@ -1814,7 +1813,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } catch (Exception e) { if (needUpdStatistics) - cctx.queries().onCompleted(U.currentTimeMillis() - startTime, true); + cctx.queries().onCompleted("SCAN cache " + namex, U.currentTimeMillis() - startTime, true); throw e; } @@ -2106,10 +2105,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * @qry Query that was completed. * @param duration Execution duration. * @param fail {@code true} if execution failed. */ - public void onCompleted(long duration, boolean fail) { + public void onCompleted(String qry, long duration, boolean fail) { + log.warning(qry); + metrics.onQueryCompleted(duration, fail); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dcbe38b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index a42eb98..87d9b71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -743,7 +743,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { + return executeQuery("Query [space=" + space + ", clause=" + clause + "]", cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { TypeDescriptor type = typesByName.get(new TypeName(space, resType)); @@ -773,7 +773,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final GridCacheContext<Object, Object> cctx = ctx.cache().internalCache(space).context(); - return executeQuery(cctx, new IgniteOutClosureX<Iterable<List<?>>>() { + return executeQuery("TwoStepQuery", cctx, new IgniteOutClosureX<Iterable<List<?>>>() { @Override public Iterable<List<?>> applyx() throws IgniteCheckedException { return idx.queryTwoStep( cctx, @@ -802,7 +802,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { + return executeQuery("SqlFieldsQuery [sql=" + qry.getSql() + "]", cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { return idx.queryTwoStep(cctx, qry); } @@ -828,7 +828,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { + return executeQuery("SqlQuery [type=" + qry.getType() + ", sql=" + qry.getSql() + "]", cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { return idx.queryTwoStep(cctx, qry); } @@ -856,7 +856,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return executeQuery( + return executeQuery("SqlQuery [type=" + qry.getType() + ", sql=" + qry.getSql() + "]", cctx, new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() { @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { @@ -957,7 +957,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final boolean keepBinary = cctx.keepBinary(); - return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { + return executeQuery("SqlFieldsQuery [" + qry.getSql() + "]", cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { String space = cctx.name(); String sql = qry.getSql(); @@ -1073,14 +1073,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @return Type name. */ public static String typeName(String clsName) { - int packageEnd = clsName.lastIndexOf('.'); + int pkgEnd = clsName.lastIndexOf('.'); - if (packageEnd >= 0 && packageEnd < clsName.length() - 1) - clsName = clsName.substring(packageEnd + 1); + if (pkgEnd >= 0 && pkgEnd < clsName.length() - 1) + clsName = clsName.substring(pkgEnd + 1); - if (clsName.endsWith("[]")) { + if (clsName.endsWith("[]")) clsName = clsName.substring(0, clsName.length() - 2) + "_array"; - } int parentEnd = clsName.lastIndexOf('$'); @@ -1111,7 +1110,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { + return executeQuery("TextQuery [space=" + space + ", clause=" + clause + "]", cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { TypeDescriptor type = typesByName.get(new TypeName(space, resType)); @@ -1149,7 +1148,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - return executeQuery(cctx, new IgniteOutClosureX<GridQueryFieldsResult>() { + return executeQuery("QueryFields [space=" + space + ", clause=" + clause + "]", cctx, new IgniteOutClosureX<GridQueryFieldsResult>() { @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException { return idx.queryFields(space, clause, params, filters); } @@ -1754,11 +1753,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param qry * @param cctx Cache context. * @param clo Closure. * @param complete Complete. */ - public <R> R executeQuery(GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete) + public <R> R executeQuery(String qry, GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException { final long startTime = U.currentTimeMillis(); @@ -1791,7 +1791,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { cctx.queries().onExecuted(err != null); if (complete && err == null) - onCompleted(cctx, res, null, startTime, U.currentTimeMillis() - startTime, log); + onCompleted(qry, cctx, res, null, startTime, U.currentTimeMillis() - startTime, log); } } @@ -1803,11 +1803,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param duration Duration. * @param log Logger. */ - public static void onCompleted(GridCacheContext<?, ?> cctx, Object res, Throwable err, + public static void onCompleted(String qry, GridCacheContext<?, ?> cctx, Object res, Throwable err, long startTime, long duration, IgniteLogger log) { boolean fail = err != null; - cctx.queries().onCompleted(duration, fail); + cctx.queries().onCompleted(qry, duration, fail); if (log.isTraceEnabled()) log.trace("Query execution completed [startTime=" + startTime +