Repository: ignite
Updated Branches:
  refs/heads/master 49a565cee -> cf09e7691


IGNITE-8554 Cache metrics: expose metrics with rebalance info about keys - 
Fixes #4094.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cf09e769
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cf09e769
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cf09e769

Branch: refs/heads/master
Commit: cf09e76916c38f867cd5cb618e345c74672026db
Parents: 49a565c
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Tue Jun 19 18:10:48 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Tue Jun 19 18:10:48 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   | 10 +++
 .../cache/CacheAffinitySharedManager.java       |  3 +-
 .../cache/CacheClusterMetricsMXBeanImpl.java    | 10 +++
 .../cache/CacheLocalMetricsMXBeanImpl.java      |  8 ++
 .../processors/cache/CacheMetricsImpl.java      | 15 +++-
 .../processors/cache/CacheMetricsSnapshot.java  | 30 ++++++++
 .../GridCachePartitionExchangeManager.java      | 13 +++-
 .../dht/GridClientPartitionTopology.java        | 35 +++++++++
 .../dht/GridDhtPartitionTopology.java           | 11 +++
 .../dht/GridDhtPartitionTopologyImpl.java       | 34 +++++++++
 .../dht/preloader/GridDhtPartitionDemander.java | 28 ++++---
 .../GridDhtPartitionSupplyMessage.java          |  9 +--
 .../GridDhtPartitionsExchangeFuture.java        | 60 +++++++++++++--
 .../preloader/GridDhtPartitionsFullMessage.java | 80 ++++++++++++++++++--
 .../GridDhtPartitionsSingleMessage.java         | 24 +++---
 .../platform/cache/PlatformCache.java           |  2 +
 .../internal/visor/cache/VisorCacheMetrics.java | 35 +++++++++
 .../visor/node/VisorNodeDataCollectorJob.java   | 20 ++++-
 .../cache/CacheGroupsMetricsRebalanceTest.java  | 50 ++++++------
 .../platform/PlatformCacheWriteMetricsTask.java | 10 +++
 .../Apache.Ignite.Core/Cache/ICacheMetrics.cs   | 18 +++++
 .../Impl/Cache/CacheMetricsImpl.cs              | 14 ++++
 22 files changed, 446 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 99bf2c2..e3e6446 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -509,6 +509,16 @@ public interface CacheMetrics {
     public int getRebalancingPartitionsCount();
 
     /**
+     * @return Number of already rebalanced keys.
+     */
+    public long getRebalancedKeys();
+
+    /**
+     * @return Number estimated to rebalance keys.
+     */
+    public long getEstimatedRebalancingKeys();
+
+    /**
      * @return Estimated number of keys to be rebalanced on current node.
      */
     public long getKeysToRebalanceLeft();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 92b8d3e..08eb43f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -472,6 +472,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                                     clientTop.partitionMap(true),
                                     clientTop.fullUpdateCounters(),
                                     Collections.<Integer>emptySet(),
+                                    null,
                                     null);
                             }
 
@@ -530,7 +531,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 grp.topology().updateTopologyVersion(topFut, discoCache, -1, 
false);
 
-                grp.topology().update(topVer, partMap, null, 
Collections.<Integer>emptySet(), null);
+                grp.topology().update(topVer, partMap, null, 
Collections.<Integer>emptySet(), null, null);
 
                 topFut.validate(grp, discoCache.allNodes());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index 32603cb..3d5278c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -370,6 +370,16 @@ class CacheClusterMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getRebalancedKeys() {
+        return cache.clusterMetrics().getRebalancedKeys();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEstimatedRebalancingKeys() {
+        return cache.clusterMetrics().getEstimatedRebalancingKeys();
+    }
+
+    /** {@inheritDoc} */
     @Override public int getRebalancingPartitionsCount() {
         return cache.clusterMetrics().getRebalancingPartitionsCount();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index d3060d3..212c7a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -370,6 +370,14 @@ class CacheLocalMetricsMXBeanImpl implements 
CacheMetricsMXBean {
         return cache.metrics0().getTotalPartitionsCount();
     }
 
+    @Override public long getRebalancedKeys() {
+        return cache.metrics0().getRebalancedKeys();
+    }
+
+    @Override public long getEstimatedRebalancingKeys() {
+        return cache.metrics0().getEstimatedRebalancingKeys();
+    }
+
     /** {@inheritDoc} */
     @Override public int getRebalancingPartitionsCount() {
         return cache.metrics0().getRebalancingPartitionsCount();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 96f40bf..0f6d06f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -857,6 +857,16 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public long getRebalancedKeys() {
+        return rebalancedKeys.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEstimatedRebalancingKeys() {
+        return estimatedRebalancingKeys.get();
+    }
+
+    /** {@inheritDoc} */
     @Override public long getKeysToRebalanceLeft() {
         return Math.max(0, estimatedRebalancingKeys.get() - 
rebalancedKeys.get());
     }
@@ -935,7 +945,10 @@ public class CacheMetricsImpl implements CacheMetrics {
      * First rebalance supply message callback.
      * @param keysCnt Estimated number of keys.
      */
-    public void onRebalancingKeysCountEstimateReceived(long keysCnt) {
+    public void onRebalancingKeysCountEstimateReceived(Long keysCnt) {
+        if (keysCnt == null)
+            return;
+
         estimatedRebalancingKeys.addAndGet(keysCnt);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 539ad59..8a0f0e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -197,6 +197,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     /** Rebalancing partitions count. */
     private int rebalancingPartitionsCnt;
 
+    /** Number of already rebalanced keys. */
+    private long rebalancedKeys;
+
+    /** Number estimated to rebalance keys. */
+    private long estimatedRebalancingKeys;
+
     /** Keys to rebalance left. */
     private long keysToRebalanceLeft;
 
@@ -331,6 +337,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         totalPartitionsCnt = entriesStat.totalPartitionsCount();
         rebalancingPartitionsCnt = entriesStat.rebalancingPartitionsCount();
 
+        rebalancedKeys = m.getRebalancedKeys();
+        estimatedRebalancingKeys = m.getEstimatedRebalancingKeys();
         keysToRebalanceLeft = m.getKeysToRebalanceLeft();
         rebalancingBytesRate = m.getRebalancingBytesRate();
         rebalancingKeysRate = m.getRebalancingKeysRate();
@@ -459,6 +467,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
             else
                 writeBehindErrorRetryCnt = -1;
 
+            rebalancedKeys += e.getRebalancedKeys();
+            estimatedRebalancingKeys += e.getEstimatedRebalancingKeys();
             totalPartitionsCnt += e.getTotalPartitionsCount();
             rebalancingPartitionsCnt += e.getRebalancingPartitionsCount();
             keysToRebalanceLeft += e.getKeysToRebalanceLeft();
@@ -733,6 +743,14 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         return totalPartitionsCnt;
     }
 
+    @Override public long getRebalancedKeys() {
+        return rebalancedKeys;
+    }
+
+    @Override public long getEstimatedRebalancingKeys() {
+        return estimatedRebalancingKeys;
+    }
+
     /** {@inheritDoc} */
     @Override public int getRebalancingPartitionsCount() {
         return rebalancingPartitionsCnt;
@@ -926,6 +944,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         out.writeLong(keysToRebalanceLeft);
         out.writeLong(rebalancingBytesRate);
         out.writeLong(rebalancingKeysRate);
+
+        out.writeLong(rebalancedKeys);
+        out.writeLong(estimatedRebalancingKeys);
+        out.writeLong(rebalanceStartTime);
+        out.writeLong(rebalanceFinishTime);
+        out.writeLong(rebalanceClearingPartitionsLeft);
     }
 
     /** {@inheritDoc} */
@@ -981,5 +1005,11 @@ public class CacheMetricsSnapshot implements 
CacheMetrics, Externalizable {
         keysToRebalanceLeft = in.readLong();
         rebalancingBytesRate = in.readLong();
         rebalancingKeysRate = in.readLong();
+
+        rebalancedKeys = in.readLong();
+        estimatedRebalancingKeys = in.readLong();
+        rebalanceStartTime = in.readLong();
+        rebalanceFinishTime = in.readLong();
+        rebalanceClearingPartitionsLeft = in.readLong();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 38ddaf6..715c290 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1121,6 +1121,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         affCache.similarAffinityKey());
                 }
 
+                m.addPartitionSizes(grp.groupId(), 
grp.topology().globalPartSizes());
+
                 if (exchId != null) {
                     CachePartitionFullCountersMap cntrsMap = 
grp.topology().fullUpdateCounters();
 
@@ -1154,6 +1156,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     m.addPartitionUpdateCounters(top.groupId(), cntrsMap);
                 else
                     m.addPartitionUpdateCounters(top.groupId(), 
CachePartitionFullCountersMap.toCountersMap(cntrsMap));
+
+                m.addPartitionSizes(top.groupId(), top.globalPartSizes());
             }
         }
 
@@ -1264,9 +1268,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                     m.addPartitionUpdateCounters(grp.groupId(),
                         newCntrMap ? cntrsMap : 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
-
-                    m.addPartitionSizes(grp.groupId(), 
grp.topology().partitionSizes());
                 }
+
+                m.addPartitionSizes(grp.groupId(), 
grp.topology().partitionSizes());
             }
         }
 
@@ -1288,9 +1292,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                 m.addPartitionUpdateCounters(top.groupId(),
                     newCntrMap ? cntrsMap : 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
-
-                m.addPartitionSizes(top.groupId(), top.partitionSizes());
             }
+
+            m.addPartitionSizes(top.groupId(), top.partitionSizes());
         }
 
         return m;
@@ -1482,6 +1486,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             entry.getValue(),
                             null,
                             msg.partsToReload(cctx.localNodeId(), grpId),
+                            msg.partitionSizes(grpId),
                             msg.topologyVersion());
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index b365233..fc80bbc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -119,6 +119,9 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     /** */
     private final int parts;
 
+    /** */
+    private volatile Map<Integer, Long> globalPartSizes;
+
     /**
      * @param cctx Context.
      * @param discoCache Discovery data cache.
@@ -707,6 +710,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
+        @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchVer=" + exchangeVer + 
", parts=" + fullMapString() + ']');
@@ -810,6 +814,9 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
             if (cntrMap != null)
                 this.cntrMap = new CachePartitionFullCountersMap(cntrMap);
 
+            if (partSizes != null)
+                this.globalPartSizes = partSizes;
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -1223,6 +1230,34 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override @Nullable public Map<Integer, Long> globalPartSizes() {
+        lock.readLock().lock();
+
+        try {
+            if (globalPartSizes == null)
+                return Collections.emptyMap();
+
+            return Collections.unmodifiableMap(globalPartSizes);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void globalPartSizes(@Nullable Map<Integer, Long> 
partSizes) {
+        lock.writeLock().lock();
+
+        try {
+            this.globalPartSizes = partSizes;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) 
{
         assert false : "Should not be called on non-affinity node";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d6c5450..b77dbd6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -287,6 +287,7 @@ public interface GridDhtPartitionTopology {
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
+        @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer);
 
     /**
@@ -383,6 +384,16 @@ public interface GridDhtPartitionTopology {
     public void printMemoryStats(int threshold);
 
     /**
+     * @return Sizes of up-to-date partition versions in topology.
+     */
+    Map<Integer, Long> globalPartSizes();
+
+    /**
+     * @param partSizes Sizes of up-to-date partition versions in topology.
+     */
+    void globalPartSizes(@Nullable Map<Integer, Long> partSizes);
+
+    /**
      * @param topVer Topology version.
      * @return {@code True} if rebalance process finished.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0599c23..cabb0b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -138,6 +138,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private final CachePartitionFullCountersMap cntrMap;
 
     /** */
+    private volatile Map<Integer, Long> globalPartSizes;
+
+    /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = 
AffinityTopologyVersion.NONE;
 
     /**
@@ -1329,6 +1332,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap incomeCntrMap,
         Set<Integer> partsToReload,
+        @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled()) {
             log.debug("Updating full partition map [grp=" + 
grp.cacheOrGroupName() + ", exchVer=" + exchangeVer +
@@ -1544,6 +1548,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     updateRebalanceVersion(aff.assignment());
                 }
 
+                if (partSizes != null)
+                    this.globalPartSizes = partSizes;
+
                 consistencyCheck();
 
                 if (log.isDebugEnabled()) {
@@ -2625,6 +2632,33 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> globalPartSizes() {
+        lock.readLock().lock();
+
+        try {
+            if (globalPartSizes == null)
+                return Collections.emptyMap();
+
+            return Collections.unmodifiableMap(globalPartSizes);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void globalPartSizes(@Nullable Map<Integer, Long> 
partSizes) {
+        lock.writeLock().lock();
+
+        try {
+            this.globalPartSizes = partSizes;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) 
{
         AffinityTopologyVersion curTopVer = this.readyTopVer;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index c94f511..3cfc25f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -307,9 +307,22 @@ public class GridDhtPartitionDemander {
 
                     metrics.clearRebalanceCounters();
 
-                    metrics.startRebalance(0);
+                    for (GridDhtPartitionDemandMessage msg : 
assignments.values()) {
+                        for (Integer partId : msg.partitions().fullSet()) {
+                            
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
+                        }
+
+                        CachePartitionPartialCountersMap histMap = 
msg.partitions().historicalMap();
 
-                    rebalanceFut.listen(f -> metrics.clearRebalanceCounters());
+                        for (int i = 0; i < histMap.size(); i++) {
+                            long from = histMap.initialUpdateCounterAt(i);
+                            long to = histMap.updateCounterAt(i);
+
+                            metrics.onRebalancingKeysCountEstimateReceived(to 
- from);
+                        }
+                    }
+
+                    metrics.startRebalance(0);
                 }
             }
 
@@ -714,8 +727,6 @@ public class GridDhtPartitionDemander {
         try {
             AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
-            GridCacheContext cctx = grp.sharedGroup() ? null : 
grp.singleCacheContext();
-
             ctx.database().checkpointReadLock();
 
             try {
@@ -749,11 +760,10 @@ public class GridDhtPartitionDemander {
                                         break;
                                     }
 
-                                    if (grp.sharedGroup() && (cctx == null || 
cctx.cacheId() != entry.cacheId()))
-                                        cctx = 
ctx.cacheContext(entry.cacheId());
-
-                                    if (cctx != null && 
cctx.statisticsEnabled())
-                                        
cctx.cache().metrics0().onRebalanceKeyReceived();
+                                    for (GridCacheContext cctx : grp.caches()) 
{
+                                        if (cctx.statisticsEnabled())
+                                            
cctx.cache().metrics0().onRebalanceKeyReceived();
+                                    }
                                 }
 
                                 // If message was last for this partition,

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 77baa38..4ecffc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -443,7 +443,7 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
      * @return Estimated keys count.
      */
     public long estimatedKeysCount() {
-        return estimatedKeysCnt;
+        return -1;
     }
 
     /**
@@ -457,12 +457,7 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
      * @return Estimated keys count for a given cache ID.
      */
     public long keysForCache(int cacheId) {
-        if (this.keysPerCache == null)
-            return -1;
-
-        Long cnt = this.keysPerCache.get(cacheId);
-
-        return cnt != null ? cnt : 0;
+        return -1;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5b10347..366d8ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -897,6 +897,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         clientTop.partitionMap(true),
                         clientTop.fullUpdateCounters(),
                         Collections.emptySet(),
+                        null,
                         null);
                 }
             }
@@ -2363,6 +2364,37 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param top Topology.
+     */
+    private void assignPartitionSizes(GridDhtPartitionTopology top) {
+        Map<Integer, Long> partSizes = new HashMap<>();
+
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
+            GridDhtPartitionsSingleMessage singleMsg = e.getValue();
+
+            GridDhtPartitionMap partMap = 
singleMsg.partitions().get(top.groupId());
+
+            if (partMap == null)
+                continue;
+
+            for (Map.Entry<Integer, GridDhtPartitionState> e0 : 
partMap.entrySet()) {
+                int p = e0.getKey();
+                GridDhtPartitionState state = e0.getValue();
+
+                if (state == GridDhtPartitionState.OWNING)
+                    partSizes.put(p, 
singleMsg.partitionSizes(top.groupId()).get(p));
+            }
+        }
+
+        for (GridDhtLocalPartition locPart : top.currentLocalPartitions()) {
+            if (locPart.state() == GridDhtPartitionState.OWNING)
+                partSizes.put(locPart.id(), locPart.fullSize());
+        }
+
+        top.globalPartSizes(partSizes);
+    }
+
+    /**
      * Collects and determines new owners of partitions for all nodes for 
given {@code top}.
      *
      * @param top Topology to assign.
@@ -2402,7 +2434,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 CounterWithNodes maxCntr = maxCntrs.get(p);
 
                 if (maxCntr == null || cntr > maxCntr.cnt)
-                    maxCntrs.put(p, new CounterWithNodes(cntr, uuid));
+                    maxCntrs.put(p, new CounterWithNodes(cntr, 
e.getValue().partitionSizes(top.groupId()).get(p), uuid));
                 else if (cntr == maxCntr.cnt)
                     maxCntr.nodes.add(uuid);
             }
@@ -2428,7 +2460,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             CounterWithNodes maxCntr = maxCntrs.get(part.id());
 
             if (maxCntr == null && cntr == 0) {
-                CounterWithNodes cntrObj = new CounterWithNodes(0, 
cctx.localNodeId());
+                CounterWithNodes cntrObj = new CounterWithNodes(0, 0L, 
cctx.localNodeId());
 
                 for (UUID nodeId : msgs.keySet()) {
                     if (top.partitionState(nodeId, part.id()) == 
GridDhtPartitionState.OWNING)
@@ -2438,7 +2470,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 maxCntrs.put(part.id(), cntrObj);
             }
             else if (maxCntr == null || cntr > maxCntr.cnt)
-                maxCntrs.put(part.id(), new CounterWithNodes(cntr, 
cctx.localNodeId()));
+                maxCntrs.put(part.id(), new CounterWithNodes(cntr, 
part.fullSize(), cctx.localNodeId()));
             else if (cntr == maxCntr.cnt)
                 maxCntr.nodes.add(cctx.localNodeId());
         }
@@ -2490,6 +2522,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
             ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
 
+        Map<Integer, Long> partSizes = new HashMap<>(maxCntrs.size());
+        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
+            partSizes.put(e.getKey(), e.getValue().size);
+
+        top.globalPartSizes(partSizes);
+
         Map<UUID, Set<Integer>> partitionsToRebalance = 
top.resetOwners(ownersByUpdCounters, haveHistory);
 
         for (Map.Entry<UUID, Set<Integer>> e : 
partitionsToRebalance.entrySet()) {
@@ -2903,16 +2941,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
                 continue;
 
-            if (!CU.isPersistentCache(grpDesc.config(), 
cctx.gridConfig().getDataStorageConfiguration()))
-                continue;
-
             CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey());
 
             GridDhtPartitionTopology top = grpCtx != null ?
                 grpCtx.topology() :
                 cctx.exchange().clientTopology(e.getKey(), 
events().discoveryCache());
 
-            assignPartitionStates(top);
+            if (!CU.isPersistentCache(grpDesc.config(), 
cctx.gridConfig().getDataStorageConfiguration()))
+                assignPartitionSizes(top);
+            else
+                assignPartitionStates(top);
         }
     }
 
@@ -3303,6 +3341,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     entry.getValue(),
                     cntrMap,
                     msg.partsToReload(cctx.localNodeId(), grpId),
+                    msg.partitionSizes(grpId),
                     null);
             }
             else {
@@ -3318,6 +3357,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         entry.getValue(),
                         cntrMap,
                         Collections.emptySet(),
+                        null,
                         null);
                 }
             }
@@ -3903,14 +3943,18 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         private final long cnt;
 
         /** */
+        private final long size;
+
+        /** */
         private final Set<UUID> nodes = new HashSet<>();
 
         /**
          * @param cnt Count.
          * @param firstNode Node ID.
          */
-        private CounterWithNodes(long cnt, UUID firstNode) {
+        private CounterWithNodes(long cnt, @Nullable Long size, UUID 
firstNode) {
             this.cnt = cnt;
+            this.size = size != null ? size : 0;
 
             nodes.add(firstNode);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 4a449d1..5962468 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -93,6 +93,14 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /** Serialized partitions that must be cleared and re-loaded. */
     private byte[] partsToReloadBytes;
 
+    /** Partitions sizes. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partsSizes;
+
+    /** Serialized partitions sizes. */
+    private byte[] partsSizesBytes;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -164,6 +172,8 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         cp.partHistSuppliersBytes = partHistSuppliersBytes;
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
+        cp.partsSizes = partsSizes;
+        cp.partsSizesBytes = partsSizesBytes;
         cp.topVer = topVer;
         cp.errs = errs;
         cp.errsBytes = errsBytes;
@@ -331,6 +341,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         return partHistSuppliers;
     }
 
+    /**
+     *
+     */
     public Set<Integer> partsToReload(UUID nodeId, int grpId) {
         if (partsToReload == null)
             return Collections.emptySet();
@@ -339,6 +352,35 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
+     * Adds partition sizes map for specified {@code grpId} to the current 
message.
+     *
+     * @param grpId Group id.
+     * @param partSizesMap Partition sizes map.
+     */
+    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
+        if (partSizesMap.isEmpty())
+            return;
+
+        if (partsSizes == null)
+            partsSizes = new HashMap<>();
+
+        partsSizes.put(grpId, partSizesMap);
+    }
+
+    /**
+     * Returns partition sizes map for specified {@code grpId}.
+     *
+     * @param grpId Group id.
+     * @return Partition sizes map (partId, partSize).
+     */
+    public Map<Integer, Long> partitionSizes(int grpId) {
+        if (partsSizes == null)
+            return Collections.emptyMap();
+
+        return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+    }
+
+    /**
      * @return Errors map.
      */
     @Nullable Map<UUID, Exception> getErrorsMap() {
@@ -369,6 +411,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             byte[] partCntrsBytes20 = null;
             byte[] partHistSuppliersBytes0 = null;
             byte[] partsToReloadBytes0 = null;
+            byte[] partsSizesBytes0 = null;
             byte[] errsBytes0 = null;
 
             if (!F.isEmpty(parts) && partsBytes == null)
@@ -386,6 +429,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (partsToReload != null && partsToReloadBytes == null)
                 partsToReloadBytes0 = U.marshal(ctx, partsToReload);
 
+            if (partsSizes != null && partsSizesBytes == null)
+                partsSizesBytes0 = U.marshal(ctx, partsSizes);
+
             if (!F.isEmpty(errs) && errsBytes == null)
                 errsBytes0 = U.marshal(ctx, errs);
 
@@ -398,6 +444,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                     byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
                     byte[] partHistSuppliersBytesZip = 
U.zip(partHistSuppliersBytes0);
                     byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
+                    byte[] partsSizesBytesZip = U.zip(partsSizesBytes0);
                     byte[] exsBytesZip = U.zip(errsBytes0);
 
                     partsBytes0 = partsBytesZip;
@@ -405,6 +452,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                     partCntrsBytes20 = partCntrsBytes2Zip;
                     partHistSuppliersBytes0 = partHistSuppliersBytesZip;
                     partsToReloadBytes0 = partsToReloadBytesZip;
+                    partsSizesBytes0 = partsSizesBytesZip;
                     errsBytes0 = exsBytesZip;
 
                     compressed(true);
@@ -419,6 +467,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             partCntrsBytes2 = partCntrsBytes20;
             partHistSuppliersBytes = partHistSuppliersBytes0;
             partsToReloadBytes = partsToReloadBytes0;
+            partsSizesBytes = partsSizesBytes0;
             errsBytes = errsBytes0;
         }
     }
@@ -506,6 +555,13 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 partsToReload = U.unmarshal(ctx, partsToReloadBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partsSizesBytes != null && partsSizes == null) {
+            if (compressed())
+                partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partsSizes = U.unmarshal(ctx, partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
@@ -584,18 +640,24 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray("partsToReloadBytes", 
partsToReloadBytes))
+                if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeMessage("resTopVer", resTopVer))
+                if (!writer.writeByteArray("partsToReloadBytes", 
partsToReloadBytes))
                     return false;
 
                 writer.incrementState();
 
             case 15:
+                if (!writer.writeMessage("resTopVer", resTopVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -682,7 +744,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 13:
-                partsToReloadBytes = 
reader.readByteArray("partsToReloadBytes");
+                partsSizesBytes = reader.readByteArray("partsSizesBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -690,7 +752,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 14:
-                resTopVer = reader.readMessage("resTopVer");
+                partsToReloadBytes = 
reader.readByteArray("partsToReloadBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -698,6 +760,14 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 15:
+                resTopVer = reader.readMessage("resTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -717,7 +787,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 17;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b60070e..804cc03 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -70,7 +70,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     /** Partitions sizes. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, Map<Integer, Long>> partSizes;
+    private Map<Integer, Map<Integer, Long>> partsSizes;
 
     /** Serialized partitions counters. */
     private byte[] partsSizesBytes;
@@ -237,10 +237,10 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         if (partSizesMap.isEmpty())
             return;
 
-        if (partSizes == null)
-            partSizes = new HashMap<>();
+        if (partsSizes == null)
+            partsSizes = new HashMap<>();
 
-        partSizes.put(grpId, partSizesMap);
+        partsSizes.put(grpId, partSizesMap);
     }
 
     /**
@@ -250,10 +250,10 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
      * @return Partition sizes map (partId, partSize).
      */
     public Map<Integer, Long> partitionSizes(int grpId) {
-        if (partSizes == null)
+        if (partsSizes == null)
             return Collections.emptyMap();
 
-        return partSizes.getOrDefault(grpId, Collections.emptyMap());
+        return partsSizes.getOrDefault(grpId, Collections.emptyMap());
     }
 
     /**
@@ -324,7 +324,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         boolean marshal = (parts != null && partsBytes == null) ||
             (partCntrs != null && partCntrsBytes == null) ||
             (partHistCntrs != null && partHistCntrsBytes == null) ||
-            (partSizes != null && partsSizesBytes == null) ||
+            (partsSizes != null && partsSizesBytes == null) ||
             (err != null && errBytes == null);
 
         if (marshal) {
@@ -343,8 +343,8 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
             if (partHistCntrs != null && partHistCntrsBytes == null)
                 partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
 
-            if (partSizes != null && partsSizesBytes == null)
-                partSizesBytes0 = U.marshal(ctx, partSizes);
+            if (partsSizes != null && partsSizesBytes == null)
+                partSizesBytes0 = U.marshal(ctx, partsSizes);
 
             if (err != null && errBytes == null)
                 errBytes0 = U.marshal(ctx, err);
@@ -405,11 +405,11 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
-        if (partsSizesBytes != null && partSizes == null) {
+        if (partsSizesBytes != null && partsSizes == null) {
             if (compressed())
-                partSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+                partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
             else
-                partSizes = U.unmarshal(ctx, partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+                partsSizes = U.unmarshal(ctx, partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
         if (errBytes != null && err == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 818fd67..d930c6b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -1504,6 +1504,8 @@ public class PlatformCache extends PlatformAbstractTarget 
{
         writer.writeLong(metrics.getRebalancingStartTime());
         writer.writeLong(metrics.getRebalanceClearingPartitionsLeft());
         writer.writeLong(metrics.getCacheSize());
+        writer.writeLong(metrics.getRebalancedKeys());
+        writer.writeLong(metrics.getEstimatedRebalancingKeys());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index 59f16b2..854bbd7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -175,6 +175,12 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
     /** Total number of partitions on current node. */
     private int totalPartsCnt;
 
+    /** Number of already rebalanced keys. */
+    private long rebalancedKeys;
+
+    /** Number estimated to rebalance keys. */
+    private long estimatedRebalancingKeys;
+
     /** Number of currently rebalancing partitions on current node. */
     private int rebalancingPartsCnt;
 
@@ -276,6 +282,8 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
         offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount();
 
         totalPartsCnt = m.getTotalPartitionsCount();
+        rebalancedKeys = m.getRebalancedKeys();
+        estimatedRebalancingKeys = m.getEstimatedRebalancingKeys();
         rebalancingPartsCnt = m.getRebalancingPartitionsCount();
         keysToRebalanceLeft = m.getKeysToRebalanceLeft();
         rebalancingKeysRate = m.getRebalancingKeysRate();
@@ -623,6 +631,20 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
     }
 
     /**
+     * @return Number of already rebalanced keys.
+     */
+    public long getRebalancedKeys() {
+        return rebalancedKeys;
+    }
+
+    /**
+     * @return Number estimated to rebalance keys.
+     */
+    public long getEstimatedRebalancingKeys() {
+        return estimatedRebalancingKeys;
+    }
+
+    /**
      * @return Number of currently rebalancing partitions on current node.
      */
     public int getRebalancingPartitionsCount() {
@@ -651,6 +673,11 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
     }
 
     /** {@inheritDoc} */
+    @Override public byte getProtocolVersion() {
+        return V2;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
         U.writeString(out, name);
         U.writeEnum(out, mode);
@@ -708,6 +735,9 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
         out.writeObject(qryMetrics);
 
         out.writeLong(cacheSize);
+
+        out.writeLong(rebalancedKeys);
+        out.writeLong(estimatedRebalancingKeys);
     }
 
     /** {@inheritDoc} */
@@ -768,6 +798,11 @@ public class VisorCacheMetrics extends 
VisorDataTransferObject {
 
         if (in.available() > 0)
             cacheSize = in.readLong();
+
+        if (protoVer > V1) {
+            rebalancedKeys = in.readLong();
+            estimatedRebalancingKeys = in.readLong();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index fda23a2..14b9281 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -183,8 +183,9 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
 
             List<VisorCache> resCaches = res.getCaches();
 
+            int partitions = 0;
             double total = 0;
-            double moving = 0;
+            double ready = 0;
 
             for (String cacheName : cacheProc.cacheNames()) {
                 if (proxyCache(cacheName))
@@ -201,8 +202,16 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
 
                         CacheMetrics cm = ca.localMetrics();
 
-                        total += cm.getTotalPartitionsCount();
-                        moving += cm.getRebalancingPartitionsCount();
+                        partitions += cm.getTotalPartitionsCount();
+
+                        long partTotal = cm.getEstimatedRebalancingKeys();
+                        long partReady = cm.getRebalancedKeys();
+
+                        if (partReady >= partTotal)
+                            partReady = Math.max(partTotal - 1, 0);
+
+                        total += partTotal;
+                        ready += partReady;
 
                         resCaches.add(new VisorCache(ignite, ca, 
arg.isCollectCacheMetrics()));
                     }
@@ -217,7 +226,10 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
                 }
             }
 
-            res.setRebalance(total > 0 ? (total - moving) / total : -1);
+            if (partitions == 0)
+                res.setRebalance(-1);
+            else
+                res.setRebalance(total > 0 ? ready / total : 1);
         }
         catch (Exception e) {
             res.setCachesEx(new VisorExceptionWrapper(e));

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index a055909..af2dc63 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -159,12 +160,12 @@ public class CacheGroupsMetricsRebalanceTest extends 
GridCommonAbstractTest {
         assertTrue(rate1 > 0);
         assertTrue(rate2 > 0);
 
-        // rate1 has to be roughly twice more than rate2.
-        double ratio = ((double)rate2 / rate1) * 100;
+        // rate1 has to be roughly the same as rate2
+        double ratio = ((double)rate2 / rate1);
 
         log.info("Ratio: " + ratio);
 
-        assertTrue(ratio > 40 && ratio < 60);
+        assertTrue(ratio > 0.9 && ratio < 1.1);
     }
 
     /**
@@ -225,29 +226,27 @@ public class CacheGroupsMetricsRebalanceTest extends 
GridCommonAbstractTest {
 
                 log.info("Wait until keys left will be less than: " + 
keysLine);
 
-                try {
-                    while (finishRebalanceLatch.getCount() != 0) {
-                        CacheMetrics m = ig2.cache(CACHE1).localMetrics();
+                while (true) {
+                    CacheMetrics m = ig2.cache(CACHE1).localMetrics();
 
-                        long keyLeft = m.getKeysToRebalanceLeft();
+                    long keyLeft = m.getKeysToRebalanceLeft();
 
-                        if (keyLeft > 0 && keyLeft < keysLine)
-                            latch.countDown();
+                    if (keyLeft > 0 && keyLeft < keysLine) {
+                        latch.countDown();
 
-                        log.info("Keys left: " + m.getKeysToRebalanceLeft());
+                        break;
+                    }
 
-                        try {
-                            Thread.sleep(1_000);
-                        }
-                        catch (InterruptedException e) {
-                            log.warning("Interrupt thread", e);
+                    log.info("Keys left: " + m.getKeysToRebalanceLeft());
 
-                            Thread.currentThread().interrupt();
-                        }
+                    try {
+                        Thread.sleep(1_000);
+                    }
+                    catch (InterruptedException e) {
+                        log.warning("Interrupt thread", e);
+
+                        Thread.currentThread().interrupt();
                     }
-                }
-                finally {
-                    latch.countDown();
                 }
             }
         });
@@ -270,8 +269,15 @@ public class CacheGroupsMetricsRebalanceTest extends 
GridCommonAbstractTest {
         long timePassed = currTime - startTime;
         long timeLeft = finishTime - currTime;
 
-        assertTrue("Got timeout while waiting for rebalancing. Estimated left 
time: " + timeLeft,
-            finishRebalanceLatch.await(timeLeft + 2_000L, 
TimeUnit.MILLISECONDS));
+        // TODO: finishRebalanceLatch gets countdown much earlier because of 
ForceRebalanceExchangeTask triggered by cache with delay
+//        assertTrue("Got timeout while waiting for rebalancing. Estimated 
left time: " + timeLeft,
+//            finishRebalanceLatch.await(timeLeft + 10_000L, 
TimeUnit.MILLISECONDS));
+
+        waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return 
ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
+            }
+        }, timeLeft + 10_000L);
 
         log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
                 ", Time to rebalance=" + (finishTime - startTime) +

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
 
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
index 44a15d5..fe61e35 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
@@ -473,5 +473,15 @@ public class PlatformCacheWriteMetricsTask extends 
ComputeTaskAdapter<Long, Obje
         @Override public long getCacheSize() {
             return 65;
         }
+
+        /** {@inheritDoc} */
+        @Override public long getRebalancedKeys() {
+            return 66;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getEstimatedRebalancingKeys() {
+            return 67;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs
index d775c05..e0e7301 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheMetrics.cs
@@ -654,5 +654,23 @@ namespace Apache.Ignite.Core.Cache
         /// Number of clearing partitions for rebalance.
         /// </returns>
         long RebalanceClearingPartitionsLeft { get; }
+
+        /// <summary>
+        /// Gets number of already rebalanced keys.
+        /// need to be cleared before actual rebalance start.
+        /// </summary>
+        /// <returns>
+        /// Number of already rebalanced keys.
+        /// </returns>
+        long RebalancedKeys { get; }
+
+        /// <summary>
+        /// Gets number of estimated keys to rebalance.
+        /// need to be cleared before actual rebalance start.
+        /// </summary>
+        /// <returns>
+        /// Number of estimated keys to rebalance.
+        /// </returns>
+        long EstimatedRebalancingKeys { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf09e769/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs
index 1fdc877..be6980d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs
@@ -247,6 +247,12 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** */
         private readonly long _rebalancingClearingPartitionsLeft;
 
+        /** */
+        private readonly long _rebalancedKeys;
+
+        /** */
+        private readonly long _estimatedRebalancedKeys;
+
         /// <summary>
         /// Initializes a new instance of the <see cref="CacheMetricsImpl"/> 
class.
         /// </summary>
@@ -327,6 +333,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             _rebalancingStartTime = reader.ReadLong();
             _rebalancingClearingPartitionsLeft = reader.ReadLong();
             _cacheSize = reader.ReadLong();
+            _rebalancedKeys = reader.ReadLong();
+            _estimatedRebalancedKeys = reader.ReadLong();
         }
 
         /** <inheritDoc /> */
@@ -550,5 +558,11 @@ namespace Apache.Ignite.Core.Impl.Cache
 
         /** <inheritDoc /> */
         public long RebalanceClearingPartitionsLeft { get { return 
_rebalancingClearingPartitionsLeft; } }
+
+        /** <inheritDoc /> */
+        public long RebalancedKeys { get { return _rebalancedKeys; } }
+        
+        /** <inheritDoc /> */
+        public long EstimatedRebalancingKeys { get { return 
_estimatedRebalancedKeys; } }
     }
 }
\ No newline at end of file

Reply via email to