Repository: ignite
Updated Branches:
  refs/heads/master f6f731f57 -> 137dd06aa


IGNITE-7165 Re-balancing is cancelled if client node joins

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/137dd06a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/137dd06a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/137dd06a

Branch: refs/heads/master
Commit: 137dd06aaee9cc84104e6b4bf48306b050341e3a
Parents: f6f731f
Author: Maxim Muzafarov <maxmu...@gmail.com>
Authored: Wed Aug 1 18:39:54 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Wed Aug 1 18:39:54 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  68 ++++++---
 .../processors/cache/GridCachePreloader.java    |  21 ++-
 .../cache/GridCachePreloaderAdapter.java        |   6 +
 .../dht/preloader/GridDhtPartitionDemander.java |  55 ++++---
 .../dht/preloader/GridDhtPartitionSupplier.java |  26 ++--
 .../dht/preloader/GridDhtPreloader.java         |  60 +++++++-
 .../preloader/GridDhtPreloaderAssignments.java  |   6 +-
 .../ClusterBaselineNodesMetricsSelfTest.java    |   1 -
 .../cache/CacheValidatorMetricsTest.java        |   4 +-
 .../dht/GridCacheDhtPreloadSelfTest.java        |  68 +--------
 .../atomic/IgniteCacheAtomicProtocolTest.java   |   3 -
 .../GridCacheRebalancingAsyncSelfTest.java      |   7 +-
 .../GridCacheRebalancingCancelTest.java         | 106 +++++++++++++
 ...idCacheRebalancingPartitionCountersTest.java |   3 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 149 +++++++------------
 ...lientAffinityAssignmentWithBaselineTest.java |   4 +-
 ...SlowHistoricalRebalanceSmallHistoryTest.java |   5 +-
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |   2 +-
 .../GridMarshallerMappingConsistencyTest.java   |   3 +-
 .../junits/common/GridCommonAbstractTest.java   | 115 +++-----------
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 21 files changed, 370 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 05eeee3..824aa67 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -66,7 +66,6 @@ import 
org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -88,6 +87,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -179,6 +179,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> 
readyFuts =
         new ConcurrentSkipListMap<>();
 
+    /**
+     * Latest started rebalance topology version but possibly not finished 
yet. Value {@code NONE}
+     * means that previous rebalance is undefined and the new one should be 
initiated.
+     *
+     * Should not be used to determine latest rebalanced topology.
+     */
+    private volatile AffinityTopologyVersion rebTopVer = 
AffinityTopologyVersion.NONE;
+
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
@@ -827,6 +835,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * @return Latest rebalance topology version or {@code NONE} if there is 
no info.
+     */
+    public AffinityTopologyVersion rebalanceTopologyVersion() {
+        return rebTopVer;
+    }
+
+    /**
      * @return Last initialized topology future.
      */
     public GridDhtPartitionsExchangeFuture lastTopologyFuture() {
@@ -2558,6 +2573,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 if (grp.isLocal())
                                     continue;
 
+                                if 
(grp.preloader().rebalanceRequired(rebTopVer, exchFut))
+                                    rebTopVer = AffinityTopologyVersion.NONE;
+
                                 changed |= 
grp.topology().afterExchange(exchFut);
                             }
 
@@ -2565,7 +2583,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 refreshPartitions();
                         }
 
-                        if (!cctx.kernalContext().clientNode()) {
+                        // Schedule rebalance if force rebalance or force 
reassign occurs.
+                        if (exchFut == null)
+                            rebTopVer = AffinityTopologyVersion.NONE;
+
+                        if (!cctx.kernalContext().clientNode() && 
rebTopVer.equals(AffinityTopologyVersion.NONE)) {
                             assignsMap = new HashMap<>();
 
                             IgniteCacheSnapshotManager snp = cctx.snapshot();
@@ -2582,6 +2604,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                     assigns = 
grp.preloader().generateAssignments(exchId, exchFut);
 
                                 assignsMap.put(grp.groupId(), assigns);
+
+                                if (resVer == null)
+                                    resVer = 
grp.topology().readyTopologyVersion();
                             }
                         }
                     }
@@ -2590,7 +2615,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         busy = false;
                     }
 
-                    if (assignsMap != null) {
+                    if (assignsMap != null && 
rebTopVer.equals(AffinityTopologyVersion.NONE)) {
                         int size = assignsMap.size();
 
                         NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
@@ -2628,11 +2653,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 if (assigns != null)
                                     assignsCancelled |= assigns.cancelled();
 
-                                // Cancels previous rebalance future (in case 
it's not done yet).
-                                // Sends previous rebalance stopped event (if 
necessary).
-                                // Creates new rebalance future.
-                                // Sends current rebalance started event (if 
necessary).
-                                // Finishes cache sync future (on empty 
assignments).
                                 Runnable cur = 
grp.preloader().addAssignments(assigns,
                                     forcePreload,
                                     cnt,
@@ -2650,7 +2670,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         if (forcedRebFut != null)
                             forcedRebFut.markInitialized();
 
-                        if (assignsCancelled) { // Pending exchange.
+                        if (assignsCancelled || hasPendingExchange()) {
                             U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
                                 "[top=" + resVer + ", evt=" + 
exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
@@ -2658,25 +2678,31 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         else if (r != null) {
                             Collections.reverse(rebList);
 
-                            U.log(log, "Rebalancing scheduled [order=" + 
rebList + "]");
+                            U.log(log, "Rebalancing scheduled [order=" + 
rebList +
+                                ", top=" + resVer + ", force=" + (exchFut == 
null) +
+                                ", evt=" + exchId.discoveryEventName() +
+                                ", node=" + exchId.nodeId() + ']');
 
-                            if (!hasPendingExchange()) {
-                                U.log(log, "Rebalancing started " +
-                                    "[top=" + resVer + ", evt=" + 
exchId.discoveryEventName() +
-                                    ", node=" + exchId.nodeId() + ']');
+                            rebTopVer = resVer;
 
-                                r.run(); // Starts rebalancing routine.
-                            }
-                            else
-                                U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                    "[top=" + resVer + ", evt=" + 
exchId.discoveryEventName() +
-                                    ", node=" + exchId.nodeId() + ']');
+                            // Start rebalancing cache groups chain. Each 
group will be rebalanced
+                            // sequentially one by one e.g.:
+                            // ignite-sys-cache -> cacheGroupR1 -> 
cacheGroupP2 -> cacheGroupR3
+                            r.run();
                         }
                         else
                             U.log(log, "Skipping rebalancing (nothing 
scheduled) " +
-                                "[top=" + resVer + ", evt=" + 
exchId.discoveryEventName() +
+                                "[top=" + resVer + ", force=" + (exchFut == 
null) +
+                                ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
                     }
+                    else
+                        U.log(log, "Skipping rebalancing (no affinity changes) 
" +
+                            "[top=" + resVer +
+                            ", rebTopVer=" + rebTopVer +
+                            ", evt=" + exchId.discoveryEventName() +
+                            ", evtNode=" + exchId.nodeId() +
+                            ", client=" + cctx.kernalContext().clientNode() + 
']');
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 5fa7a82..d629e94 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
@@ -63,9 +64,16 @@ public interface GridCachePreloader {
     public void onInitialExchangeComplete(@Nullable Throwable err);
 
     /**
+     * @param rebTopVer Previous rebalance topology version or {@code NONE} if 
there is no info.
+     * @param exchFut Completed exchange future.
+     * @return {@code True} if rebalance should be started (previous will be 
interrupted).
+     */
+    public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, 
GridDhtPartitionsExchangeFuture exchFut);
+
+    /**
      * @param exchId Exchange ID.
-     * @param exchFut Exchange future.
-     * @return Assignments or {@code null} if detected that there are pending 
exchanges.
+     * @param exchFut Completed exchange future. Can be {@code null} if forced 
or reassigned generation occurs.
+     * @return Partition assignments which will be requested from supplier 
nodes.
      */
     @Nullable public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId,
                                                                      @Nullable 
GridDhtPartitionsExchangeFuture exchFut);
@@ -74,10 +82,10 @@ public interface GridCachePreloader {
      * Adds assignments to preloader.
      *
      * @param assignments Assignments to add.
-     * @param forcePreload Force preload flag.
-     * @param rebalanceId Rebalance id.
-     * @param next Runnable responsible for cache rebalancing start.
-     * @param forcedRebFut Rebalance future.
+     * @param forcePreload {@code True} if preload requested by {@link 
ForceRebalanceExchangeTask}.
+     * @param rebalanceId Rebalance id created by exchange thread.
+     * @param next Runnable responsible for cache rebalancing chain.
+     * @param forcedRebFut External future for forced rebalance.
      * @return Rebalancing runnable.
      */
     public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
@@ -114,7 +122,6 @@ public interface GridCachePreloader {
      * Future result is {@code false} in case rebalancing cancelled or 
finished with missed partitions and will be
      * restarted at current or pending topology.
      *
-     * Note that topology change creates new futures and finishes previous.
      */
     public IgniteInternalFuture<Boolean> rebalanceFuture();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index af91679..c5e4a81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -152,6 +152,12 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean rebalanceRequired(AffinityTopologyVersion 
rebTopVer,
+        GridDhtPartitionsExchangeFuture exchFut) {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId,
                                                                      
GridDhtPartitionsExchangeFuture exchFut) {
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 1eeebae..54d3c93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -235,12 +235,12 @@ public class GridDhtPartitionDemander {
 
     /**
      * @param fut Future.
-     * @return {@code True} if topology changed.
+     * @return {@code True} if rebalance topology version changed by exchange 
thread or force
+     * reassign exchange occurs, see {@link RebalanceReassignExchangeTask} for 
details.
      */
     private boolean topologyChanged(RebalanceFuture fut) {
-        return
-            !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // 
Topology already changed.
-                fut != rebalanceFut; // Same topology, but dummy exchange 
forced because of missing partitions.
+        return !ctx.exchange().rebalanceTopologyVersion().equals(fut.topVer) ||
+            fut != rebalanceFut; // Same topology, but dummy exchange forced 
because of missing partitions.
     }
 
     /**
@@ -253,14 +253,21 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * Initiates new rebalance process from given {@code assignments}.
-     * If previous rebalance is not finished method cancels it.
-     * In case of delayed rebalance method schedules new with configured delay.
+     * @return Collection of supplier nodes. Value {@code empty} means 
rebalance already finished.
+     */
+    Collection<UUID> remainingNodes() {
+        return rebalanceFut.remainingNodes();
+    }
+
+    /**
+     * This method initiates new rebalance process from given {@code 
assignments} by creating new rebalance
+     * future based on them. Cancels previous rebalance future and sends 
rebalance started event.
+     * In case of delayed rebalance method schedules the new one with 
configured delay based on {@code lastExchangeFut}.
      *
-     * @param assignments Assignments.
-     * @param force {@code True} if dummy reassign.
-     * @param rebalanceId Rebalance id.
-     * @param next Runnable responsible for cache rebalancing start.
+     * @param assignments Assignments to process.
+     * @param force {@code True} if preload requested by {@link 
ForceRebalanceExchangeTask}.
+     * @param rebalanceId Rebalance id generated from exchange thread.
+     * @param next Runnable responsible for cache rebalancing chain.
      * @param forcedRebFut External future for forced rebalance.
      * @return Rebalancing runnable.
      */
@@ -440,17 +447,7 @@ public class GridDhtPartitionDemander {
             if (fut.isDone())
                 return;
 
-            // Must add all remaining node before send first request, for 
avoid race between add remaining node and
-            // processing response, see checkIsDone(boolean).
-            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assignments.entrySet()) {
-                UUID nodeId = e.getKey().id();
-
-                IgniteDhtDemandedPartitionsMap parts = 
e.getValue().partitions();
-
-                assert parts != null : "Partitions are null [grp=" + 
grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]";
-
-                fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), 
parts));
-            }
+            fut.remaining.forEach((key, value) -> 
value.set1(U.currentTimeMillis()));
         }
 
         final CacheConfiguration cfg = grp.config();
@@ -979,6 +976,13 @@ public class GridDhtPartitionDemander {
             exchId = assignments.exchangeId();
             topVer = assignments.topologyVersion();
 
+            assignments.forEach((k, v) -> {
+                assert v.partitions() != null :
+                    "Partitions are null [grp=" + grp.cacheOrGroupName() + ", 
fromNode=" + k.id() + "]";
+
+                remaining.put(k.id(), new T2<>(U.currentTimeMillis(), 
v.partitions()));
+            });
+
             this.grp = grp;
             this.log = log;
             this.rebalanceId = rebalanceId;
@@ -1218,6 +1222,13 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * @return Collection of supplier nodes. Value {@code empty} means 
rebalance already finished.
+         */
+        private synchronized Collection<UUID> remainingNodes() {
+            return remaining.keySet();
+        }
+
+        /**
          *
          */
         private void sendRebalanceStartedEvent() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 4946d7e..ea7f4c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -118,19 +120,20 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * Handles new topology version and clears supply context map of outdated 
contexts.
-     *
-     * @param topVer Topology version.
+     * Handle topology change and clear supply context map of outdated 
contexts.
      */
-    @SuppressWarnings("ConstantConditions")
-    void onTopologyChanged(AffinityTopologyVersion topVer) {
+    void onTopologyChanged() {
         synchronized (scMap) {
             Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = 
scMap.keySet().iterator();
 
+            Collection<UUID> aliveNodes = 
grp.shared().discovery().aliveServerNodes().stream()
+                .map(ClusterNode::id)
+                .collect(Collectors.toList());
+
             while (it.hasNext()) {
                 T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
 
-                if (topVer.compareTo(t.get3()) > 0) { // Clear all obsolete 
contexts.
+                if (!aliveNodes.contains(t.get1())) { // Clear all obsolete 
contexts.
                     clearContext(scMap.get(t), log);
 
                     it.remove();
@@ -171,17 +174,6 @@ class GridDhtPartitionSupplier {
         AffinityTopologyVersion curTop = grp.affinity().lastVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
-        if (curTop.compareTo(demTop) > 0) {
-            if (log.isDebugEnabled())
-                log.debug("Demand request outdated [grp=" + 
grp.cacheOrGroupName()
-                        + ", currentTopVer=" + curTop
-                        + ", demandTopVer=" + demTop
-                        + ", from=" + nodeId
-                        + ", topicId=" + topicId + "]");
-
-            return;
-        }
-
         T3<UUID, Integer, AffinityTopologyVersion> contextId = new 
T3<>(nodeId, topicId, demTop);
 
         if (d.rebalanceId() < 0) { // Demand node requested context cleanup.

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 77f4866..7cf55a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -39,7 +40,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -160,12 +160,68 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture 
lastFut) {
-        supplier.onTopologyChanged(lastFut.initialVersion());
+        supplier.onTopologyChanged();
 
         demander.onTopologyChanged(lastFut);
     }
 
     /** {@inheritDoc} */
+    @Override public boolean rebalanceRequired(AffinityTopologyVersion 
rebTopVer,
+        GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.kernalContext().clientNode() || 
rebTopVer.equals(AffinityTopologyVersion.NONE))
+            return false; // No-op.
+
+        if (exchFut.localJoinExchange())
+            return true; // Required, can have outdated updSeq partition 
counter if node reconnects.
+
+        if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
+            assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
+                "Empty hisroty allowed only for newly started cache group 
[rebTopVer=" + rebTopVer +
+                    ", localStartTopVer=" + grp.localStartVersion() + ']';
+
+            return true; // Required, since no history info available.
+        }
+
+        final IgniteInternalFuture<Boolean> rebFut = rebalanceFuture();
+
+        if (rebFut.isDone() && !rebFut.result())
+            return true; // Required, previous rebalance cancelled.
+
+        final AffinityTopologyVersion exchTopVer = 
exchFut.context().events().topologyVersion();
+
+        Collection<UUID> aliveNodes = 
ctx.discovery().aliveServerNodes().stream()
+            .map(ClusterNode::id)
+            .collect(Collectors.toList());
+
+        return assignmentsChanged(rebTopVer, exchTopVer) ||
+            !aliveNodes.containsAll(demander.remainingNodes()); // Some of 
the nodes left before rebalance completed.
+    }
+
+    /**
+     * @param oldTopVer Previous topology version.
+     * @param newTopVer New topology version to check result.
+     * @return {@code True} if affinity assignments changed between two 
versions for local node.
+     */
+    private boolean assignmentsChanged(AffinityTopologyVersion oldTopVer, 
AffinityTopologyVersion newTopVer) {
+        final AffinityAssignment aff = grp.affinity().readyAffinity(newTopVer);
+
+        // We should get affinity assignments based on previous rebalance to 
calculate difference.
+        // Whole history size described by IGNITE_AFFINITY_HISTORY_SIZE 
constant.
+        final AffinityAssignment prevAff = 
grp.affinity().cachedVersions().contains(oldTopVer) ?
+            grp.affinity().cachedAffinity(oldTopVer) : null;
+
+        if (prevAff == null)
+            return false;
+
+        boolean assignsChanged = false;
+
+        for (int p = 0; !assignsChanged && p < grp.affinity().partitions(); 
p++)
+            assignsChanged |= aff.get(p).contains(ctx.localNode()) != 
prevAff.get(p).contains(ctx.localNode());
+
+        return assignsChanged;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId, 
GridDhtPartitionsExchangeFuture exchFut) {
         assert exchFut == null || exchFut.isDone();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 41dd076..6e847bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -20,7 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -73,9 +73,9 @@ public class GridDhtPreloaderAssignments extends 
ConcurrentHashMap<ClusterNode,
     }
 
     /**
-     * @return Topology version.
+     * @return Topology version based on last {@link 
GridDhtPartitionTopologyImpl#readyTopVer}.
      */
-    AffinityTopologyVersion topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java
index 5653177..46b09ac 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterBaselineNodesMetricsSelfTest.java
@@ -149,7 +149,6 @@ public class ClusterBaselineNodesMetricsSelfTest extends 
GridCommonAbstractTest
     private void resetBlt() throws Exception {
         resetBaselineTopology();
 
-        waitForRebalancing();
         awaitPartitionMapExchange();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java
index ba3ad5a..4a950dd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheValidatorMetricsTest.java
@@ -98,14 +98,14 @@ public class CacheValidatorMetricsTest extends 
GridCommonAbstractTest implements
 
         startGrid(2);
 
-        waitForRebalancing();
+        awaitPartitionMapExchange();
 
         assertCacheStatus(CACHE_NAME_1, true, true);
         assertCacheStatus(CACHE_NAME_2, true, true);
 
         stopGrid(1);
 
-        waitForRebalancing();
+        awaitPartitionMapExchange();
 
         // Invalid for writing due to invalid topology.
         assertCacheStatus(CACHE_NAME_1, true, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
index 83eff89..23ba4b3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
@@ -22,29 +22,23 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -58,9 +52,6 @@ import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.configuration.CacheConfiguration.DFLT_REBALANCE_BATCH_SIZE;
 import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
 import static org.apache.ignite.events.EventType.EVTS_CACHE_REBALANCE;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
@@ -142,15 +133,6 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-//        resetLog4j(Level.DEBUG, true,
-//            // Categories.
-//            GridDhtPreloader.class.getPackage().getName(),
-//            GridDhtPartitionTopologyImpl.class.getName(),
-//            GridDhtLocalPartition.class.getName());
-    }
-
-    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         backups = DFLT_BACKUPS;
         partitions = DFLT_PARTITIONS;
@@ -227,11 +209,6 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
      */
     private void checkActivePartitionTransfer(int keyCnt, int nodeCnt, boolean 
sameCoord, boolean shuffle)
         throws Exception {
-//        resetLog4j(Level.DEBUG, true,
-//            // Categories.
-//            GridDhtPreloader.class.getPackage().getName(),
-//            GridDhtPartitionTopologyImpl.class.getName(),
-//            GridDhtLocalPartition.class.getName());
 
         try {
             Ignite ignite1 = startGrid(0);
@@ -270,8 +247,6 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
             info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", 
nodeCnt=" + nodeCnt + ", grids=" +
                 U.grids2names(ignites) + ']');
 
-            Collection<IgniteFuture<?>> futs = new LinkedList<>();
-
             Ignite last = F.last(ignites);
 
             for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) {
@@ -285,21 +260,8 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
 
                 checkActiveState(ignites);
 
-                final UUID nodeId = g.cluster().localNode().id();
-
                 it.remove();
 
-                futs.add(waitForLocalEvent(last.events(), new P1<Event>() {
-                    @Override public boolean apply(Event e) {
-                        CacheRebalancingEvent evt = (CacheRebalancingEvent)e;
-
-                        ClusterNode node = evt.discoveryNode();
-
-                        return evt.type() == EVT_CACHE_REBALANCE_STOPPED && 
node.id().equals(nodeId) &&
-                            (evt.discoveryEventType() == EVT_NODE_LEFT || 
evt.discoveryEventType() == EVT_NODE_FAILED);
-                    }
-                }, EVT_CACHE_REBALANCE_STOPPED));
-
                 info("Before grid stop [name=" + g.name() + ", fullTop=" + 
top2string(ignites));
 
                 stopGrid(g.name());
@@ -312,14 +274,6 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
                 awaitPartitionMapExchange(); // Need wait, otherwise test 
logic is broken if EVT_NODE_FAILED exchanges are merged.
             }
 
-            info("Waiting for preload futures: " + F.view(futs, new 
IgnitePredicate<IgniteFuture<?>>() {
-                @Override public boolean apply(IgniteFuture<?> fut) {
-                    return !fut.isDone();
-                }
-            }));
-
-            X.waitAll(futs);
-
             info("Finished waiting for preload futures.");
 
             assert last != null;
@@ -499,11 +453,6 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
      */
     private void checkNodes(int keyCnt, int nodeCnt, boolean sameCoord, 
boolean shuffle)
         throws Exception {
-//        resetLog4j(Level.DEBUG, true,
-//            // Categories.
-//            GridDhtPreloader.class.getPackage().getName(),
-//            GridDhtPartitionTopologyImpl.class.getName(),
-//            GridDhtLocalPartition.class.getName());
 
         try {
             Ignite ignite1 = startGrid(0);
@@ -555,28 +504,13 @@ public class GridCacheDhtPreloadSelfTest extends 
GridCommonAbstractTest {
 
                 it.remove();
 
-                Collection<IgniteFuture<?>> futs = new LinkedList<>();
-
-                for (Ignite gg : ignites)
-                    futs.add(waitForLocalEvent(gg.events(), new P1<Event>() {
-                        @Override public boolean apply(Event e) {
-                            CacheRebalancingEvent evt = 
(CacheRebalancingEvent)e;
-
-                            ClusterNode node = evt.discoveryNode();
-
-                            return evt.type() == EVT_CACHE_REBALANCE_STOPPED 
&& node.id().equals(nodeId) &&
-                                (evt.discoveryEventType() == EVT_NODE_LEFT ||
-                                    evt.discoveryEventType() == 
EVT_NODE_FAILED);
-                        }
-                    }, EVT_CACHE_REBALANCE_STOPPED));
-
                 info("Before grid stop [name=" + g.name() + ", fullTop=" + 
top2string(ignites));
 
                 stopGrid(g.name());
 
                 info(">>> Waiting for preload futures [leftNode=" + g.name() + 
", remaining=" + U.grids2names(ignites) + ']');
 
-                X.waitAll(futs);
+                awaitPartitionMapExchange();
 
                 info("After grid stop [name=" + g.name() + ", fullTop=" + 
top2string(ignites));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 20f292b..14c8571 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -807,9 +807,6 @@ public class IgniteCacheAtomicProtocolTest extends 
GridCommonAbstractTest {
         startServers(2);
 
         // Waiting for minor topology changing because of late affinity 
assignment.
-        waitForRebalancing(0, 2, 1);
-        waitForRebalancing(1, 2, 1);
-
         awaitPartitionMapExchange();
 
         Ignite srv0 = ignite(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
index 4ebcd5d..0a8698a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
-import org.apache.ignite.Ignite;
+import java.util.Collections;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
@@ -43,7 +44,7 @@ public class GridCacheRebalancingAsyncSelfTest extends 
GridCacheRebalancingSyncS
      * @throws Exception Exception.
      */
     public void testNodeFailedAtRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
+        IgniteEx ignite = startGrid(0);
 
         generateData(ignite, 0, 0);
 
@@ -60,7 +61,7 @@ public class GridCacheRebalancingAsyncSelfTest extends 
GridCacheRebalancingSyncS
 
         
((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure();
 
-        waitForRebalancing(0, 3);
+        awaitPartitionMapExchange(false, false, 
Collections.singletonList(ignite.localNode()));
 
         checkSupplyContextMapIsEmpty();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java
new file mode 100644
index 0000000..3965290
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingCancelTest.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test cases checking whether the rebalancing process is cancelled when 
certain events occur.
+ */
+public class GridCacheRebalancingCancelTest extends GridCommonAbstractTest {
+    /** */
+    private static final String DHT_PARTITIONED_CACHE = "cacheP";
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration dfltCfg = 
super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)dfltCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        dfltCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return dfltCfg;
+    }
+
+    /**
+     * Tests that rebalancing is not cancelled when a client node joins the cluster.
+     *
+     * @throws Exception Exception.
+     */
+    public void testClientNodeJoinAtRebalancing() throws Exception {
+        final IgniteEx ignite0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> cache = ignite0.createCache(
+            new CacheConfiguration<Integer, Integer>(DHT_PARTITIONED_CACHE)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setRebalanceMode(CacheRebalanceMode.ASYNC)
+                .setBackups(1)
+                .setRebalanceOrder(2)
+                .setAffinity(new RendezvousAffinityFunction(false)));
+
+        for (int i = 0; i < 2048; i++)
+            cache.put(i, i);
+
+        TestRecordingCommunicationSpi.spi(ignite0)
+            .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return (msg instanceof GridDhtPartitionSupplyMessage)
+                        && ((GridCacheGroupIdMessage)msg).groupId() == 
groupIdForCache(ignite0, DHT_PARTITIONED_CACHE);
+                }
+            });
+
+        final IgniteEx ignite1 = startGrid(1);
+
+        TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked();
+
+        GridDhtPartitionDemander.RebalanceFuture fut = 
(GridDhtPartitionDemander.RebalanceFuture)ignite1.context().
+            
cache().internalCache(DHT_PARTITIONED_CACHE).preloader().rebalanceFuture();
+
+        String igniteClntName = getTestIgniteInstanceName(2);
+
+        startGrid(igniteClntName, 
optimize(getConfiguration(igniteClntName).setClientMode(true)));
+
+        // Resend delayed rebalance messages.
+        TestRecordingCommunicationSpi.spi(ignite0).stopBlock(true);
+
+        awaitPartitionMapExchange();
+
+        // Previous rebalance future should not be cancelled.
+        assertTrue(fut.result());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java
index 1280e87..cb414ed 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java
@@ -141,7 +141,8 @@ public class GridCacheRebalancingPartitionCountersTest 
extends GridCommonAbstrac
         assertTrue(primaryRemoved);
 
         ignite.cluster().active(true);
-        waitForRebalancing();
+
+        awaitPartitionMapExchange();
 
         List<String> issues = new ArrayList<>();
         HashMap<Integer, Long> partMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index ed51cf3..a027a41 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -21,10 +21,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -42,11 +41,13 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -68,6 +69,9 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
     /** */
     private static final int TEST_SIZE = 100_000;
 
+    /** */
+    private static final long TOPOLOGY_STILLNESS_TIME = 30_000L;
+
     /** partitioned cache name. */
     protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
 
@@ -89,11 +93,11 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
     /** */
     private volatile boolean concurrentStartFinished3;
 
-    /** */
-    private volatile boolean record;
-
-    /** */
-    private final ConcurrentHashMap<Class, AtomicInteger> map = new 
ConcurrentHashMap<>();
+    /**
+     * Time in milliseconds at which the last {@link 
GridDhtPartitionsSingleMessage}
+     * or {@link GridDhtPartitionsFullMessage} was sent, as recorded by {@link 
CollectingCommunicationSpi}.
+     */
+    private static volatile long lastPartMsgTime;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -102,7 +106,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
-        TcpCommunicationSpi commSpi = new CountingCommunicationSpi();
+        TcpCommunicationSpi commSpi = new CollectingCommunicationSpi();
 
         commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
         commSpi.setTcpNoDelay(true);
@@ -232,47 +236,35 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
 
         startGrid(1);
 
-        int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 
1 : 0;
-
-        waitForRebalancing(0, 2, waitMinorVer);
-        waitForRebalancing(1, 2, waitMinorVer);
-
         awaitPartitionMapExchange(true, true, null, true);
 
         checkPartitionMapExchangeFinished();
 
-        checkPartitionMapMessagesAbsent();
+        awaitPartitionMessagesAbsent();
 
         stopGrid(0);
 
-        waitForRebalancing(1, 3);
-
         awaitPartitionMapExchange(true, true, null, true);
 
         checkPartitionMapExchangeFinished();
 
-        checkPartitionMapMessagesAbsent();
+        awaitPartitionMessagesAbsent();
 
         startGrid(2);
 
-        waitForRebalancing(1, 4, waitMinorVer);
-        waitForRebalancing(2, 4, waitMinorVer);
-
         awaitPartitionMapExchange(true, true, null, true);
 
         checkPartitionMapExchangeFinished();
 
-        checkPartitionMapMessagesAbsent();
+        awaitPartitionMessagesAbsent();
 
         stopGrid(2);
 
-        waitForRebalancing(1, 5);
-
         awaitPartitionMapExchange(true, true, null, true);
 
         checkPartitionMapExchangeFinished();
 
-        checkPartitionMapMessagesAbsent();
+        awaitPartitionMessagesAbsent();
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
@@ -331,13 +323,10 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
 
         startGrid(4);
 
-        waitForRebalancing(3, 6);
-        waitForRebalancing(4, 6);
+        awaitPartitionMapExchange(true, true, null);
 
         concurrentStartFinished = true;
 
-        awaitPartitionMapExchange(true, true, null);
-
         checkSupplyContextMapIsEmpty();
 
         t1.join();
@@ -442,27 +431,29 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Waits until no {@link GridDhtPartitionsSingleMessage} or {@link 
GridDhtPartitionsFullMessage}
+     * has been sent for {@code TOPOLOGY_STILLNESS_TIME} milliseconds; fails if no such quiet period is reached.
+     *
      * @throws Exception If failed.
      */
-    protected void checkPartitionMapMessagesAbsent() throws Exception {
-        map.clear();
-
-        record = true;
-
-        log.info("Checking GridDhtPartitions*Message absent (it will take 30 
SECONDS) ... ");
-
-        U.sleep(30_000);
-
-        record = false;
-
-        AtomicInteger iF = map.get(GridDhtPartitionsFullMessage.class);
-        AtomicInteger iS = map.get(GridDhtPartitionsSingleMessage.class);
-
-        Integer fullMap = iF != null ? iF.get() : null;
-        Integer singleMap = iS != null ? iS.get() : null;
-
-        assertTrue("Unexpected full map messages: " + fullMap, fullMap == null 
|| fullMap.equals(1)); // 1 message can be sent right after all checks passed.
-        assertNull("Unexpected single map messages", singleMap);
+    protected void awaitPartitionMessagesAbsent() throws Exception {
+        log.info("Checking GridDhtPartitions*Message absent (it will take up 
to " +
+            TOPOLOGY_STILLNESS_TIME + " ms) ... ");
+
+        // Start waiting new messages from current point of time.
+        lastPartMsgTime = U.currentTimeMillis();
+
+        assertTrue("Should not have partition Single or Full messages within 
bound " +
+                TOPOLOGY_STILLNESS_TIME + " ms.",
+            GridTestUtils.waitForCondition(
+                new GridAbsPredicateX() {
+                    @Override public boolean applyx() {
+                        return lastPartMsgTime + TOPOLOGY_STILLNESS_TIME < 
U.currentTimeMillis();
+                    }
+                },
+                2 * TOPOLOGY_STILLNESS_TIME // 30 sec to gain stable topology 
and 30 sec of silence.
+            )
+        );
     }
 
     /** {@inheritDoc} */
@@ -495,11 +486,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
                     while (!concurrentStartFinished2)
                         U.sleep(10);
 
-                    waitForRebalancing(0, 5, 0);
-                    waitForRebalancing(1, 5, 0);
-                    waitForRebalancing(2, 5, 0);
-                    waitForRebalancing(3, 5, 0);
-                    waitForRebalancing(4, 5, 0);
+                    awaitPartitionMapExchange();
 
                     //New cache should start rebalancing.
                     CacheConfiguration<Integer, Integer> cacheRCfg = new 
CacheConfiguration<>(DEFAULT_CACHE_NAME);
@@ -552,12 +539,6 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         t2.join();
         t3.join();
 
-        waitForRebalancing(0, 5, 1);
-        waitForRebalancing(1, 5, 1);
-        waitForRebalancing(2, 5, 1);
-        waitForRebalancing(3, 5, 1);
-        waitForRebalancing(4, 5, 1);
-
         awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
@@ -577,35 +558,23 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
 
         stopGrid(1);
 
-        waitForRebalancing(0, 6);
-        waitForRebalancing(2, 6);
-        waitForRebalancing(3, 6);
-        waitForRebalancing(4, 6);
-
         awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
         stopGrid(0);
 
-        waitForRebalancing(2, 7);
-        waitForRebalancing(3, 7);
-        waitForRebalancing(4, 7);
-
         awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
         stopGrid(2);
 
-        waitForRebalancing(3, 8);
-        waitForRebalancing(4, 8);
-
         awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 
-        checkPartitionMapMessagesAbsent();
+        awaitPartitionMessagesAbsent();
 
         checkSupplyContextMapIsEmpty();
 
@@ -613,7 +582,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
 
         stopGrid(3);
 
-        waitForRebalancing(4, 9);
+        awaitPartitionMapExchange();
 
         checkSupplyContextMapIsEmpty();
 
@@ -634,36 +603,26 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private class CountingCommunicationSpi extends TcpCommunicationSpi {
+    private static class CollectingCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
         /** {@inheritDoc} */
         @Override public void sendMessage(final ClusterNode node, final Message msg,
             final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
             final Object msg0 = ((GridIoMessage)msg).message();
 
-            recordMessage(msg0);
+            if (msg0 instanceof GridDhtPartitionsSingleMessage ||
+                msg0 instanceof GridDhtPartitionsFullMessage) {
+                lastPartMsgTime = U.currentTimeMillis();
 
-            super.sendMessage(node, msg, ackC);
-        }
-
-        /**
-         * @param msg Message.
-         */
-        private void recordMessage(Object msg) {
-            if (record) {
-                Class id = msg.getClass();
-
-                AtomicInteger ai = map.get(id);
-
-                if (ai == null) {
-                    ai = new AtomicInteger();
-
-                    AtomicInteger oldAi = map.putIfAbsent(id, ai);
-
-                    (oldAi != null ? oldAi : ai).incrementAndGet();
-                }
-                else
-                    ai.incrementAndGet();
+                log.info("Last seen time of GridDhtPartitionsSingleMessage or GridDhtPartitionsFullMessage updated " +
+                    "[lastPartMsgTime=" + lastPartMsgTime +
+                    ", node=" + node.id() + ']');
             }
+
+            super.sendMessage(node, msg, ackC);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
index 7e9765c..13a98e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
@@ -383,7 +383,7 @@ public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstract
         startGrid("flaky");
 
         System.out.println("### Starting rebalancing after flaky node join");
-        waitForRebalancing();
+        awaitPartitionMapExchange();
         System.out.println("### Rebalancing is finished after flaky node join");
 
         awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
@@ -689,7 +689,7 @@ public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstract
         ig0.cluster().setBaselineTopology(fullBlt.subList(0, newBaselineSize));
 
         System.out.println("### Starting rebalancing after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize);
-        waitForRebalancing();
+        awaitPartitionMapExchange();
         System.out.println("### Rebalancing is finished after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize);
 
         awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
index 8f2e738..3500c8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
@@ -157,7 +157,8 @@ public class SlowHistoricalRebalanceSmallHistoryTest extends GridCommonAbstractT
 
         SUPPLY_MESSAGE_LATCH.get().countDown();
 
-        waitForRebalancing(); // Partition is OWNING on grid(0) and grid(1)
+        // Partition is OWNING on grid(0) and grid(1)
+        awaitPartitionMapExchange();
 
         for (int i = 0; i < 2; i++) {
             for (int j = 0; i < 500; i++)
@@ -178,7 +179,7 @@ public class SlowHistoricalRebalanceSmallHistoryTest extends GridCommonAbstractT
 
         startGrid(0);
 
-        waitForRebalancing();
+        awaitPartitionMapExchange();
 
         assertEquals(2, grid(1).context().discovery().aliveServerNodes().size());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 4d26823..a28ec5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -194,7 +194,7 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
 
                     grid.cluster().setBaselineTopology(grid.cluster().topologyVersion());
 
-                    waitForRebalancing();
+                    awaitPartitionMapExchange();
                 }
                 catch (Throwable expected) {
                     // There can be any exception. Do nothing.

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
index 78f3c03..9de2702 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
@@ -120,7 +120,8 @@ public class GridMarshallerMappingConsistencyTest extends GridCommonAbstractTest
         c1.put(k, new DummyObject(k));
 
         startGrid(2);
-        waitForRebalancing();
+
+        awaitPartitionMapExchange();
 
         stopAllGrids();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 2c5091c..313cd71 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -688,32 +688,34 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
                                 if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) ||
                                     (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING)) {
+                                    if (i % 50 == 0)
+                                        LT.warn(log(), "Waiting for topology map update [" +
+                                            "igniteInstanceName=" + g.name() +
+                                            ", cache=" + cfg.getName() +
+                                            ", cacheId=" + dht.context().cacheId() +
+                                            ", topVer=" + top.readyTopologyVersion() +
+                                            ", p=" + p +
+                                            ", affNodesCnt=" + affNodesCnt +
+                                            ", ownersCnt=" + ownerNodesCnt +
+                                            ", affNodes=" + F.nodeIds(affNodes) +
+                                            ", owners=" + F.nodeIds(owners) +
+                                            ", topFut=" + topFut +
+                                            ", locNode=" + g.cluster().localNode() + ']');
+                                }
+                                else
+                                    match = true;
+                            }
+                            else {
+                                if (i % 50 == 0)
                                     LT.warn(log(), "Waiting for topology map update [" +
                                         "igniteInstanceName=" + g.name() +
                                         ", cache=" + cfg.getName() +
                                         ", cacheId=" + dht.context().cacheId() +
                                         ", topVer=" + top.readyTopologyVersion() +
+                                        ", started=" + dht.context().started() +
                                         ", p=" + p +
-                                        ", affNodesCnt=" + affNodesCnt +
-                                        ", ownersCnt=" + ownerNodesCnt +
-                                        ", affNodes=" + F.nodeIds(affNodes) +
-                                        ", owners=" + F.nodeIds(owners) +
-                                        ", topFut=" + topFut +
+                                        ", readVer=" + readyVer +
                                         ", locNode=" + g.cluster().localNode() + ']');
-                                }
-                                else
-                                    match = true;
-                            }
-                            else {
-                                LT.warn(log(), "Waiting for topology map update [" +
-                                    "igniteInstanceName=" + g.name() +
-                                    ", cache=" + cfg.getName() +
-                                    ", cacheId=" + dht.context().cacheId() +
-                                    ", topVer=" + top.readyTopologyVersion() +
-                                    ", started=" + dht.context().started() +
-                                    ", p=" + p +
-                                    ", readVer=" + readyVer +
-                                    ", locNode=" + g.cluster().localNode() + ']');
                             }
 
                             if (!match) {
@@ -998,81 +1000,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
-     * @param id Node id.
-     * @param major Major ver.
-     * @param minor Minor ver.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
-        waitForRebalancing(grid(id), new AffinityTopologyVersion(major, minor));
-    }
-
-    /**
-     * @param id Node id.
-     * @param major Major ver.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
-        waitForRebalancing(grid(id), new AffinityTopologyVersion(major));
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void waitForRebalancing() throws IgniteCheckedException {
-        for (Ignite ignite : G.allGrids())
-            waitForRebalancing((IgniteEx)ignite, null);
-    }
-
-    /**
-     * @param ignite Node.
-     * @param top Topology version.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void waitForRebalancing(IgniteEx ignite, AffinityTopologyVersion top) throws IgniteCheckedException {
-        if (ignite.configuration().isClientMode())
-            return;
-
-        boolean finished = false;
-
-        long stopTime = System.currentTimeMillis() + 60_000;
-
-        while (!finished && (System.currentTimeMillis() < stopTime)) {
-            finished = true;
-
-            if (top == null)
-                top = ignite.context().discovery().topologyVersionEx();
-
-            for (GridCacheAdapter c : ignite.context().cache().internalCaches()) {
-                GridDhtPartitionDemander.RebalanceFuture fut =
-                    (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
-
-                if (fut.topologyVersion() == null || fut.topologyVersion().compareTo(top) < 0) {
-                    finished = false;
-
-                    log.info("Unexpected future version, will retry [futVer=" + fut.topologyVersion() +
-                        ", expVer=" + top + ']');
-
-                    U.sleep(100);
-
-                    break;
-                }
-                else if (!fut.get()) {
-                    finished = false;
-
-                    log.warning("Rebalancing finished with missed partitions.");
-
-                    U.sleep(100);
-
-                    break;
-                }
-            }
-        }
-
-        assertTrue(finished);
-    }
-
-    /**
      * @param ignite Node.
      */
     public void dumpCacheDebugInfo(Ignite ignite) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/137dd06a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 55436d6..5e94052 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingCancelTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
@@ -152,6 +153,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
         suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
         suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingCancelTest.class);
 
         // Test for byte array value special case.
         suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);

Reply via email to