Repository: ignite Updated Branches: refs/heads/ignite-3443 e13e6aed1 -> 71455f51d
IGNITE-3443 WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/71455f51 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/71455f51 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/71455f51 Branch: refs/heads/ignite-3443 Commit: 71455f51d8e946b4607b4ad2bc5e8497aa29734d Parents: e13e6ae Author: Alexey Kuznetsov <akuznet...@apache.org> Authored: Mon Oct 10 15:45:54 2016 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Mon Oct 10 15:45:54 2016 +0700 ---------------------------------------------------------------------- .../GridCacheQueryDetailsMetricsAdapter.java | 11 ++ .../query/GridCacheQueryDetailsMetricsKey.java | 61 -------- .../cache/query/GridCacheQueryManager.java | 153 ++++++------------- .../visor/cache/VisorCacheQueryBaseMetrics.java | 99 ------------ .../VisorCacheQueryMetricsCollectorTask.java | 124 +++++++++++++++ .../query/VisorQueryMetricsCollectorTask.java | 72 --------- 6 files changed, 179 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java index 1fa90b5..1f99b35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java @@ -60,6 +60,17 @@ public class GridCacheQueryDetailsMetricsAdapter implements QueryDetailsMetrics, private long lastStartTime; /** + * Calculate hash code for query. + * + * @param qryType Query type. + * @param qry Textual query representation. + * @return Hash code. + */ + public static Integer queryHashCode(GridCacheQueryType qryType, String qry) { + return 31 * qryType.hashCode() + qry.hashCode(); + } + + /** * Required by {@link Externalizable}. */ public GridCacheQueryDetailsMetricsAdapter() { http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsKey.java deleted file mode 100644 index 2011b65..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsKey.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; - -/** - * Key for query details metrics to store in system cache. - */ -public class GridCacheQueryDetailsMetricsKey extends GridCacheUtilityKey<GridCacheQueryDetailsMetricsKey> - implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - public static final GridCacheQueryDetailsMetricsKey INSTANCE = new GridCacheQueryDetailsMetricsKey(); - - /** {@inheritDoc} */ - @Override public int hashCode() { - return getClass().getName().hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equalsx(GridCacheQueryDetailsMetricsKey that) { - return true; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "GridCacheQueryDetailsMetricsKey []"; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 42d87fa..0d7190f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -39,14 +39,13 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.QueryDetailsMetrics; import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; @@ -150,6 +149,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter(); /** */ + private final ConcurrentSkipListMap<Integer, QueryDetailsMetrics> qryHist = + new ConcurrentSkipListMap<>(); + + /** */ + private int qryHistSz; + + /** */ private final ConcurrentMap<UUID, RequestFutureMap> qryIters = new ConcurrentHashMap8<>(); @@ -177,6 +183,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte qryProc = cctx.kernalContext().query(); space = cctx.name(); maxIterCnt = cctx.config().getMaxQueryIteratorsCount(); + qryHistSz = cctx.config().getQueryMetricsHistorySize(); lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -2076,6 +2083,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return metrics.copy(); } + public Collection<QueryDetailsMetrics> detailsMetrics() { + return qryHist.values(); + } + /** * Resets metrics. */ @@ -2096,11 +2107,37 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (failed || completed) { metrics.update(duration, failed, completed); - IgniteInternalCache<GridCacheQueryDetailsMetricsKey, Map<Integer, GridCacheQueryDetailsMetricsAdapter>> cache = - cctx.grid().utilityCache(); + if (qryHistSz > 0) { + Integer qryHash = GridCacheQueryDetailsMetricsAdapter.queryHashCode(qryType, qry); + + QueryDetailsMetrics qryMetrics = qryHist.get(qryHash); + + if (qryMetrics == null) { + qryMetrics = new GridCacheQueryDetailsMetricsAdapter(qryType, qry); + + QueryDetailsMetrics oldMetrics = qryHist.putIfAbsent(qryHash, qryMetrics); - cache.invokeAsync(GridCacheQueryDetailsMetricsKey.INSTANCE, - new AddMetricsProcessor(qryType, qry, startTime, duration, failed, completed)); + if (oldMetrics != null) + qryMetrics = oldMetrics; + } + + ((GridCacheQueryDetailsMetricsAdapter)qryMetrics).update(startTime, duration, failed, completed); + + while (qryHist.size() > qryHistSz) { + Map.Entry<Integer, QueryDetailsMetrics> firstEntry = qryHist.firstEntry(); + qryHash = firstEntry.getKey(); + qryMetrics = firstEntry.getValue(); + + for (Map.Entry<Integer, QueryDetailsMetrics> entry : qryHist.entrySet()) { + if (qryMetrics.lastStartTime() > entry.getValue().lastStartTime()) { + qryHash = entry.getKey(); + qryMetrics = entry.getValue(); + } + } + + qryHist.remove(qryHash); + } + } } } @@ -3552,106 +3589,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return canceled != null && canceled.contains(key); } } - - /** - * Entry processor to add metrics. - */ - private static class AddMetricsProcessor implements - EntryProcessor<GridCacheQueryDetailsMetricsKey, Map<Integer, GridCacheQueryDetailsMetricsAdapter>, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridCacheQueryType qryType; - - /** */ - private String qry; - - /** */ - private long startTime; - - /** */ - private long duration; - - /** */ - private boolean failed; - - /** */ - private boolean completed; - - /** - * Required by {@link Externalizable}. - */ - public AddMetricsProcessor() { - // No-op. - } - - /** - * Full constructor. - * - * @param qryType Query type. - * @param qry Query description. - * @param startTime Query started time. - * @param duration Execution duration. - * @param failed {@code true} if execution failed. - * @param completed {@code true} if query completed. - */ - public AddMetricsProcessor(GridCacheQueryType qryType, String qry, long startTime, long duration, - boolean failed, boolean completed) { - this.qryType = qryType; - this.qry = qry; - this.startTime = startTime; - this.duration = duration; - this.failed = failed; - this.completed = completed; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeEnum(out, qryType); - U.writeString(out, qry); - out.writeLong(startTime); - out.writeLong(duration); - out.writeBoolean(failed); - out.writeBoolean(completed); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - qryType = GridCacheQueryType.fromOrdinal(in.readByte()); - qry = U.readString(in); - startTime = in.readLong(); - duration = in.readLong(); - failed = in.readBoolean(); - completed = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public Void process( - MutableEntry<GridCacheQueryDetailsMetricsKey, Map<Integer, GridCacheQueryDetailsMetricsAdapter>> entry, - Object... arguments) throws EntryProcessorException { - Map<Integer, GridCacheQueryDetailsMetricsAdapter> map = entry.getValue(); - - if (map == null) - map = new HashMap<>(); - else - map = new HashMap<>(map); - - Integer qryHash = 31 * qryType.hashCode() + qry.hashCode(); - - GridCacheQueryDetailsMetricsAdapter qryMetrics = map.get(qryHash); - - if (qryMetrics == null) - qryMetrics = new GridCacheQueryDetailsMetricsAdapter(qryType, qry); - - qryMetrics.update(startTime, duration, failed, completed); - - map.put(qryHash, qryMetrics); - - entry.setValue(map); - - return null; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryBaseMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryBaseMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryBaseMetrics.java deleted file mode 100644 index d4ef43d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryBaseMetrics.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.cache; - -import java.io.Serializable; -import org.apache.ignite.internal.LessNamingBean; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Base class for data transfer object with cache query metrics. - */ -public abstract class VisorCacheQueryBaseMetrics implements Serializable, LessNamingBean { - /** Minimum execution time of query. */ - private long minTime; - - /** Maximum execution time of query. */ - private long maxTime; - - /** Average execution time of query. */ - private double avgTime; - - /** Number of executions. */ - private int execs; - - /** Number of executions failed. */ - private int fails; - - /** - * Initialize metrics. - * - * @param minTime Minimum execution time of query. - * @param maxTime Maximum execution time of query. - * @param avgTime Average execution time of query. - * @param execs Number of executions. - * @param fails Number of executions failed. - */ - protected void init(long minTime, long maxTime, double avgTime, int execs, int fails) { - this.minTime = minTime; - this.maxTime = maxTime; - this.avgTime = avgTime; - this.execs = execs; - this.fails = fails; - } - - /** - * @return Minimum execution time of query. - */ - public long minimumTime() { - return minTime; - } - - /** - * @return Maximum execution time of query. - */ - public long maximumTime() { - return maxTime; - } - - /** - * @return Average execution time of query. - */ - public double averageTime() { - return avgTime; - } - - /** - * @return Number of executions. - */ - public int executions() { - return execs; - } - - /** - * @return Total number of times a query execution failed. - */ - public int fails() { - return fails; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorCacheQueryBaseMetrics.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java new file mode 100644 index 0000000..89bdeb9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.cache; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.QueryDetailsMetrics; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailsMetricsAdapter; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isIgfsCache; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache; + +/** + * Task to collect cache query metrics. + */ +@GridInternal +public class VisorCacheQueryMetricsCollectorTask extends VisorMultiNodeTask<Void, + Map<String, Collection<QueryDetailsMetrics>>, Map<String, Collection<QueryDetailsMetrics>>> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCacheQueryMetricsCollectorJob job(Void arg) { + return new VisorCacheQueryMetricsCollectorJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected Map<String, Collection<QueryDetailsMetrics>> reduce0(List<ComputeJobResult> results) + throws IgniteException { + Map<String, Collection<QueryDetailsMetrics>> taskRes = U.newHashMap(results.size()); + + Map<Integer, GridCacheQueryDetailsMetricsAdapter> aggMetrics = new HashMap<>(); + + for (ComputeJobResult res : results) { + if (res.getException() == null) { + Map<String, Collection<QueryDetailsMetrics>> dm = res.getData(); + + for (VisorCacheMetrics cm : cms) { + VisorCacheAggregatedMetrics am = grpAggrMetrics.get(cm.name()); + + if (am == null) { + am = VisorCacheAggregatedMetrics.from(cm); + + grpAggrMetrics.put(cm.name(), am); + } + + am.metrics().put(res.getNode().id(), cm); + } + } + } + } + + /** + * Job that will actually collect query metrics. + */ + private static class VisorCacheQueryMetricsCollectorJob extends VisorJob<Void, Map<String, Collection<QueryDetailsMetrics>>> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Debug flag. + */ + protected VisorCacheQueryMetricsCollectorJob(@Nullable Void arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Map<String, Collection<QueryDetailsMetrics>> run(@Nullable Void arg) throws IgniteException { + IgniteConfiguration cfg = ignite.configuration(); + + GridCacheProcessor cacheProc = ignite.context().cache(); + + Collection<String> cacheNames = cacheProc.cacheNames(); + + Map<String, Collection<QueryDetailsMetrics>> res = new HashMap<>(cacheNames.size()); + + for (String cacheName : cacheNames) { + if (!isSystemCache(cacheName) && isIgfsCache(cfg, cacheName)) { + GridCacheQueryManager<Object, Object> qryMgr = cacheProc.cache(cacheName).context().queries(); + + res.put(cacheName, qryMgr.detailsMetrics()); + } + } + + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheQueryMetricsCollectorJob.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71455f51/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryMetricsCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryMetricsCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryMetricsCollectorTask.java deleted file mode 100644 index 9e352ad..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryMetricsCollectorTask.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.query; - -import java.util.Iterator; -import javax.cache.Cache; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailsMetricsAdapter; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailsMetricsKey; -import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.visor.VisorJob; -import org.apache.ignite.internal.visor.VisorOneNodeTask; -import org.jetbrains.annotations.Nullable; - -/** - * Task to collect query metrics. - */ -@GridInternal -public class VisorQueryMetricsCollectorTask extends VisorOneNodeTask<Void, String> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override protected VisorQueryMetricsCollectorJob job(Void arg) { - return new VisorQueryMetricsCollectorJob(arg, debug); - } - - /** - * Job that will actually collect query metrics. - */ - private static class VisorQueryMetricsCollectorJob extends VisorJob<Void, String> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Create job with specified argument. - * - * @param arg Job argument. - * @param debug Debug flag. - */ - protected VisorQueryMetricsCollectorJob(@Nullable Void arg, boolean debug) { - super(arg, debug); - } - - /** {@inheritDoc} */ - @Override protected String run(@Nullable Void arg) throws IgniteException { -// IgniteInternalCache<GridCacheQueryDetailsMetricsKey, GridCacheQueryDetailsMetricsAdapter> cache = ignite.utilityCache(); -// -// for (Cache.Entry<GridCacheQueryDetailsMetricsKey, GridCacheQueryDetailsMetricsAdapter> entry : cache) { -// entry. -// } - - return "IGNITE-3443: TODO Metrics"; - } - } -}