This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6d8b9bb IGNITE-11925: QueryMetrics migration. (#6627) 6d8b9bb is described below commit 6d8b9bb383db9ad4bdbb181ddea5d83f1152a201 Author: Nikolay <nizhi...@apache.org> AuthorDate: Sun Jul 14 10:03:30 2019 +0300 IGNITE-11925: QueryMetrics migration. (#6627) QueryMetrics migration. --- .../apache/ignite/cache/query/QueryMetrics.java | 6 +- .../processors/cache/CacheMetricsImpl.java | 4 +- .../processors/cache/query/CacheQuery.java | 13 -- .../cache/query/GridCacheQueryAdapter.java | 18 -- .../cache/query/GridCacheQueryManager.java | 10 +- .../cache/query/GridCacheQueryMetricsAdapter.java | 206 ++++++++++++++------- .../processors/metric/impl/MetricUtils.java | 26 +++ .../cache/CacheAbstractQueryMetricsSelfTest.java | 12 +- 8 files changed, 176 insertions(+), 119 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java index d0f0a50..493f349 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java @@ -17,12 +17,8 @@ package org.apache.ignite.cache.query; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; - /** - * Cache query metrics used to obtain statistics on query. Metrics for particular query - * can be get via {@link CacheQuery#metrics()} method or aggregated metrics for all queries - * via {@link CacheQuery#metrics()}. + * Cache query metrics used to obtain statistics on query. 
*/ public interface QueryMetrics { /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index ab1c4a1..1353429 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -202,9 +202,7 @@ public class CacheMetricsImpl implements CacheMetrics { delegate = null; - String regName = cacheMetricsRegistryName(cctx.name(), isNear); - - MetricRegistry mreg = cctx.kernalContext().metric().registry(regName); + MetricRegistry mreg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), isNear)); reads = mreg.metric("CacheGets", "The total number of gets to the cache."); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index c8f392f..65e46cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.AffinityKey; import org.apache.ignite.cache.query.Query; -import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cache.query.annotations.QueryTextField; @@ -259,18 +258,6 @@ public interface CacheQuery<T> { public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args); /** - * Gets metrics for this query. 
- * - * @return Query metrics. - */ - public QueryMetrics metrics(); - - /** - * Resets metrics for this query. - */ - public void resetMetrics(); - - /** * @return Scan query iterator. */ public GridCloseableIterator executeScanQuery() throws IgniteCheckedException; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index e304e05..24f7959 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -35,7 +35,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.Query; -import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -109,9 +108,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private final boolean incMeta; /** */ - private volatile GridCacheQueryMetricsAdapter metrics; - - /** */ private volatile int pageSize = Query.DFLT_PAGE_SIZE; /** */ @@ -178,8 +174,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { log = cctx.logger(getClass()); - metrics = new GridCacheQueryMetricsAdapter(); - this.incMeta = false; this.clsName = null; this.clause = null; @@ -222,8 +216,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.dataPageScanEnabled = dataPageScanEnabled; log = cctx.logger(getClass()); - - metrics = new GridCacheQueryMetricsAdapter(); } /** @@ -485,16 +477,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return execute0(rmtReducer, args); } - /** {@inheritDoc} */ - @Override 
public QueryMetrics metrics() { - return metrics.copy(); - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - metrics = new GridCacheQueryMetricsAdapter(); - } - /** * @param rmtReducer Optional reducer. * @param args Arguments. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index ae5f7df..a6cbe6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -76,8 +76,8 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate; @@ -185,7 +185,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private int maxIterCnt; /** */ - private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter(); + private volatile GridCacheQueryMetricsAdapter metrics; /** */ private int detailMetricsSz; @@ -273,6 +273,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte } }; + metrics = new GridCacheQueryMetricsAdapter(cctx.kernalContext().metric(), cctx.name(), cctx.isNear()); + cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); qryTopVer = cctx.startTopologyVersion(); @@ -1678,7 +1680,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Cache queries metrics. */ public QueryMetrics metrics() { - return metrics.copy(); + return metrics.snapshot(); } /** @@ -1731,7 +1733,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Resets metrics. */ public void resetMetrics() { - metrics = new GridCacheQueryMetricsAdapter(); + metrics.reset(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java index ca687bc..8413c83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java @@ -21,40 +21,53 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.cache.query.QueryMetrics; -import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.processors.metric.GridMetricManager; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetricImpl; +import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl; +import org.apache.ignite.internal.processors.metric.impl.MetricUtils; import org.apache.ignite.internal.util.typedef.internal.S; /** * Adapter for {@link QueryMetrics}. 
*/ -public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - +public class GridCacheQueryMetricsAdapter implements QueryMetrics { /** Minimum time of execution. */ - private final GridAtomicLong minTime = new GridAtomicLong(Long.MAX_VALUE); + private final LongMetricImpl minTime; /** Maximum time of execution. */ - private final GridAtomicLong maxTime = new GridAtomicLong(); + private final LongMetricImpl maxTime; /** Sum of execution time for all completed queries. */ - private final LongAdder sumTime = new LongAdder(); - - /** Average time of execution. - * If doesn't equal zero then this metrics set is copy from remote node and doesn't actually update. - */ - private double avgTime; + private final LongAdderMetricImpl sumTime; /** Number of executions. */ - private final LongAdder execs = new LongAdder(); + private final LongAdderMetricImpl execs; /** Number of completed executions. */ - private final LongAdder completed = new LongAdder(); + private final LongAdderMetricImpl completed; /** Number of fails. */ - private final LongAdder fails = new LongAdder(); + private final LongAdderMetricImpl fails; + + /** + * @param mmgr Metrics manager. + * @param cacheName Cache name. + * @param isNear Is near flag. 
+ */ + public GridCacheQueryMetricsAdapter(GridMetricManager mmgr, String cacheName, boolean isNear) { + MetricRegistry mreg = mmgr.registry(MetricUtils.cacheMetricsRegistryName(cacheName, isNear)); + + minTime = mreg.metric("QueryMinimalTime", null); + minTime.value(Long.MAX_VALUE); + + maxTime = mreg.metric("QueryMaximumTime", null); + sumTime = mreg.longAdderMetric("QuerySumTime", null); + execs = mreg.longAdderMetric("QueryExecuted", null); + completed = mreg.longAdderMetric("QueryCompleted", null); + fails = mreg.longAdderMetric("QueryFailed", null); + } /** {@inheritDoc} */ @Override public long minimumTime() { @@ -70,33 +83,19 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl /** {@inheritDoc} */ @Override public double averageTime() { - if (avgTime > 0) - return avgTime; - else { - double val = completed.sum(); + double val = completed.longValue(); - return val > 0 ? sumTime.sum() / val : 0.0; - } + return val > 0 ? sumTime.longValue() / val : 0.0; } /** {@inheritDoc} */ @Override public int executions() { - return execs.intValue(); - } - - /** - * Gets total number of completed executions of query. - * This value is actual only for local node. - * - * @return Number of completed executions. - */ - public int completedExecutions() { - return completed.intValue(); + return (int)execs.longValue(); } /** {@inheritDoc} */ @Override public int fails() { - return fails.intValue(); + return (int)fails.longValue(); } /** @@ -114,53 +113,120 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl execs.increment(); completed.increment(); - minTime.setIfLess(duration); - maxTime.setIfGreater(duration); + MetricUtils.setIfLess(minTime, duration); + MetricUtils.setIfGreater(maxTime, duration); sumTime.add(duration); } } - /** - * Merge with given metrics. - * - * @return Copy. 
- */ - public GridCacheQueryMetricsAdapter copy() { - GridCacheQueryMetricsAdapter m = new GridCacheQueryMetricsAdapter(); - - // Not synchronized because accuracy isn't critical. - m.fails.add(fails.sum()); - m.minTime.set(minTime.get()); - m.maxTime.set(maxTime.get()); - m.execs.add(execs.sum()); - m.completed.add(completed.sum()); - m.sumTime.add(sumTime.sum()); - m.avgTime = avgTime; - - return m; - } + /** @return Current metrics values. */ + public QueryMetrics snapshot() { + long minTimeVal = minTime.longValue(); - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(minTime.get()); - out.writeLong(maxTime.get()); - out.writeDouble(averageTime()); - out.writeInt(execs.intValue()); - out.writeInt(fails.intValue()); + return new QueryMetricsSnapshot( + minTimeVal == Long.MAX_VALUE ? 0 : minTimeVal, + maxTime.longValue(), + averageTime(), + (int)execs.longValue(), + (int)fails.longValue()); } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - minTime.set(in.readLong()); - maxTime.set(in.readLong()); - avgTime = in.readDouble(); - execs.add(in.readInt()); - fails.add(in.readInt()); + /** Resets query metrics. */ + public void reset() { + minTime.value(Long.MAX_VALUE); + maxTime.reset(); + sumTime.reset(); + execs.reset(); + completed.reset(); + fails.reset(); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheQueryMetricsAdapter.class, this); } + + /** Query metrics snapshot. */ + public static class QueryMetricsSnapshot implements QueryMetrics, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Minimal query execution time. */ + private long minTime; + + /** Maximum query execution time. */ + private long maxTime; + + /** Average query execution time. */ + private double avgTime; + + /** Count of executed queries. 
*/ + private int execs; + + /** Count of failed queries. */ + private int fails; + + /** Required by {@link Externalizable}. */ + public QueryMetricsSnapshot() { + } + + /** + * @param minTime Minimal query execution time. + * @param maxTime Maximum query execution time. + * @param avgTime Average query execution time. + * @param execs Count of executed queries. + * @param fails Count of failed queries. + */ + public QueryMetricsSnapshot(long minTime, long maxTime, double avgTime, int execs, int fails) { + this.minTime = minTime; + this.maxTime = maxTime; + this.avgTime = avgTime; + this.execs = execs; + this.fails = fails; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(minTime); + out.writeLong(maxTime); + out.writeDouble(avgTime); + out.writeInt(execs); + out.writeInt(fails); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + minTime = in.readLong(); + maxTime = in.readLong(); + avgTime = in.readDouble(); + execs = in.readInt(); + fails = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public long minimumTime() { + return minTime; + } + + /** {@inheritDoc} */ + @Override public long maximumTime() { + return maxTime; + } + + /** {@inheritDoc} */ + @Override public double averageTime() { + return avgTime; + } + + /** {@inheritDoc} */ + @Override public int executions() { + return execs; + } + + /** {@inheritDoc} */ + @Override public int fails() { + return fails; + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java index d7fe5d4..42c5d2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java @@ -95,6 
+95,32 @@ public class MetricUtils { } /** + * Sets the metric to {@code update} only if {@code update} is less than the current value. + * + * @param m Metric to update. + * @param update New value. + */ + public static void setIfLess(LongMetricImpl m, long update) { + long v = m.value(); + + while (v > update && !LongMetricImpl.updater.compareAndSet(m, v, update)) + v = m.value(); + } + + /** + * Sets the metric to {@code update} only if {@code update} is greater than the current value. + * + * @param m Metric to update. + * @param update New value. + */ + public static void setIfGreater(LongMetricImpl m, long update) { + long v = m.value(); + + while (v < update && !LongMetricImpl.updater.compareAndSet(m, v, update)) + v = m.value(); + } + + /** + * Asserts all arguments are not empty. * * @param names Names. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java index 5143aa3..352d8a8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java @@ -22,6 +22,7 @@ import java.util.Collection; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; @@ -29,7 +30,6 @@ import org.apache.ignite.cache.query.TextQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -350,21 +350,21 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra * @param first {@code true} if metrics checked for first query only. */ private void checkMetrics(IgniteCache<Integer, String> cache, int execs, int completions, int failures, boolean first) { - GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); + QueryMetrics m = cache.queryMetrics(); assertNotNull(m); info("Metrics: " + m); assertEquals("Executions", execs, m.executions()); - assertEquals("Completions", completions, m.completedExecutions()); + assertEquals("Completions", completions, m.executions() - m.fails()); assertEquals("Failures", failures, m.fails()); assertTrue(m.averageTime() >= 0); assertTrue(m.maximumTime() >= 0); assertTrue(m.minimumTime() >= 0); if (first) - assertTrue("On first execution minTime == maxTime", m.minimumTime() == m.maximumTime()); + assertEquals("On first execution minTime == maxTime", m.minimumTime(), m.maximumTime()); } /** @@ -438,8 +438,8 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra final int exp) throws IgniteInterruptedCheckedException { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); - return m.completedExecutions() == exp; + QueryMetrics m = cache.queryMetrics(); + return m.executions() - m.fails() == exp; } }, 5000); }