Repository: ignite Updated Branches: refs/heads/master 49a565cee -> cf09e7691
IGNITE-8554 Cache metrics: expose metrics with rebalance info about keys - Fixes #4094. Signed-off-by: Ivan Rakov <ira...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cf09e769 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cf09e769 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cf09e769 Branch: refs/heads/master Commit: cf09e76916c38f867cd5cb618e345c74672026db Parents: 49a565c Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Tue Jun 19 18:10:48 2018 +0300 Committer: Ivan Rakov <ira...@apache.org> Committed: Tue Jun 19 18:10:48 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/CacheMetrics.java | 10 +++ .../cache/CacheAffinitySharedManager.java | 3 +- .../cache/CacheClusterMetricsMXBeanImpl.java | 10 +++ .../cache/CacheLocalMetricsMXBeanImpl.java | 8 ++ .../processors/cache/CacheMetricsImpl.java | 15 +++- .../processors/cache/CacheMetricsSnapshot.java | 30 ++++++++ .../GridCachePartitionExchangeManager.java | 13 +++- .../dht/GridClientPartitionTopology.java | 35 +++++++++ .../dht/GridDhtPartitionTopology.java | 11 +++ .../dht/GridDhtPartitionTopologyImpl.java | 34 +++++++++ .../dht/preloader/GridDhtPartitionDemander.java | 28 ++++--- .../GridDhtPartitionSupplyMessage.java | 9 +-- .../GridDhtPartitionsExchangeFuture.java | 60 +++++++++++++-- .../preloader/GridDhtPartitionsFullMessage.java | 80 ++++++++++++++++++-- .../GridDhtPartitionsSingleMessage.java | 24 +++--- .../platform/cache/PlatformCache.java | 2 + .../internal/visor/cache/VisorCacheMetrics.java | 35 +++++++++ .../visor/node/VisorNodeDataCollectorJob.java | 20 ++++- .../cache/CacheGroupsMetricsRebalanceTest.java | 50 ++++++------ .../platform/PlatformCacheWriteMetricsTask.java | 10 +++ .../Apache.Ignite.Core/Cache/ICacheMetrics.cs | 18 +++++ .../Impl/Cache/CacheMetricsImpl.cs | 14 ++++ 22 files changed, 446 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 99bf2c2..e3e6446 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -509,6 +509,16 @@ public interface CacheMetrics { public int getRebalancingPartitionsCount(); /** + * @return Number of already rebalanced keys. + */ + public long getRebalancedKeys(); + + /** + * @return Number estimated to rebalance keys. + */ + public long getEstimatedRebalancingKeys(); + + /** * @return Estimated number of keys to be rebalanced on current node. */ public long getKeysToRebalanceLeft(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 92b8d3e..08eb43f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -472,6 +472,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap clientTop.partitionMap(true), clientTop.fullUpdateCounters(), Collections.<Integer>emptySet(), + null, null); } @@ -530,7 +531,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); - grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null); + grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null, null); topFut.validate(grp, discoCache.allNodes()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index 32603cb..3d5278c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -370,6 +370,16 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return cache.clusterMetrics().getRebalancedKeys(); + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return cache.clusterMetrics().getEstimatedRebalancingKeys(); + } + + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return cache.clusterMetrics().getRebalancingPartitionsCount(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index d3060d3..212c7a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -370,6 +370,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { return cache.metrics0().getTotalPartitionsCount(); } + @Override public long getRebalancedKeys() { + return cache.metrics0().getRebalancedKeys(); + } + + @Override public long getEstimatedRebalancingKeys() { + return cache.metrics0().getEstimatedRebalancingKeys(); + } + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return cache.metrics0().getRebalancingPartitionsCount(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 96f40bf..0f6d06f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -857,6 +857,16 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return rebalancedKeys.get(); + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys.get(); + } + + /** {@inheritDoc} */ @Override public long getKeysToRebalanceLeft() { return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get()); } @@ -935,7 +945,10 @@ public class CacheMetricsImpl implements CacheMetrics { * First rebalance supply message callback. * @param keysCnt Estimated number of keys. */ - public void onRebalancingKeysCountEstimateReceived(long keysCnt) { + public void onRebalancingKeysCountEstimateReceived(Long keysCnt) { + if (keysCnt == null) + return; + estimatedRebalancingKeys.addAndGet(keysCnt); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 539ad59..8a0f0e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -197,6 +197,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Rebalancing partitions count. */ private int rebalancingPartitionsCnt; + /** Number of already rebalanced keys. */ + private long rebalancedKeys; + + /** Number estimated to rebalance keys. */ + private long estimatedRebalancingKeys; + /** Keys to rebalance left. */ private long keysToRebalanceLeft; @@ -331,6 +337,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { totalPartitionsCnt = entriesStat.totalPartitionsCount(); rebalancingPartitionsCnt = entriesStat.rebalancingPartitionsCount(); + rebalancedKeys = m.getRebalancedKeys(); + estimatedRebalancingKeys = m.getEstimatedRebalancingKeys(); keysToRebalanceLeft = m.getKeysToRebalanceLeft(); rebalancingBytesRate = m.getRebalancingBytesRate(); rebalancingKeysRate = m.getRebalancingKeysRate(); @@ -459,6 +467,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { else writeBehindErrorRetryCnt = -1; + rebalancedKeys += e.getRebalancedKeys(); + estimatedRebalancingKeys += e.getEstimatedRebalancingKeys(); totalPartitionsCnt += e.getTotalPartitionsCount(); rebalancingPartitionsCnt += e.getRebalancingPartitionsCount(); keysToRebalanceLeft += e.getKeysToRebalanceLeft(); @@ -733,6 +743,14 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { return totalPartitionsCnt; } + @Override public long getRebalancedKeys() { + return rebalancedKeys; + } + + @Override public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys; + } + /** {@inheritDoc} */ @Override public int getRebalancingPartitionsCount() { return rebalancingPartitionsCnt; @@ -926,6 +944,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { out.writeLong(keysToRebalanceLeft); out.writeLong(rebalancingBytesRate); out.writeLong(rebalancingKeysRate); + + out.writeLong(rebalancedKeys); + out.writeLong(estimatedRebalancingKeys); + out.writeLong(rebalanceStartTime); + out.writeLong(rebalanceFinishTime); + out.writeLong(rebalanceClearingPartitionsLeft); } /** {@inheritDoc} */ @@ -981,5 +1005,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { keysToRebalanceLeft = in.readLong(); rebalancingBytesRate = in.readLong(); rebalancingKeysRate = in.readLong(); + + rebalancedKeys = in.readLong(); + estimatedRebalancingKeys = in.readLong(); + rebalanceStartTime = in.readLong(); + rebalanceFinishTime = in.readLong(); + rebalanceClearingPartitionsLeft = in.readLong(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 38ddaf6..715c290 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1121,6 +1121,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affCache.similarAffinityKey()); } + m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes()); + if (exchId != null) { CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters(); @@ -1154,6 +1156,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addPartitionUpdateCounters(top.groupId(), cntrsMap); else m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap)); + + m.addPartitionSizes(top.groupId(), top.globalPartSizes()); } } @@ -1264,9 +1268,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addPartitionUpdateCounters(grp.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - - m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes()); } + + m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes()); } } @@ -1288,9 +1292,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addPartitionUpdateCounters(top.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - - m.addPartitionSizes(top.groupId(), top.partitionSizes()); } + + m.addPartitionSizes(top.groupId(), top.partitionSizes()); } return m; @@ -1482,6 +1486,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), + msg.partitionSizes(grpId), msg.topologyVersion()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index b365233..fc80bbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -119,6 +119,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private final int parts; + /** */ + private volatile Map<Integer, Long> globalPartSizes; + /** * @param cctx Context. * @param discoCache Discovery data cache. @@ -707,6 +710,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -810,6 +814,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (cntrMap != null) this.cntrMap = new CachePartitionFullCountersMap(cntrMap); + if (partSizes != null) + this.globalPartSizes = partSizes; + consistencyCheck(); if (log.isDebugEnabled()) @@ -1223,6 +1230,34 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override @Nullable public Map<Integer, Long> globalPartSizes() { + lock.readLock().lock(); + + try { + if (globalPartSizes == null) + return Collections.emptyMap(); + + return Collections.unmodifiableMap(globalPartSizes); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) { + lock.writeLock().lock(); + + try { + this.globalPartSizes = partSizes; + } + finally { + lock.writeLock().unlock(); + } + } + + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { assert false : "Should not be called on non-affinity node"; http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d6c5450..b77dbd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -287,6 +287,7 @@ public interface GridDhtPartitionTopology { GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, @Nullable AffinityTopologyVersion msgTopVer); /** @@ -383,6 +384,16 @@ public interface GridDhtPartitionTopology { public void printMemoryStats(int threshold); /** + * @return Sizes of up-to-date partition versions in topology. + */ + Map<Integer, Long> globalPartSizes(); + + /** + * @param partSizes Sizes of up-to-date partition versions in topology. + */ + void globalPartSizes(@Nullable Map<Integer, Long> partSizes); + + /** * @param topVer Topology version. * @return {@code True} if rebalance process finished. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0599c23..cabb0b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -138,6 +138,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final CachePartitionFullCountersMap cntrMap; /** */ + private volatile Map<Integer, Long> globalPartSizes; + + /** */ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; /** @@ -1329,6 +1332,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) { log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + @@ -1544,6 +1548,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { updateRebalanceVersion(aff.assignment()); } + if (partSizes != null) + this.globalPartSizes = partSizes; + consistencyCheck(); if (log.isDebugEnabled()) { @@ -2625,6 +2632,33 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map<Integer, Long> globalPartSizes() { + lock.readLock().lock(); + + try { + if (globalPartSizes == null) + return Collections.emptyMap(); + + return Collections.unmodifiableMap(globalPartSizes); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) { + lock.writeLock().lock(); + + try { + this.globalPartSizes = partSizes; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { AffinityTopologyVersion curTopVer = this.readyTopVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index c94f511..3cfc25f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -307,9 +307,22 @@ public class GridDhtPartitionDemander { metrics.clearRebalanceCounters(); - metrics.startRebalance(0); + for (GridDhtPartitionDemandMessage msg : assignments.values()) { + for (Integer partId : msg.partitions().fullSet()) { + metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId)); + } + + CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap(); - rebalanceFut.listen(f -> metrics.clearRebalanceCounters()); + for (int i = 0; i < histMap.size(); i++) { + long from = histMap.initialUpdateCounterAt(i); + long to = histMap.updateCounterAt(i); + + metrics.onRebalancingKeysCountEstimateReceived(to - from); + } + } + + metrics.startRebalance(0); } } @@ -714,8 +727,6 @@ public class GridDhtPartitionDemander { try { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); - GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); - ctx.database().checkpointReadLock(); try { @@ -749,11 +760,10 @@ public class GridDhtPartitionDemander { break; } - if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) - cctx = ctx.cacheContext(entry.cacheId()); - - if (cctx != null && cctx.statisticsEnabled()) - cctx.cache().metrics0().onRebalanceKeyReceived(); + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) + cctx.cache().metrics0().onRebalanceKeyReceived(); + } } // If message was last for this partition, http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 77baa38..4ecffc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -443,7 +443,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple * @return Estimated keys count. */ public long estimatedKeysCount() { - return estimatedKeysCnt; + return -1; } /** @@ -457,12 +457,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple * @return Estimated keys count for a given cache ID. */ public long keysForCache(int cacheId) { - if (this.keysPerCache == null) - return -1; - - Long cnt = this.keysPerCache.get(cacheId); - - return cnt != null ? cnt : 0; + return -1; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5b10347..366d8ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -897,6 +897,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte clientTop.partitionMap(true), clientTop.fullUpdateCounters(), Collections.emptySet(), + null, null); } } @@ -2363,6 +2364,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param top Topology. + */ + private void assignPartitionSizes(GridDhtPartitionTopology top) { + Map<Integer, Long> partSizes = new HashMap<>(); + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage singleMsg = e.getValue(); + + GridDhtPartitionMap partMap = singleMsg.partitions().get(top.groupId()); + + if (partMap == null) + continue; + + for (Map.Entry<Integer, GridDhtPartitionState> e0 : partMap.entrySet()) { + int p = e0.getKey(); + GridDhtPartitionState state = e0.getValue(); + + if (state == GridDhtPartitionState.OWNING) + partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p)); + } + } + + for (GridDhtLocalPartition locPart : top.currentLocalPartitions()) { + if (locPart.state() == GridDhtPartitionState.OWNING) + partSizes.put(locPart.id(), locPart.fullSize()); + } + + top.globalPartSizes(partSizes); + } + + /** * Collects and determines new owners of partitions for all nodes for given {@code top}. * * @param top Topology to assign. @@ -2402,7 +2434,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CounterWithNodes maxCntr = maxCntrs.get(p); if (maxCntr == null || cntr > maxCntr.cnt) - maxCntrs.put(p, new CounterWithNodes(cntr, uuid)); + maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), uuid)); else if (cntr == maxCntr.cnt) maxCntr.nodes.add(uuid); } @@ -2428,7 +2460,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CounterWithNodes maxCntr = maxCntrs.get(part.id()); if (maxCntr == null && cntr == 0) { - CounterWithNodes cntrObj = new CounterWithNodes(0, cctx.localNodeId()); + CounterWithNodes cntrObj = new CounterWithNodes(0, 0L, cctx.localNodeId()); for (UUID nodeId : msgs.keySet()) { if (top.partitionState(nodeId, part.id()) == GridDhtPartitionState.OWNING) @@ -2438,7 +2470,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte maxCntrs.put(part.id(), cntrObj); } else if (maxCntr == null || cntr > maxCntr.cnt) - maxCntrs.put(part.id(), new CounterWithNodes(cntr, cctx.localNodeId())); + maxCntrs.put(part.id(), new CounterWithNodes(cntr, part.fullSize(), cctx.localNodeId())); else if (cntr == maxCntr.cnt) maxCntr.nodes.add(cctx.localNodeId()); } @@ -2490,6 +2522,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) ownersByUpdCounters.put(e.getKey(), e.getValue().nodes); + Map<Integer, Long> partSizes = new HashMap<>(maxCntrs.size()); + for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) + partSizes.put(e.getKey(), e.getValue().size); + + top.globalPartSizes(partSizes); + Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory); for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) { @@ -2903,16 +2941,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) continue; - if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) - continue; - CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey()); GridDhtPartitionTopology top = grpCtx != null ? grpCtx.topology() : cctx.exchange().clientTopology(e.getKey(), events().discoveryCache()); - assignPartitionStates(top); + if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) + assignPartitionSizes(top); + else + assignPartitionStates(top); } } @@ -3303,6 +3341,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte entry.getValue(), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), + msg.partitionSizes(grpId), null); } else { @@ -3318,6 +3357,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte entry.getValue(), cntrMap, Collections.emptySet(), + null, null); } } @@ -3903,14 +3943,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final long cnt; /** */ + private final long size; + + /** */ private final Set<UUID> nodes = new HashSet<>(); /** * @param cnt Count. * @param firstNode Node ID. */ - private CounterWithNodes(long cnt, UUID firstNode) { + private CounterWithNodes(long cnt, @Nullable Long size, UUID firstNode) { this.cnt = cnt; + this.size = size != null ? size : 0; nodes.add(firstNode); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 4a449d1..5962468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -93,6 +93,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Serialized partitions that must be cleared and re-loaded. */ private byte[] partsToReloadBytes; + /** Partitions sizes. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partsSizes; + + /** Serialized partitions sizes. */ + private byte[] partsSizesBytes; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -164,6 +172,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.partHistSuppliersBytes = partHistSuppliersBytes; cp.partsToReload = partsToReload; cp.partsToReloadBytes = partsToReloadBytes; + cp.partsSizes = partsSizes; + cp.partsSizesBytes = partsSizesBytes; cp.topVer = topVer; cp.errs = errs; cp.errsBytes = errsBytes; @@ -331,6 +341,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return partHistSuppliers; } + /** + * + */ public Set<Integer> partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); @@ -339,6 +352,35 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** + * Adds partition sizes map for specified {@code grpId} to the current message. + * + * @param grpId Group id. + * @param partSizesMap Partition sizes map. + */ + public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) { + if (partSizesMap.isEmpty()) + return; + + if (partsSizes == null) + partsSizes = new HashMap<>(); + + partsSizes.put(grpId, partSizesMap); + } + + /** + * Returns partition sizes map for specified {@code grpId}. + * + * @param grpId Group id. + * @return Partition sizes map (partId, partSize). + */ + public Map<Integer, Long> partitionSizes(int grpId) { + if (partsSizes == null) + return Collections.emptyMap(); + + return partsSizes.getOrDefault(grpId, Collections.emptyMap()); + } + + /** * @return Errors map. */ @Nullable Map<UUID, Exception> getErrorsMap() { @@ -369,6 +411,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa byte[] partCntrsBytes20 = null; byte[] partHistSuppliersBytes0 = null; byte[] partsToReloadBytes0 = null; + byte[] partsSizesBytes0 = null; byte[] errsBytes0 = null; if (!F.isEmpty(parts) && partsBytes == null) @@ -386,6 +429,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partsToReload != null && partsToReloadBytes == null) partsToReloadBytes0 = U.marshal(ctx, partsToReload); + if (partsSizes != null && partsSizesBytes == null) + partsSizesBytes0 = U.marshal(ctx, partsSizes); + if (!F.isEmpty(errs) && errsBytes == null) errsBytes0 = U.marshal(ctx, errs); @@ -398,6 +444,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20); byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); + byte[] partsSizesBytesZip = U.zip(partsSizesBytes0); byte[] exsBytesZip = U.zip(errsBytes0); partsBytes0 = partsBytesZip; @@ -405,6 +452,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrsBytes20 = partCntrsBytes2Zip; partHistSuppliersBytes0 = partHistSuppliersBytesZip; partsToReloadBytes0 = partsToReloadBytesZip; + partsSizesBytes0 = partsSizesBytesZip; errsBytes0 = exsBytesZip; compressed(true); @@ -419,6 +467,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrsBytes2 = partCntrsBytes20; partHistSuppliersBytes = partHistSuppliersBytes0; partsToReloadBytes = partsToReloadBytes0; + partsSizesBytes = partsSizesBytes0; errsBytes = errsBytes0; } } @@ -506,6 +555,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partsSizesBytes != null && partsSizes == null) { + if (compressed()) + partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); @@ -584,18 +640,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa writer.incrementState(); case 13: - if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes)) return false; writer.incrementState(); case 14: - if (!writer.writeMessage("resTopVer", resTopVer)) + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) return false; writer.incrementState(); case 15: + if (!writer.writeMessage("resTopVer", resTopVer)) + return false; + + writer.incrementState(); + + case 16: if (!writer.writeMessage("topVer", topVer)) return false; @@ -682,7 +744,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 13: - partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + partsSizesBytes = reader.readByteArray("partsSizesBytes"); if (!reader.isLastRead()) return false; @@ -690,7 +752,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 14: - resTopVer = reader.readMessage("resTopVer"); + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); if (!reader.isLastRead()) return false; @@ -698,6 +760,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 15: + resTopVer = reader.readMessage("resTopVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -717,7 +787,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b60070e..804cc03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -70,7 +70,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Partitions sizes. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, Map<Integer, Long>> partSizes; + private Map<Integer, Map<Integer, Long>> partsSizes; /** Serialized partitions counters. */ private byte[] partsSizesBytes; @@ -237,10 +237,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partSizesMap.isEmpty()) return; - if (partSizes == null) - partSizes = new HashMap<>(); + if (partsSizes == null) + partsSizes = new HashMap<>(); - partSizes.put(grpId, partSizesMap); + partsSizes.put(grpId, partSizesMap); } /** @@ -250,10 +250,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @return Partition sizes map (partId, partSize). */ public Map<Integer, Long> partitionSizes(int grpId) { - if (partSizes == null) + if (partsSizes == null) return Collections.emptyMap(); - return partSizes.getOrDefault(grpId, Collections.emptyMap()); + return partsSizes.getOrDefault(grpId, Collections.emptyMap()); } /** @@ -324,7 +324,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || (partHistCntrs != null && partHistCntrsBytes == null) || - (partSizes != null && partsSizesBytes == null) || + (partsSizes != null && partsSizesBytes == null) || (err != null && errBytes == null); if (marshal) { @@ -343,8 +343,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partHistCntrs != null && partHistCntrsBytes == null) partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); - if (partSizes != null && partsSizesBytes == null) - partSizesBytes0 = U.marshal(ctx, partSizes); + if (partsSizes != null && partsSizesBytes == null) + partSizesBytes0 = U.marshal(ctx, partsSizes); if (err != null && errBytes == null) errBytes0 = U.marshal(ctx, err); @@ -405,11 +405,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (partsSizesBytes != null && partSizes == null) { + if (partsSizesBytes != null && partsSizes == null) { if (compressed()) - partSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - partSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } if (errBytes != null && err == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 818fd67..d930c6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -1504,6 +1504,8 @@ public class PlatformCache extends PlatformAbstractTarget { writer.writeLong(metrics.getRebalancingStartTime()); writer.writeLong(metrics.getRebalanceClearingPartitionsLeft()); writer.writeLong(metrics.getCacheSize()); + writer.writeLong(metrics.getRebalancedKeys()); + writer.writeLong(metrics.getEstimatedRebalancingKeys()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java index 59f16b2..854bbd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java @@ -175,6 +175,12 @@ public class VisorCacheMetrics extends VisorDataTransferObject { /** Total number of partitions on current node. */ private int totalPartsCnt; + /** Number of already rebalanced keys. */ + private long rebalancedKeys; + + /** Number estimated to rebalance keys. */ + private long estimatedRebalancingKeys; + /** Number of currently rebalancing partitions on current node. */ private int rebalancingPartsCnt; @@ -276,6 +282,8 @@ public class VisorCacheMetrics extends VisorDataTransferObject { offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount(); totalPartsCnt = m.getTotalPartitionsCount(); + rebalancedKeys = m.getRebalancedKeys(); + estimatedRebalancingKeys = m.getEstimatedRebalancingKeys(); rebalancingPartsCnt = m.getRebalancingPartitionsCount(); keysToRebalanceLeft = m.getKeysToRebalanceLeft(); rebalancingKeysRate = m.getRebalancingKeysRate(); @@ -623,6 +631,20 @@ public class VisorCacheMetrics extends VisorDataTransferObject { } /** + * @return Number of already rebalanced keys. + */ + public long getRebalancedKeys() { + return rebalancedKeys; + } + + /** + * @return Number estimated to rebalance keys. + */ + public long getEstimatedRebalancingKeys() { + return estimatedRebalancingKeys; + } + + /** * @return Number of currently rebalancing partitions on current node. */ public int getRebalancingPartitionsCount() { @@ -651,6 +673,11 @@ public class VisorCacheMetrics extends VisorDataTransferObject { } /** {@inheritDoc} */ + @Override public byte getProtocolVersion() { + return V2; + } + + /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeString(out, name); U.writeEnum(out, mode); @@ -708,6 +735,9 @@ public class VisorCacheMetrics extends VisorDataTransferObject { out.writeObject(qryMetrics); out.writeLong(cacheSize); + + out.writeLong(rebalancedKeys); + out.writeLong(estimatedRebalancingKeys); } /** {@inheritDoc} */ @@ -768,6 +798,11 @@ public class VisorCacheMetrics extends VisorDataTransferObject { if (in.available() > 0) cacheSize = in.readLong(); + + if (protoVer > V1) { + rebalancedKeys = in.readLong(); + estimatedRebalancingKeys = in.readLong(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index fda23a2..14b9281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -183,8 +183,9 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa List<VisorCache> resCaches = res.getCaches(); + int partitions = 0; double total = 0; - double moving = 0; + double ready = 0; for (String cacheName : cacheProc.cacheNames()) { if (proxyCache(cacheName)) @@ -201,8 +202,16 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa CacheMetrics cm = ca.localMetrics(); - total += cm.getTotalPartitionsCount(); - moving += cm.getRebalancingPartitionsCount(); + partitions += cm.getTotalPartitionsCount(); + + long partTotal = cm.getEstimatedRebalancingKeys(); + long partReady = cm.getRebalancedKeys(); + + if (partReady >= partTotal) + partReady = Math.max(partTotal - 1, 0); + + total += partTotal; + ready += partReady; resCaches.add(new VisorCache(ignite, ca, arg.isCollectCacheMetrics())); } @@ -217,7 +226,10 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } } - res.setRebalance(total > 0 ? (total - moving) / total : -1); + if (partitions == 0) + res.setRebalance(-1); + else + res.setRebalance(total > 0 ? ready / total : 1); } catch (Exception e) { res.setCachesEx(new VisorExceptionWrapper(e)); http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java index a055909..af2dc63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; @@ -159,12 +160,12 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { assertTrue(rate1 > 0); assertTrue(rate2 > 0); - // rate1 has to be roughly twice more than rate2. - double ratio = ((double)rate2 / rate1) * 100; + // rate1 has to be roughly the same as rate2 + double ratio = ((double)rate2 / rate1); log.info("Ratio: " + ratio); - assertTrue(ratio > 40 && ratio < 60); + assertTrue(ratio > 0.9 && ratio < 1.1); } /** @@ -225,29 +226,27 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { log.info("Wait until keys left will be less than: " + keysLine); - try { - while (finishRebalanceLatch.getCount() != 0) { - CacheMetrics m = ig2.cache(CACHE1).localMetrics(); + while (true) { + CacheMetrics m = ig2.cache(CACHE1).localMetrics(); - long keyLeft = m.getKeysToRebalanceLeft(); + long keyLeft = m.getKeysToRebalanceLeft(); - if (keyLeft > 0 && keyLeft < keysLine) - latch.countDown(); + if (keyLeft > 0 && keyLeft < keysLine) { + latch.countDown(); - log.info("Keys left: " + m.getKeysToRebalanceLeft()); + break; + } - try { - Thread.sleep(1_000); - } - catch (InterruptedException e) { - log.warning("Interrupt thread", e); + log.info("Keys left: " + m.getKeysToRebalanceLeft()); - Thread.currentThread().interrupt(); - } + try { + Thread.sleep(1_000); + } + catch (InterruptedException e) { + log.warning("Interrupt thread", e); + + Thread.currentThread().interrupt(); } - } - finally { - latch.countDown(); } } }); @@ -270,8 +269,15 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { long timePassed = currTime - startTime; long timeLeft = finishTime - currTime; - assertTrue("Got timeout while waiting for rebalancing. Estimated left time: " + timeLeft, - finishRebalanceLatch.await(timeLeft + 2_000L, TimeUnit.MILLISECONDS)); + // TODO: finishRebalanceLatch gets countdown much earlier because of ForceRebalanceExchangeTask triggered by cache with delay +// assertTrue("Got timeout while waiting for rebalancing. Estimated left time: " + timeLeft, +// finishRebalanceLatch.await(timeLeft + 10_000L, TimeUnit.MILLISECONDS)); + + waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0; + } + }, timeLeft + 10_000L); log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft + ", Time to rebalance=" + (finishTime - startTime) + http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index 44a15d5..fe61e35 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -473,5 +473,15 @@ public class PlatformCacheWriteMetricsTask extends ComputeTaskAdapter<Long, Obje @Override public long getCacheSize() { return 65; } + + /** {@inheritDoc} */ + @Override public long getRebalancedKeys() { + return 66; + } + + /** {@inheritDoc} */ + @Override public long getEstimatedRebalancingKeys() { + return 67; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs index d775c05..e0e7301 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs @@ -654,5 +654,23 @@ namespace Apache.Ignite.Core.Cache /// Number of clearing partitions for rebalance. /// </returns> long RebalanceClearingPartitionsLeft { get; } + + /// <summary> + /// Gets number of already rebalanced keys. + /// need to be cleared before actual rebalance start. + /// </summary> + /// <returns> + /// Number of already rebalanced keys. + /// </returns> + long RebalancedKeys { get; } + + /// <summary> + /// Gets number of estimated keys to rebalance. + /// need to be cleared before actual rebalance start. + /// </summary> + /// <returns> + /// Number of estimated keys to rebalance. + /// </returns> + long EstimatedRebalancingKeys { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs index 1fdc877..be6980d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs @@ -247,6 +247,12 @@ namespace Apache.Ignite.Core.Impl.Cache /** */ private readonly long _rebalancingClearingPartitionsLeft; + /** */ + private readonly long _rebalancedKeys; + + /** */ + private readonly long _estimatedRebalancedKeys; + /// <summary> /// Initializes a new instance of the <see cref="CacheMetricsImpl"/> class. /// </summary> @@ -327,6 +333,8 @@ namespace Apache.Ignite.Core.Impl.Cache _rebalancingStartTime = reader.ReadLong(); _rebalancingClearingPartitionsLeft = reader.ReadLong(); _cacheSize = reader.ReadLong(); + _rebalancedKeys = reader.ReadLong(); + _estimatedRebalancedKeys = reader.ReadLong(); } /** <inheritDoc /> */ @@ -550,5 +558,11 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public long RebalanceClearingPartitionsLeft { get { return _rebalancingClearingPartitionsLeft; } } + + /** <inheritDoc /> */ + public long RebalancedKeys { get { return _rebalancedKeys; } } + + /** <inheritDoc /> */ + public long EstimatedRebalancingKeys { get { return _estimatedRebalancedKeys; } } } } \ No newline at end of file