This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e444b  IGNITE-13327 Add a metric for processed keys when rebuilding indexes. - Fixes #8126
c4e444b is described below

commit c4e444b69fce7216ea85326d99abd436076363a3
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Aug 18 19:36:33 2020 +0300

    IGNITE-13327 Add a metric for processed keys when rebuilding indexes. - Fixes #8126
    
    Signed-off-by: Ivan Rakov <ivan.glu...@gmail.com>
---
 .../java/org/apache/ignite/cache/CacheMetrics.java |  15 ++
 .../cache/CacheClusterMetricsMXBeanImpl.java       |  12 +-
 .../cache/CacheLocalMetricsMXBeanImpl.java         |  12 +-
 .../processors/cache/CacheMetricsImpl.java         |  45 ++++-
 .../processors/cache/CacheMetricsSnapshot.java     |  19 ++
 .../processors/cache/CacheMetricsSnapshotV2.java   |  19 ++
 .../schema/SchemaIndexCachePartitionWorker.java    |   2 +
 .../query/schema/SchemaIndexCacheVisitorImpl.java  |   1 +
 .../apache/ignite/mxbean/CacheMetricsMXBean.java   |   9 +
 .../platform/PlatformCacheWriteMetricsTask.java    |  10 ++
 .../processors/cache/index/IndexMetricsTest.java   | 196 ++++++++++++++++++---
 11 files changed, 308 insertions(+), 32 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 09bf550..0372ab7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -720,4 +720,19 @@ public interface CacheMetrics {
      * @return Key collisions and appropriate queue size string representation.
      */
     @NotNull public String getTxKeyCollisions();
+
+    /**
+     * Return {@code true} if index rebuild is in progress.
+     *
+     * @return {@code true} if index rebuild is in progress.
+     */
+    public boolean isIndexRebuildInProgress();
+
+    /**
+     * Return number of keys processed during index rebuilding.
+     * To get the remaining number of keys for rebuilding, subtract the current value from {@link #getCacheSize}.
+     *
+     * @return Number of keys processed during index rebuilding.
+     */
+    public long getIndexRebuildKeysProcessed();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index 9403e5c..32031ec3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -24,7 +24,7 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean;
 /**
  * Management bean that provides access to {@link IgniteCache IgniteCache}.
  */
-class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
+public class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
     /** Cache. */
     private GridCacheAdapter<?, ?> cache;
 
@@ -518,4 +518,14 @@ class CacheClusterMetricsMXBeanImpl implements 
CacheMetricsMXBean {
             throw new RuntimeException(e.getMessage());
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return cache.clusterMetrics().isIndexRebuildInProgress();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIndexRebuildKeysProcessed() {
+        return cache.clusterMetrics().getIndexRebuildKeysProcessed();
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index 89c7c04..ea2f81b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean;
  * @deprecated Use {@link GridMetricManager} instead.
  */
 @Deprecated
-class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
+public class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
     /** Cache. */
     private GridCacheAdapter<?, ?> cache;
 
@@ -516,4 +516,14 @@ class CacheLocalMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     @Override public String getTxKeyCollisions() {
         return cache.metrics0().getTxKeyCollisions();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return cache.metrics0().isIndexRebuildInProgress();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIndexRebuildKeysProcessed() {
+        return cache.metrics0().getIndexRebuildKeysProcessed();
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index c7ba159..bd82491 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.processors.metric.impl.LongGauge;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.internal.util.collection.ImmutableIntSet;
@@ -228,6 +229,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Cache size. */
     private final LongGauge cacheSize;
 
+    /** Number of keys processed during index rebuilding. */
+    private final LongAdderMetric idxRebuildKeyProcessed;
+
     /**
      * Creates cache metrics.
      *
@@ -365,11 +369,8 @@ public class CacheMetricsImpl implements CacheMetrics {
         rebalanceClearingPartitions = 
mreg.longMetric("RebalanceClearingPartitionsLeft",
             "Number of partitions need to be cleared before actual rebalance 
start.");
 
-        mreg.register("IsIndexRebuildInProgress", () -> {
-            IgniteInternalFuture fut = 
cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId());
-
-            return fut != null && !fut.isDone();
-        }, "True if index rebuild is in progress.");
+        mreg.register("IsIndexRebuildInProgress", 
this::isIndexRebuildInProgress,
+            "True if index rebuild is in progress.");
 
         getTime = mreg.histogram("GetTime", HISTOGRAM_BUCKETS, "Get time in 
nanoseconds.");
 
@@ -399,6 +400,9 @@ public class CacheMetricsImpl implements CacheMetrics {
 
         cacheSize = mreg.register("CacheSize",
             () -> getEntriesStat().cacheSize(), "Local cache size.");
+
+        idxRebuildKeyProcessed = 
mreg.longAdderMetric("IndexRebuildKeyProcessed",
+            "Number of keys processed during index rebuilding.");
     }
 
     /**
@@ -714,6 +718,8 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.clear();
 
         txKeyCollisionInfo = null;
+
+        idxRebuildKeyProcessed.reset();
     }
 
     /** {@inheritDoc} */
@@ -898,7 +904,8 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.onRead(isHit);
     }
 
-    /** Set callback for tx key collisions detection.
+    /**
+     * Set callback for tx key collisions detection.
      *
      * @param coll Key collisions info holder.
      */
@@ -1544,6 +1551,32 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        IgniteInternalFuture fut = 
cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId());
+
+        return fut != null && !fut.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIndexRebuildKeysProcessed() {
+        return idxRebuildKeyProcessed.value();
+    }
+
+    /** Reset metric - number of keys processed during index rebuilding. */
+    public void resetIndexRebuildKeyProcessed() {
+        idxRebuildKeyProcessed.reset();
+    }
+
+    /**
+     * Increase number of keys processed during index rebuilding.
+     *
+     * @param val Number of processed keys.
+     */
+    public void addIndexRebuildKeyProcessed(long val) {
+        idxRebuildKeyProcessed.add(val);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheMetricsImpl.class, this);
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index e4809db..7c7828a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -302,6 +302,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     /** */
     private boolean isValidForWriting;
 
+    /** Index rebuilding in progress. */
+    private boolean idxRebuildInProgress;
+
+    /** Number of keys processed during index rebuilding. */
+    private long idxRebuildKeyProcessed;
+
     /**
      * Default constructor.
      */
@@ -411,6 +417,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         rebalanceStartTime = m.rebalancingStartTime();
         rebalanceFinishTime = m.estimateRebalancingFinishTime();
         rebalanceClearingPartitionsLeft = 
m.getRebalanceClearingPartitionsLeft();
+
+        idxRebuildInProgress = m.isIndexRebuildInProgress();
+        idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed();
     }
 
     /**
@@ -1026,6 +1035,16 @@ public class CacheMetricsSnapshot implements 
CacheMetrics, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return idxRebuildInProgress;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIndexRebuildKeysProcessed() {
+        return idxRebuildKeyProcessed;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheMetricsSnapshot.class, this);
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
index d1dd6b8..f792499 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
@@ -323,6 +323,12 @@ public class CacheMetricsSnapshotV2 extends 
IgniteDataTransferObject implements
     /** Tx key collisions with appropriate queue size string representation. */
     private String txKeyCollisions;
 
+    /** Index rebuilding in progress. */
+    private boolean idxRebuildInProgress;
+
+    /** Number of keys processed during index rebuilding. */
+    private long idxRebuildKeyProcessed;
+
     /**
      * Default constructor.
      */
@@ -433,6 +439,9 @@ public class CacheMetricsSnapshotV2 extends 
IgniteDataTransferObject implements
         rebalanceFinishTime = m.estimateRebalancingFinishTime();
         rebalanceClearingPartitionsLeft = 
m.getRebalanceClearingPartitionsLeft();
         txKeyCollisions = m.getTxKeyCollisions();
+
+        idxRebuildInProgress = m.isIndexRebuildInProgress();
+        idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed();
     }
 
     /**
@@ -1054,6 +1063,16 @@ public class CacheMetricsSnapshotV2 extends 
IgniteDataTransferObject implements
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return idxRebuildInProgress;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIndexRebuildKeysProcessed() {
+        return idxRebuildKeyProcessed;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheMetricsSnapshotV2.class, this);
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index 860f742..18aa8f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -191,6 +191,8 @@ public class SchemaIndexCachePartitionWorker extends 
GridWorker {
                         locked = false;
                     }
 
+                    cctx.cache().metrics0().addIndexRebuildKeyProcessed(1);
+
                     if (locPart.state() == RENTING)
                         break;
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 25af441..4b519f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -102,6 +102,7 @@ public class SchemaIndexCacheVisitorImpl implements 
SchemaIndexCacheVisitor {
         }
 
         
cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size());
+        cctx.cache().metrics0().resetIndexRebuildKeyProcessed();
 
         beforeExecute();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java 
b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
index e6fce4a..1db959f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
@@ -343,4 +343,13 @@ public interface CacheMetricsMXBean extends 
CacheStatisticsMXBean, CacheMXBean,
      */
     @MXBeanDescription("Disable statistic collection for the cache.")
     public void disableStatistics();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("True if index rebuilding in progress.")
+    @Override public boolean isIndexRebuildInProgress();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of keys processed during index rebuilding. To 
get remaining number of keys for " +
+        "rebuilding, subtract current value from cache size.")
+    @Override public long getIndexRebuildKeysProcessed();
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
 
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
index b593bd8..6d9a557 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
@@ -542,5 +542,15 @@ public class PlatformCacheWriteMetricsTask extends 
ComputeTaskAdapter<Long, Obje
         @Override public long getEntryProcessorRemovals() {
             return 78;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean isIndexRebuildInProgress() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getIndexRebuildKeysProcessed() {
+            return 0;
+        }
     }
 }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
index ff6f0ee..62da827 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.cache.index;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -28,12 +31,19 @@ import 
org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.Metric;
 import org.junit.Test;
 
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.KeyClass;
 import static 
org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.ValueClass;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
@@ -93,21 +103,31 @@ public class IndexMetricsTest extends 
AbstractIndexingCommonTest {
         return ccfg;
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * Checks the index rebuild metrics of two caches while their indexes are rebuilt one after another.
+     *
+     * @throws Exception If failed.
+     */
     @Test
     public void testIndexRebuildingMetric() throws Exception {
-        IgniteEx ignite = startGrid(0);
+        IgniteEx n = startGrid(0);
 
-        ignite.cluster().active(true);
+        n.cluster().active(true);
 
         String cacheName1 = "cache1";
         String cacheName2 = "cache2";
 
-        IgniteCache<KeyClass, ValueClass> cache1 = 
ignite.getOrCreateCache(cacheConfiguration(cacheName1));
-        IgniteCache<KeyClass, ValueClass> cache2 = 
ignite.getOrCreateCache(cacheConfiguration(cacheName2));
+        IgniteCache<KeyClass, ValueClass> cache1 = 
n.getOrCreateCache(cacheConfiguration(cacheName1));
+        IgniteCache<KeyClass, ValueClass> cache2 = 
n.getOrCreateCache(cacheConfiguration(cacheName2));
+
+        int entryCnt1 = 100;
+        int entryCnt2 = 200;
 
-        cache1.put(new KeyClass(1), new ValueClass(1L));
-        cache2.put(new KeyClass(1), new ValueClass(1L));
+        for (int i = 0; i < entryCnt1; i++)
+            cache1.put(new KeyClass(i), new ValueClass((long)i));
+
+        for (int i = 0; i < entryCnt2; i++)
+            cache2.put(new KeyClass(i), new ValueClass((long)i));
 
         List<Path> idxPaths = getIndexBinPaths(cacheName1);
 
@@ -119,35 +139,163 @@ public class IndexMetricsTest extends 
AbstractIndexingCommonTest {
 
         GridQueryProcessor.idxCls = BlockingIndexing.class;
 
-        ignite = startGrid(0);
+        n = startGrid(0);
+
+        BooleanMetric idxRebuildInProgress1 = indexRebuildMetric(n, 
cacheName1, "IsIndexRebuildInProgress");
+        BooleanMetric idxRebuildInProgress2 = indexRebuildMetric(n, 
cacheName2, "IsIndexRebuildInProgress");
+
+        LongAdderMetric idxRebuildKeyProcessed1 = indexRebuildMetric(n, 
cacheName1, "IndexRebuildKeyProcessed");
+        LongAdderMetric idxRebuildKeyProcessed2 = indexRebuildMetric(n, 
cacheName2, "IndexRebuildKeyProcessed");
+
+        CacheMetrics cacheMetrics1 = cacheMetrics(n, cacheName1);
+        CacheMetrics cacheMetrics2 = cacheMetrics(n, cacheName2);
+
+        CacheMetricsMXBean cacheMetricsMXBean1 = cacheMetricsMXBean(n, 
cacheName1, CacheLocalMetricsMXBeanImpl.class);
+        CacheMetricsMXBean cacheMetricsMXBean2 = cacheMetricsMXBean(n, 
cacheName2, CacheLocalMetricsMXBeanImpl.class);
+
+        CacheMetricsMXBean cacheClusterMetricsMXBean1 =
+            cacheMetricsMXBean(n, cacheName1, 
CacheClusterMetricsMXBeanImpl.class);
+        CacheMetricsMXBean cacheClusterMetricsMXBean2 =
+            cacheMetricsMXBean(n, cacheName2, 
CacheClusterMetricsMXBeanImpl.class);
+
+        n.cluster().active(true);
 
-        BooleanMetric indexRebuildCache1 = 
isIndexRebuildInProgressMetric(ignite, cacheName1);
-        BooleanMetric indexRebuildCache2 = 
isIndexRebuildInProgressMetric(ignite, cacheName2);
+        BooleanSupplier[] idxRebuildProgressCache1 = {
+            idxRebuildInProgress1::value,
+            cacheMetrics1::isIndexRebuildInProgress,
+            cacheMetricsMXBean1::isIndexRebuildInProgress
+        };
 
-        ignite.cluster().active(true);
+        BooleanSupplier[] idxRebuildProgressCache2 = {
+            idxRebuildInProgress2::value,
+            cacheMetrics2::isIndexRebuildInProgress,
+            cacheMetricsMXBean2::isIndexRebuildInProgress
+        };
 
-        assertTrue(indexRebuildCache1.value());
-        assertTrue(indexRebuildCache2.value());
+        // It must always be false, because metric is only per node.
+        BooleanSupplier[] idxRebuildProgressCluster = {
+            cacheClusterMetricsMXBean1::isIndexRebuildInProgress,
+            cacheClusterMetricsMXBean2::isIndexRebuildInProgress
+        };
 
-        
((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName1);
+        LongSupplier[] idxRebuildKeyProcessedCache1 = {
+            idxRebuildKeyProcessed1::value,
+            cacheMetrics1::getIndexRebuildKeysProcessed,
+            cacheMetricsMXBean1::getIndexRebuildKeysProcessed,
+        };
 
-        ignite.cache(cacheName1).indexReadyFuture().get(30_000);
+        LongSupplier[] idxRebuildKeyProcessedCache2 = {
+            idxRebuildKeyProcessed2::value,
+            cacheMetrics2::getIndexRebuildKeysProcessed,
+            cacheMetricsMXBean2::getIndexRebuildKeysProcessed,
+        };
 
-        assertFalse(indexRebuildCache1.value());
-        assertTrue(indexRebuildCache2.value());
+        // It must always be 0, because metric is only per node.
+        LongSupplier[] idxRebuildKeyProcessedCluster = {
+            cacheClusterMetricsMXBean1::getIndexRebuildKeysProcessed,
+            cacheClusterMetricsMXBean2::getIndexRebuildKeysProcessed
+        };
 
-        
((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName2);
+        assertEquals(true, idxRebuildProgressCache1);
+        assertEquals(true, idxRebuildProgressCache2);
+        assertEquals(false, idxRebuildProgressCluster);
 
-        ignite.cache(cacheName2).indexReadyFuture().get(30_000);
+        assertEquals(0, idxRebuildKeyProcessedCache1);
+        assertEquals(0, idxRebuildKeyProcessedCache2);
+        assertEquals(0, idxRebuildKeyProcessedCluster);
 
-        assertFalse(indexRebuildCache1.value());
-        assertFalse(indexRebuildCache2.value());
+        
((BlockingIndexing)n.context().query().getIndexing()).stopBlock(cacheName1);
+
+        n.cache(cacheName1).indexReadyFuture().get(30_000);
+
+        assertEquals(false, idxRebuildProgressCache1);
+        assertEquals(true, idxRebuildProgressCache2);
+        assertEquals(false, idxRebuildProgressCluster);
+
+        assertEquals(entryCnt1, idxRebuildKeyProcessedCache1);
+        assertEquals(0, idxRebuildKeyProcessedCache2);
+        assertEquals(0, idxRebuildKeyProcessedCluster);
+
+        
((BlockingIndexing)n.context().query().getIndexing()).stopBlock(cacheName2);
+
+        n.cache(cacheName2).indexReadyFuture().get(30_000);
+
+        assertEquals(false, idxRebuildProgressCache1);
+        assertEquals(false, idxRebuildProgressCache2);
+        assertEquals(false, idxRebuildProgressCluster);
+
+        assertEquals(entryCnt1, idxRebuildKeyProcessedCache1);
+        assertEquals(entryCnt2, idxRebuildKeyProcessedCache2);
+        assertEquals(0, idxRebuildKeyProcessedCluster);
     }
 
-    /** @return Gets {@code IsIndexRebuildInProgress} metric for given cache. */
-    private BooleanMetric isIndexRebuildInProgressMetric(IgniteEx ignite, 
String cacheName) {
+    /**
+     * Get index rebuild metric.
+     *
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @param name Name of the metric.
+     * @return Metric with the given name from the metrics registry of the given cache.
+     */
+    private <M extends Metric> M indexRebuildMetric(IgniteEx ignite, String 
cacheName, String name) {
         MetricRegistry mreg = 
ignite.context().metric().registry(cacheMetricsRegistryName(cacheName, false));
 
-        return mreg.findMetric("IsIndexRebuildInProgress");
+        return mreg.findMetric(name);
+    }
+
+    /**
+     * Get cache metrics.
+     *
+     * @param node Node.
+     * @param cacheName Cache name.
+     * @return Cache metrics.
+     */
+    private CacheMetrics cacheMetrics(IgniteEx node, String cacheName) {
+        requireNonNull(node);
+        requireNonNull(cacheName);
+
+        return 
node.context().cache().cacheGroup(CU.cacheId(cacheName)).singleCacheContext().cache().metrics0();
+    }
+
+    /**
+     * Get cache metrics MXBean.
+     *
+     * @param n Node.
+     * @param cacheName Cache name.
+     * @param cls Cache metrics MXBean implementation.
+     * @return Cache metrics MXBean.
+     */
+    private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(IgniteEx n, 
String cacheName, Class<? super T> cls) {
+        requireNonNull(n);
+        requireNonNull(cacheName);
+        requireNonNull(cls);
+
+        return (T)getMxBean(n.name(), cacheName, cls.getName(), 
CacheMetricsMXBean.class);
+    }
+
+    /**
+     * Asserts that every actual value equals the expected value.
+     *
+     * @param exp Expected value.
+     * @param actuals Suppliers of actual values.
+     */
+    private void assertEquals(boolean exp, BooleanSupplier... actuals) {
+        requireNonNull(actuals);
+
+        for (int i = 0; i < actuals.length; i++)
+            assertEquals("i=" + i, exp, actuals[i].getAsBoolean());
+    }
+
+    /**
+     * Asserts that every actual value equals the expected value.
+     *
+     * @param exp Expected value.
+     * @param actuals Suppliers of actual values.
+     */
+    private void assertEquals(long exp, LongSupplier... actuals) {
+        requireNonNull(actuals);
+
+        for (int i = 0; i < actuals.length; i++)
+            assertEquals("i=" + i, exp, actuals[i].getAsLong());
     }
 }

Reply via email to