This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d8b9bb  IGNITE-11925: QueryMetrics migration. (#6627)
6d8b9bb is described below

commit 6d8b9bb383db9ad4bdbb181ddea5d83f1152a201
Author: Nikolay <nizhi...@apache.org>
AuthorDate: Sun Jul 14 10:03:30 2019 +0300

    IGNITE-11925: QueryMetrics migration. (#6627)
    
    QueryMetrics migration.
---
 .../apache/ignite/cache/query/QueryMetrics.java    |   6 +-
 .../processors/cache/CacheMetricsImpl.java         |   4 +-
 .../processors/cache/query/CacheQuery.java         |  13 --
 .../cache/query/GridCacheQueryAdapter.java         |  18 --
 .../cache/query/GridCacheQueryManager.java         |  10 +-
 .../cache/query/GridCacheQueryMetricsAdapter.java  | 206 ++++++++++++++-------
 .../processors/metric/impl/MetricUtils.java        |  26 +++
 .../cache/CacheAbstractQueryMetricsSelfTest.java   |  12 +-
 8 files changed, 176 insertions(+), 119 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
index d0f0a50..493f349 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.cache.query;
 
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-
 /**
- * Cache query metrics used to obtain statistics on query. Metrics for 
particular query
- * can be get via {@link CacheQuery#metrics()} method or aggregated metrics 
for all queries
- * via {@link CacheQuery#metrics()}.
+ * Cache query metrics used to obtain statistics on query.
  */
 public interface QueryMetrics {
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index ab1c4a1..1353429 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -202,9 +202,7 @@ public class CacheMetricsImpl implements CacheMetrics {
 
         delegate = null;
 
-        String regName = cacheMetricsRegistryName(cctx.name(), isNear);
-
-        MetricRegistry mreg = cctx.kernalContext().metric().registry(regName);
+        MetricRegistry mreg = 
cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), 
isNear));
 
         reads = mreg.metric("CacheGets",
             "The total number of gets to the cache.");
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index c8f392f..65e46cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.query;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityKey;
 import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cache.query.annotations.QueryTextField;
@@ -259,18 +258,6 @@ public interface CacheQuery<T> {
     public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, 
@Nullable Object... args);
 
     /**
-     * Gets metrics for this query.
-     *
-     * @return Query metrics.
-     */
-    public QueryMetrics metrics();
-
-    /**
-     * Resets metrics for this query.
-     */
-    public void resetMetrics();
-
-    /**
      * @return Scan query iterator.
      */
     public GridCloseableIterator executeScanQuery() throws 
IgniteCheckedException;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index e304e05..24f7959 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -35,7 +35,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -109,9 +108,6 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
     private final boolean incMeta;
 
     /** */
-    private volatile GridCacheQueryMetricsAdapter metrics;
-
-    /** */
     private volatile int pageSize = Query.DFLT_PAGE_SIZE;
 
     /** */
@@ -178,8 +174,6 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         log = cctx.logger(getClass());
 
-        metrics = new GridCacheQueryMetricsAdapter();
-
         this.incMeta = false;
         this.clsName = null;
         this.clause = null;
@@ -222,8 +216,6 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         this.dataPageScanEnabled = dataPageScanEnabled;
 
         log = cctx.logger(getClass());
-
-        metrics = new GridCacheQueryMetricsAdapter();
     }
 
     /**
@@ -485,16 +477,6 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         return execute0(rmtReducer, args);
     }
 
-    /** {@inheritDoc} */
-    @Override public QueryMetrics metrics() {
-        return metrics.copy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resetMetrics() {
-        metrics = new GridCacheQueryMetricsAdapter();
-    }
-
     /**
      * @param rmtReducer Optional reducer.
      * @param args Arguments.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index ae5f7df..a6cbe6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -76,8 +76,8 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import 
org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
@@ -185,7 +185,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
     private int maxIterCnt;
 
     /** */
-    private volatile GridCacheQueryMetricsAdapter metrics = new 
GridCacheQueryMetricsAdapter();
+    private volatile GridCacheQueryMetricsAdapter metrics;
 
     /** */
     private int detailMetricsSz;
@@ -273,6 +273,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             }
         };
 
+        metrics = new 
GridCacheQueryMetricsAdapter(cctx.kernalContext().metric(), cctx.name(), 
cctx.isNear());
+
         cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         qryTopVer = cctx.startTopologyVersion();
@@ -1678,7 +1680,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @return Cache queries metrics.
      */
     public QueryMetrics metrics() {
-        return metrics.copy();
+        return metrics.snapshot();
     }
 
     /**
@@ -1731,7 +1733,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * Resets metrics.
      */
     public void resetMetrics() {
-        metrics = new GridCacheQueryMetricsAdapter();
+        metrics.reset();
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
index ca687bc..8413c83 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
@@ -21,40 +21,53 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.concurrent.atomic.LongAdder;
 import org.apache.ignite.cache.query.QueryMetrics;
-import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetricImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl;
+import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Adapter for {@link QueryMetrics}.
  */
-public class GridCacheQueryMetricsAdapter implements QueryMetrics, 
Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
+public class GridCacheQueryMetricsAdapter implements QueryMetrics {
     /** Minimum time of execution. */
-    private final GridAtomicLong minTime = new GridAtomicLong(Long.MAX_VALUE);
+    private final LongMetricImpl minTime;
 
     /** Maximum time of execution. */
-    private final GridAtomicLong maxTime = new GridAtomicLong();
+    private final LongMetricImpl maxTime;
 
     /** Sum of execution time for all completed queries. */
-    private final LongAdder sumTime = new LongAdder();
-
-    /** Average time of execution.
-     * If doesn't equal zero then this metrics set is copy from remote node 
and doesn't actually update.
-     */
-    private double avgTime;
+    private final LongAdderMetricImpl sumTime;
 
     /** Number of executions. */
-    private final LongAdder execs = new LongAdder();
+    private final LongAdderMetricImpl execs;
 
     /** Number of completed executions. */
-    private final LongAdder completed = new LongAdder();
+    private final LongAdderMetricImpl completed;
 
     /** Number of fails. */
-    private final LongAdder fails = new LongAdder();
+    private final LongAdderMetricImpl fails;
+
+    /**
+     * @param mmgr Metrics manager.
+     * @param cacheName Cache name.
+     * @param isNear Is near flag.
+     */
+    public GridCacheQueryMetricsAdapter(GridMetricManager mmgr, String 
cacheName, boolean isNear) {
+        MetricRegistry mreg = 
mmgr.registry(MetricUtils.cacheMetricsRegistryName(cacheName, isNear));
+
+        minTime = mreg.metric("QueryMinimalTime", null);
+        minTime.value(Long.MAX_VALUE);
+
+        maxTime = mreg.metric("QueryMaximumTime", null);
+        sumTime = mreg.longAdderMetric("QuerySumTime", null);
+        execs = mreg.longAdderMetric("QueryExecuted", null);
+        completed = mreg.longAdderMetric("QueryCompleted", null);
+        fails = mreg.longAdderMetric("QueryFailed", null);
+    }
 
     /** {@inheritDoc} */
     @Override public long minimumTime() {
@@ -70,33 +83,19 @@ public class GridCacheQueryMetricsAdapter implements 
QueryMetrics, Externalizabl
 
     /** {@inheritDoc} */
     @Override public double averageTime() {
-        if (avgTime > 0)
-            return avgTime;
-        else {
-            double val = completed.sum();
+        double val = completed.longValue();
 
-            return val > 0 ? sumTime.sum() / val : 0.0;
-        }
+        return val > 0 ? sumTime.longValue() / val : 0.0;
     }
 
     /** {@inheritDoc} */
     @Override public int executions() {
-        return execs.intValue();
-    }
-
-    /**
-     * Gets total number of completed executions of query.
-     * This value is actual only for local node.
-     *
-     * @return Number of completed executions.
-     */
-    public int completedExecutions() {
-        return completed.intValue();
+        return (int)execs.longValue();
     }
 
     /** {@inheritDoc} */
     @Override public int fails() {
-        return fails.intValue();
+        return (int)fails.longValue();
     }
 
     /**
@@ -114,53 +113,120 @@ public class GridCacheQueryMetricsAdapter implements 
QueryMetrics, Externalizabl
             execs.increment();
             completed.increment();
 
-            minTime.setIfLess(duration);
-            maxTime.setIfGreater(duration);
+            MetricUtils.setIfLess(minTime, duration);
+            MetricUtils.setIfGreater(maxTime, duration);
 
             sumTime.add(duration);
         }
     }
 
-    /**
-     * Merge with given metrics.
-     *
-     * @return Copy.
-     */
-    public GridCacheQueryMetricsAdapter copy() {
-        GridCacheQueryMetricsAdapter m = new GridCacheQueryMetricsAdapter();
-
-        // Not synchronized because accuracy isn't critical.
-        m.fails.add(fails.sum());
-        m.minTime.set(minTime.get());
-        m.maxTime.set(maxTime.get());
-        m.execs.add(execs.sum());
-        m.completed.add(completed.sum());
-        m.sumTime.add(sumTime.sum());
-        m.avgTime = avgTime;
-
-        return m;
-    }
+    /** @return Current metrics values. */
+    public QueryMetrics snapshot() {
+        long minTimeVal = minTime.longValue();
 
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(minTime.get());
-        out.writeLong(maxTime.get());
-        out.writeDouble(averageTime());
-        out.writeInt(execs.intValue());
-        out.writeInt(fails.intValue());
+        return new QueryMetricsSnapshot(
+            minTimeVal == Long.MAX_VALUE ? 0 : minTimeVal,
+            maxTime.longValue(),
+            averageTime(),
+            (int)execs.longValue(),
+            (int)fails.longValue());
     }
 
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        minTime.set(in.readLong());
-        maxTime.set(in.readLong());
-        avgTime = in.readDouble();
-        execs.add(in.readInt());
-        fails.add(in.readInt());
+    /** Resets query metrics. */
+    public void reset() {
+        minTime.value(Long.MAX_VALUE);
+        maxTime.reset();
+        sumTime.reset();
+        execs.reset();
+        completed.reset();
+        fails.reset();
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheQueryMetricsAdapter.class, this);
     }
+
+    /** Query metrics snapshot. */
+    public static class QueryMetricsSnapshot implements QueryMetrics, 
Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Minimal query execution time. */
+        private long minTime;
+
+        /** Maximum query execution time. */
+        private long maxTime;
+
+        /** Average query execution time. */
+        private double avgTime;
+
+        /** Count of executed queries. */
+        private int execs;
+
+        /** Count of failed queries. */
+        private int fails;
+
+        /** Required by {@link Externalizable}. */
+        public QueryMetricsSnapshot() {
+        }
+
+        /**
+         * @param minTime Minimal query execution time.
+         * @param maxTime Maximum query execution time.
+         * @param avgTime Average query execution time.
+         * @param execs  Count of executed queries.
+         * @param fails Count of failed queries.
+         */
+        public QueryMetricsSnapshot(long minTime, long maxTime, double 
avgTime, int execs, int fails) {
+            this.minTime = minTime;
+            this.maxTime = maxTime;
+            this.avgTime = avgTime;
+            this.execs = execs;
+            this.fails = fails;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeLong(minTime);
+            out.writeLong(maxTime);
+            out.writeDouble(avgTime);
+            out.writeInt(execs);
+            out.writeInt(fails);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            minTime = in.readLong();
+            maxTime = in.readLong();
+            avgTime = in.readDouble();
+            execs = in.readInt();
+            fails = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long minimumTime() {
+            return minTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long maximumTime() {
+            return maxTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public double averageTime() {
+            return avgTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int executions() {
+            return execs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int fails() {
+            return fails;
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
index d7fe5d4..42c5d2d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java
@@ -95,6 +95,32 @@ public class MetricUtils {
     }
 
     /**
+     * Update metrics value only if current value if less then {@code update}.
+     *
+     * @param m Metric to update.
+     * @param update New value.
+     */
+    public static void setIfLess(LongMetricImpl m, long update) {
+        long v = m.value();
+
+        while (v > update && !LongMetricImpl.updater.compareAndSet(m, v, 
update))
+            v = m.value();
+    }
+
+    /**
+     * Update metrics value only if current value if greater then {@code 
update}.
+     *
+     * @param m Metric to update.
+     * @param update New value.
+     */
+    public static void setIfGreater(LongMetricImpl m, long update) {
+        long v = m.value();
+
+        while (v < update && !LongMetricImpl.updater.compareAndSet(m, v, 
update))
+            v = m.value();
+    }
+
+    /**
      * Asserts all arguments are not empty.
      *
      * @param names Names.
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
index 5143aa3..352d8a8 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
@@ -29,7 +30,6 @@ import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -350,21 +350,21 @@ public abstract class CacheAbstractQueryMetricsSelfTest 
extends GridCommonAbstra
      * @param first {@code true} if metrics checked for first query only.
      */
     private void checkMetrics(IgniteCache<Integer, String> cache, int execs, 
int completions, int failures, boolean first) {
-        GridCacheQueryMetricsAdapter m = 
(GridCacheQueryMetricsAdapter)cache.queryMetrics();
+        QueryMetrics m = cache.queryMetrics();
 
         assertNotNull(m);
 
         info("Metrics: " + m);
 
         assertEquals("Executions", execs, m.executions());
-        assertEquals("Completions", completions, m.completedExecutions());
+        assertEquals("Completions", completions, m.executions() - m.fails());
         assertEquals("Failures", failures, m.fails());
         assertTrue(m.averageTime() >= 0);
         assertTrue(m.maximumTime() >= 0);
         assertTrue(m.minimumTime() >= 0);
 
         if (first)
-            assertTrue("On first execution minTime == maxTime", 
m.minimumTime() == m.maximumTime());
+            assertEquals("On first execution minTime == maxTime", 
m.minimumTime(), m.maximumTime());
     }
 
     /**
@@ -438,8 +438,8 @@ public abstract class CacheAbstractQueryMetricsSelfTest 
extends GridCommonAbstra
         final int exp) throws IgniteInterruptedCheckedException {
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                GridCacheQueryMetricsAdapter m = 
(GridCacheQueryMetricsAdapter)cache.queryMetrics();
-                return m.completedExecutions() == exp;
+                QueryMetrics m = cache.queryMetrics();
+                return m.executions() - m.fails() == exp;
             }
         }, 5000);
     }

Reply via email to