This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new c4e444b IGNITE-13327 Add a metric for processed keys when rebuilding indexes. - Fixes #8126 c4e444b is described below commit c4e444b69fce7216ea85326d99abd436076363a3 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Aug 18 19:36:33 2020 +0300 IGNITE-13327 Add a metric for processed keys when rebuilding indexes. - Fixes #8126 Signed-off-by: Ivan Rakov <ivan.glu...@gmail.com> --- .../java/org/apache/ignite/cache/CacheMetrics.java | 15 ++ .../cache/CacheClusterMetricsMXBeanImpl.java | 12 +- .../cache/CacheLocalMetricsMXBeanImpl.java | 12 +- .../processors/cache/CacheMetricsImpl.java | 45 ++++- .../processors/cache/CacheMetricsSnapshot.java | 19 ++ .../processors/cache/CacheMetricsSnapshotV2.java | 19 ++ .../schema/SchemaIndexCachePartitionWorker.java | 2 + .../query/schema/SchemaIndexCacheVisitorImpl.java | 1 + .../apache/ignite/mxbean/CacheMetricsMXBean.java | 9 + .../platform/PlatformCacheWriteMetricsTask.java | 10 ++ .../processors/cache/index/IndexMetricsTest.java | 196 ++++++++++++++++++--- 11 files changed, 308 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 09bf550..0372ab7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -720,4 +720,19 @@ public interface CacheMetrics { * @return Key collisions and appropriate queue size string representation. */ @NotNull public String getTxKeyCollisions(); + + /** + * Return {@code true} if index rebuild is in progress. + * + * @return {@code true} if index rebuild is in progress. + */ + public boolean isIndexRebuildInProgress(); + + /** + * Return number of keys processed during index rebuilding. + * To get remaining number of keys for rebuilding, subtract current value from {@link #getCacheSize}. + * + * @return Number of keys processed during index rebuilding. + */ + public long getIndexRebuildKeysProcessed(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index 9403e5c..32031ec3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -24,7 +24,7 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean; /** * Management bean that provides access to {@link IgniteCache IgniteCache}. */ -class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { +public class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { /** Cache. */ private GridCacheAdapter<?, ?> cache; @@ -518,4 +518,14 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { throw new RuntimeException(e.getMessage()); } } + + /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + return cache.clusterMetrics().isIndexRebuildInProgress(); + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return cache.clusterMetrics().getIndexRebuildKeysProcessed(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index 89c7c04..ea2f81b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -29,7 +29,7 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean; * @deprecated Use {@link GridMetricManager} instead. */ @Deprecated -class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { +public class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { /** Cache. */ private GridCacheAdapter<?, ?> cache; @@ -516,4 +516,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { @Override public String getTxKeyCollisions() { return cache.metrics0().getTxKeyCollisions(); } + + /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + return cache.metrics0().isIndexRebuildInProgress(); + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return cache.metrics0().getIndexRebuildKeysProcessed(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index c7ba159..bd82491 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.processors.metric.impl.LongGauge; import org.apache.ignite.internal.processors.metric.impl.MetricUtils; import org.apache.ignite.internal.util.collection.ImmutableIntSet; @@ -228,6 +229,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Cache size. */ private final LongGauge cacheSize; + /** Number of keys processed during index rebuilding. */ + private final LongAdderMetric idxRebuildKeyProcessed; + /** * Creates cache metrics. * @@ -365,11 +369,8 @@ public class CacheMetricsImpl implements CacheMetrics { rebalanceClearingPartitions = mreg.longMetric("RebalanceClearingPartitionsLeft", "Number of partitions need to be cleared before actual rebalance start."); - mreg.register("IsIndexRebuildInProgress", () -> { - IgniteInternalFuture fut = cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId()); - - return fut != null && !fut.isDone(); - }, "True if index rebuild is in progress."); + mreg.register("IsIndexRebuildInProgress", this::isIndexRebuildInProgress, + "True if index rebuild is in progress."); getTime = mreg.histogram("GetTime", HISTOGRAM_BUCKETS, "Get time in nanoseconds."); @@ -399,6 +400,9 @@ public class CacheMetricsImpl implements CacheMetrics { cacheSize = mreg.register("CacheSize", () -> getEntriesStat().cacheSize(), "Local cache size."); + + idxRebuildKeyProcessed = mreg.longAdderMetric("IndexRebuildKeyProcessed", + "Number of keys processed during index rebuilding."); } /** @@ -714,6 +718,8 @@ public class CacheMetricsImpl implements CacheMetrics { delegate.clear(); txKeyCollisionInfo = null; + + idxRebuildKeyProcessed.reset(); } /** {@inheritDoc} */ @@ -898,7 +904,8 @@ public class CacheMetricsImpl implements CacheMetrics { delegate.onRead(isHit); } - /** Set callback for tx key collisions detection. + /** + * Set callback for tx key collisions detection. * * @param coll Key collisions info holder. */ @@ -1544,6 +1551,32 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + IgniteInternalFuture fut = cctx.shared().kernalContext().query().indexRebuildFuture(cctx.cacheId()); + + return fut != null && !fut.isDone(); + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return idxRebuildKeyProcessed.value(); + } + + /** Reset metric - number of keys processed during index rebuilding. */ + public void resetIndexRebuildKeyProcessed() { + idxRebuildKeyProcessed.reset(); + } + + /** + * Increase number of keys processed during index rebuilding. + * + * @param val Number of processed keys. + */ + public void addIndexRebuildKeyProcessed(long val) { + idxRebuildKeyProcessed.add(val); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsImpl.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index e4809db..7c7828a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -302,6 +302,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** */ private boolean isValidForWriting; + /** Index rebuilding in progress. */ + private boolean idxRebuildInProgress; + + /** Number of keys processed during index rebuilding. */ + private long idxRebuildKeyProcessed; + /** * Default constructor. */ @@ -411,6 +417,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { rebalanceStartTime = m.rebalancingStartTime(); rebalanceFinishTime = m.estimateRebalancingFinishTime(); rebalanceClearingPartitionsLeft = m.getRebalanceClearingPartitionsLeft(); + + idxRebuildInProgress = m.isIndexRebuildInProgress(); + idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed(); } /** @@ -1026,6 +1035,16 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + return idxRebuildInProgress; + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return idxRebuildKeyProcessed; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java index d1dd6b8..f792499 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java @@ -323,6 +323,12 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements /** Tx key collisions with appropriate queue size string representation. */ private String txKeyCollisions; + /** Index rebuilding in progress. */ + private boolean idxRebuildInProgress; + + /** Number of keys processed during index rebuilding. */ + private long idxRebuildKeyProcessed; + /** * Default constructor. */ @@ -433,6 +439,9 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements rebalanceFinishTime = m.estimateRebalancingFinishTime(); rebalanceClearingPartitionsLeft = m.getRebalanceClearingPartitionsLeft(); txKeyCollisions = m.getTxKeyCollisions(); + + idxRebuildInProgress = m.isIndexRebuildInProgress(); + idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed(); } /** @@ -1054,6 +1063,16 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements } /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + return idxRebuildInProgress; + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return idxRebuildKeyProcessed; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshotV2.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java index 860f742..18aa8f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java @@ -191,6 +191,8 @@ public class SchemaIndexCachePartitionWorker extends GridWorker { locked = false; } + cctx.cache().metrics0().addIndexRebuildKeyProcessed(1); + if (locPart.state() == RENTING) break; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java index 25af441..4b519f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java @@ -102,6 +102,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { } cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size()); + cctx.cache().metrics0().resetIndexRebuildKeyProcessed(); beforeExecute(); diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java index e6fce4a..1db959f 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java @@ -343,4 +343,13 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean, */ @MXBeanDescription("Disable statistic collection for the cache.") public void disableStatistics(); + + /** {@inheritDoc} */ + @MXBeanDescription("True if index rebuilding in progress.") + @Override public boolean isIndexRebuildInProgress(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of keys processed during index rebuilding. To get remaining number of keys for " + + "rebuilding, subtract current value from cache size.") + @Override public long getIndexRebuildKeysProcessed(); } diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index b593bd8..6d9a557 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -542,5 +542,15 @@ public class PlatformCacheWriteMetricsTask extends ComputeTaskAdapter<Long, Obje @Override public long getEntryProcessorRemovals() { return 78; } + + /** {@inheritDoc} */ + @Override public boolean isIndexRebuildInProgress() { + return false; + } + + /** {@inheritDoc} */ + @Override public long getIndexRebuildKeysProcessed() { + return 0; + } } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java index ff6f0ee..62da827 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.cache.index; import java.nio.file.Path; import java.util.Collections; import java.util.List; +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.configuration.CacheConfiguration; @@ -28,12 +31,19 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl; +import org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl; import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.spi.metric.BooleanMetric; +import org.apache.ignite.spi.metric.Metric; import org.junit.Test; +import static java.util.Objects.requireNonNull; import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.KeyClass; import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.ValueClass; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; @@ -93,21 +103,31 @@ public class IndexMetricsTest extends AbstractIndexingCommonTest { return ccfg; } - /** @throws Exception If failed. */ + /** + * + * + * @throws Exception If failed. + */ @Test public void testIndexRebuildingMetric() throws Exception { - IgniteEx ignite = startGrid(0); + IgniteEx n = startGrid(0); - ignite.cluster().active(true); + n.cluster().active(true); String cacheName1 = "cache1"; String cacheName2 = "cache2"; - IgniteCache<KeyClass, ValueClass> cache1 = ignite.getOrCreateCache(cacheConfiguration(cacheName1)); - IgniteCache<KeyClass, ValueClass> cache2 = ignite.getOrCreateCache(cacheConfiguration(cacheName2)); + IgniteCache<KeyClass, ValueClass> cache1 = n.getOrCreateCache(cacheConfiguration(cacheName1)); + IgniteCache<KeyClass, ValueClass> cache2 = n.getOrCreateCache(cacheConfiguration(cacheName2)); + + int entryCnt1 = 100; + int entryCnt2 = 200; - cache1.put(new KeyClass(1), new ValueClass(1L)); - cache2.put(new KeyClass(1), new ValueClass(1L)); + for (int i = 0; i < entryCnt1; i++) + cache1.put(new KeyClass(i), new ValueClass((long)i)); + + for (int i = 0; i < entryCnt2; i++) + cache2.put(new KeyClass(i), new ValueClass((long)i)); List<Path> idxPaths = getIndexBinPaths(cacheName1); @@ -119,35 +139,163 @@ public class IndexMetricsTest extends AbstractIndexingCommonTest { GridQueryProcessor.idxCls = BlockingIndexing.class; - ignite = startGrid(0); + n = startGrid(0); + + BooleanMetric idxRebuildInProgress1 = indexRebuildMetric(n, cacheName1, "IsIndexRebuildInProgress"); + BooleanMetric idxRebuildInProgress2 = indexRebuildMetric(n, cacheName2, "IsIndexRebuildInProgress"); + + LongAdderMetric idxRebuildKeyProcessed1 = indexRebuildMetric(n, cacheName1, "IndexRebuildKeyProcessed"); + LongAdderMetric idxRebuildKeyProcessed2 = indexRebuildMetric(n, cacheName2, "IndexRebuildKeyProcessed"); + + CacheMetrics cacheMetrics1 = cacheMetrics(n, cacheName1); + CacheMetrics cacheMetrics2 = cacheMetrics(n, cacheName2); + + CacheMetricsMXBean cacheMetricsMXBean1 = cacheMetricsMXBean(n, cacheName1, CacheLocalMetricsMXBeanImpl.class); + CacheMetricsMXBean cacheMetricsMXBean2 = cacheMetricsMXBean(n, cacheName2, CacheLocalMetricsMXBeanImpl.class); + + CacheMetricsMXBean cacheClusterMetricsMXBean1 = + cacheMetricsMXBean(n, cacheName1, CacheClusterMetricsMXBeanImpl.class); + CacheMetricsMXBean cacheClusterMetricsMXBean2 = + cacheMetricsMXBean(n, cacheName2, CacheClusterMetricsMXBeanImpl.class); + + n.cluster().active(true); - BooleanMetric indexRebuildCache1 = isIndexRebuildInProgressMetric(ignite, cacheName1); - BooleanMetric indexRebuildCache2 = isIndexRebuildInProgressMetric(ignite, cacheName2); + BooleanSupplier[] idxRebuildProgressCache1 = { + idxRebuildInProgress1::value, + cacheMetrics1::isIndexRebuildInProgress, + cacheMetricsMXBean1::isIndexRebuildInProgress + }; - ignite.cluster().active(true); + BooleanSupplier[] idxRebuildProgressCache2 = { + idxRebuildInProgress2::value, + cacheMetrics2::isIndexRebuildInProgress, + cacheMetricsMXBean2::isIndexRebuildInProgress + }; - assertTrue(indexRebuildCache1.value()); - assertTrue(indexRebuildCache2.value()); + // It must always be false, because metric is only per node. + BooleanSupplier[] idxRebuildProgressCluster = { + cacheClusterMetricsMXBean1::isIndexRebuildInProgress, + cacheClusterMetricsMXBean2::isIndexRebuildInProgress + }; - ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName1); + LongSupplier[] idxRebuildKeyProcessedCache1 = { + idxRebuildKeyProcessed1::value, + cacheMetrics1::getIndexRebuildKeysProcessed, + cacheMetricsMXBean1::getIndexRebuildKeysProcessed, + }; - ignite.cache(cacheName1).indexReadyFuture().get(30_000); + LongSupplier[] idxRebuildKeyProcessedCache2 = { + idxRebuildKeyProcessed2::value, + cacheMetrics2::getIndexRebuildKeysProcessed, + cacheMetricsMXBean2::getIndexRebuildKeysProcessed, + }; - assertFalse(indexRebuildCache1.value()); - assertTrue(indexRebuildCache2.value()); + // It must always be 0, because metric is only per node. + LongSupplier[] idxRebuildKeyProcessedCluster = { + cacheClusterMetricsMXBean1::getIndexRebuildKeysProcessed, + cacheClusterMetricsMXBean2::getIndexRebuildKeysProcessed + }; - ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName2); + assertEquals(true, idxRebuildProgressCache1); + assertEquals(true, idxRebuildProgressCache2); + assertEquals(false, idxRebuildProgressCluster); - ignite.cache(cacheName2).indexReadyFuture().get(30_000); + assertEquals(0, idxRebuildKeyProcessedCache1); + assertEquals(0, idxRebuildKeyProcessedCache2); + assertEquals(0, idxRebuildKeyProcessedCluster); - assertFalse(indexRebuildCache1.value()); - assertFalse(indexRebuildCache2.value()); + ((BlockingIndexing)n.context().query().getIndexing()).stopBlock(cacheName1); + + n.cache(cacheName1).indexReadyFuture().get(30_000); + + assertEquals(false, idxRebuildProgressCache1); + assertEquals(true, idxRebuildProgressCache2); + assertEquals(false, idxRebuildProgressCluster); + + assertEquals(entryCnt1, idxRebuildKeyProcessedCache1); + assertEquals(0, idxRebuildKeyProcessedCache2); + assertEquals(0, idxRebuildKeyProcessedCluster); + + ((BlockingIndexing)n.context().query().getIndexing()).stopBlock(cacheName2); + + n.cache(cacheName2).indexReadyFuture().get(30_000); + + assertEquals(false, idxRebuildProgressCache1); + assertEquals(false, idxRebuildProgressCache2); + assertEquals(false, idxRebuildProgressCluster); + + assertEquals(entryCnt1, idxRebuildKeyProcessedCache1); + assertEquals(entryCnt2, idxRebuildKeyProcessedCache2); + assertEquals(0, idxRebuildKeyProcessedCluster); } - /** @return Gets {@code IsIndexRebuildInProgress} metric for given cache. */ - private BooleanMetric isIndexRebuildInProgressMetric(IgniteEx ignite, String cacheName) { + /** + * Get index rebuild metric. + * + * @param ignite Node. + * @param cacheName Cache name. + * @param name Name of the metric. + * @return Gets {@code IsIndexRebuildInProgress} metric for given cache. + */ + private <M extends Metric> M indexRebuildMetric(IgniteEx ignite, String cacheName, String name) { MetricRegistry mreg = ignite.context().metric().registry(cacheMetricsRegistryName(cacheName, false)); - return mreg.findMetric("IsIndexRebuildInProgress"); + return mreg.findMetric(name); + } + + /** + * Get cache metrics. + * + * @param node Node. + * @param cacheName Cache name. + * @return Cache metrics. + */ + private CacheMetrics cacheMetrics(IgniteEx node, String cacheName) { + requireNonNull(node); + requireNonNull(cacheName); + + return node.context().cache().cacheGroup(CU.cacheId(cacheName)).singleCacheContext().cache().metrics0(); + } + + /** + * Get cache metrics MXBean. + * + * @param n Node. + * @param cacheName Cache name. + * @param cls Cache metrics MXBean implementation. + * @return Cache metrics MXBean. + */ + private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(IgniteEx n, String cacheName, Class<? super T> cls) { + requireNonNull(n); + requireNonNull(cacheName); + requireNonNull(cls); + + return (T)getMxBean(n.name(), cacheName, cls.getName(), CacheMetricsMXBean.class); + } + + /** + * Assertion that expected value is equal with all actual values. + * + * @param exp Expected value. + * @param actuals Suppliers of actual values. + */ + private void assertEquals(boolean exp, BooleanSupplier... actuals) { + requireNonNull(actuals); + + for (int i = 0; i < actuals.length; i++) + assertEquals("i=" + i, exp, actuals[i].getAsBoolean()); + } + + /** + * Assertion that expected value is equal with all actual values. + * + * @param exp Expected value. + * @param actuals Suppliers of actual values. + */ + private void assertEquals(long exp, LongSupplier... actuals) { + requireNonNull(actuals); + + for (int i = 0; i < actuals.length; i++) + assertEquals("i=" + i, exp, actuals[i].getAsLong()); } }