Exchange future cleanup, added special tasks for reassign/force rebalance.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1be9b40c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1be9b40c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1be9b40c

Branch: refs/heads/ignite-5578
Commit: 1be9b40c37efcbf332ebeeefc865c2fe864339e7
Parents: ecfbc2c
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Jul 11 12:42:54 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Jul 11 12:42:54 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 140 +++++++++++--------
 .../processors/cache/GridCachePreloader.java    |   8 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../preloader/ForceRebalanceExchangeTask.java   |  58 ++++++++
 .../dht/preloader/GridDhtPartitionDemander.java |  22 +--
 .../preloader/GridDhtPartitionExchangeId.java   |  61 +++++++-
 .../GridDhtPartitionsExchangeFuture.java        | 136 +-----------------
 .../dht/preloader/GridDhtPreloader.java         |  33 ++---
 .../preloader/GridDhtPreloaderAssignments.java  |  21 ++-
 .../RebalanceReassignExchangeTask.java          |  44 ++++++
 10 files changed, 291 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index de0adc7..d4fe93f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -65,6 +65,7 @@ import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -77,6 +78,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -371,7 +373,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 "Node joined with smaller-than-local " +
                     "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
 
-            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt.type());
+            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 
             exchFut = exchangeFuture(exchId, evt, cache,null, null);
         }
@@ -384,7 +386,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 ExchangeActions exchActions = stateChangeMsg.exchangeActions();
 
                 if (exchActions != null) {
-                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt.type());
+                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt);
 
                     exchFut = exchangeFuture(exchId, evt, cache, exchActions, 
null);
                 }
@@ -395,7 +397,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 ExchangeActions exchActions = batch.exchangeActions();
 
                 if (exchActions != null) {
-                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt.type());
+                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt);
 
                     exchFut = exchangeFuture(exchId, evt, cache, exchActions, 
null);
                 }
@@ -405,7 +407,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                 if (msg.exchangeId() == null) {
                     if (msg.exchangeNeeded()) {
-                        exchId = exchangeId(n.id(), 
affinityTopologyVersion(evt), evt.type());
+                        exchId = exchangeId(n.id(), 
affinityTopologyVersion(evt), evt);
 
                         exchFut = exchangeFuture(exchId, evt, cache, null, 
msg);
                     }
@@ -416,7 +418,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             }
             else if (customMsg instanceof SnapshotDiscoveryMessage
                 && ((SnapshotDiscoveryMessage) customMsg).needExchange()) {
-                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), 
evt.type());
+                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 
                 exchFut = exchangeFuture(exchId, evt, null, null, null);
             }
@@ -480,7 +482,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
 
-        return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED);
+        return exchangeId(cctx.localNode().id(), startTopVer, discoEvt);
     }
 
     /**
@@ -845,27 +847,18 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param exchFut Exchange future.
-     * @param reassign Dummy reassign flag.
+     * @param exchId Exchange ID.
      */
-    public void forceDummyExchange(boolean reassign,
-        GridDhtPartitionsExchangeFuture exchFut) {
-        exchWorker.addExchangeFuture(
-            new GridDhtPartitionsExchangeFuture(cctx, reassign, 
exchFut.discoveryEvent(), exchFut.exchangeId()));
+    public void forceReassign(GridDhtPartitionExchangeId exchId) {
+        exchWorker.forceReassign(exchId);
     }
 
     /**
-     * Forces preload exchange.
-     *
-     * @param exchFut Exchange future.
+     * @param exchId Exchange ID.
+     * @return Rebalance future.
      */
-    public IgniteInternalFuture<Boolean> 
forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
-        GridCompoundFuture<Boolean, Boolean> fut = new 
GridCompoundFuture<>(CU.boolReducer());
-
-        exchWorker.addExchangeFuture(
-            new GridDhtPartitionsExchangeFuture(cctx, 
exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
-
-        return fut;
+    public IgniteInternalFuture<Boolean> 
forceRebalance(GridDhtPartitionExchangeId exchId) {
+        return exchWorker.forceRebalance(exchId);
     }
 
     /**
@@ -1185,10 +1178,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * @param nodeId Cause node ID.
      * @param topVer Topology version.
-     * @param evt Event type.
-     * @return Activity future ID.
+     * @param evt Event.
+     * @return Exchange ID instance.
      */
-    private GridDhtPartitionExchangeId exchangeId(UUID nodeId, 
AffinityTopologyVersion topVer, int evt) {
+    private GridDhtPartitionExchangeId exchangeId(UUID nodeId, 
AffinityTopologyVersion topVer, DiscoveryEvent evt) {
         return new GridDhtPartitionExchangeId(nodeId, evt, topVer);
     }
 
@@ -1635,7 +1628,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code True} if this is exchange task.
      */
     private static boolean isExchangeTask(CachePartitionExchangeWorkerTask 
task) {
-        return task instanceof GridDhtPartitionsExchangeFuture;
+        return task instanceof GridDhtPartitionsExchangeFuture ||
+            task instanceof RebalanceReassignExchangeTask ||
+            task instanceof ForceRebalanceExchangeTask;
     }
 
     /**
@@ -1745,13 +1740,32 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
 
         /**
+         * @param exchId Exchange ID.
+         */
+        void forceReassign(GridDhtPartitionExchangeId exchId) {
+            if (!hasPendingExchange() && !busy)
+                futQ.add(new RebalanceReassignExchangeTask(exchId));
+        }
+
+        /**
+         * @param exchId Exchange ID.
+         * @return Rebalance future.
+         */
+        IgniteInternalFuture<Boolean> 
forceRebalance(GridDhtPartitionExchangeId exchId) {
+            GridCompoundFuture<Boolean, Boolean> fut = new 
GridCompoundFuture<>(CU.boolReducer());
+
+            futQ.add(new ForceRebalanceExchangeTask(exchId, fut));
+
+            return fut;
+        }
+
+        /**
          * @param exchFut Exchange future.
          */
         void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
-            if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
-                futQ.offer(exchFut);
+            futQ.offer(exchFut);
 
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + 
exchFut);
@@ -1880,22 +1894,37 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         continue;
                     }
 
-                    assert task instanceof GridDhtPartitionsExchangeFuture;
-
-                    GridDhtPartitionsExchangeFuture exchFut = 
(GridDhtPartitionsExchangeFuture)task;
-
                     busy = true;
 
                     Map<Integer, GridDhtPreloaderAssignments> assignsMap = 
null;
 
-                    boolean dummyReassign = exchFut.dummyReassign();
-                    boolean forcePreload = exchFut.forcePreload();
+                    boolean forcePreload = false;
+
+                    GridDhtPartitionExchangeId exchId;
+
+                    GridDhtPartitionsExchangeFuture exchFut = null;
 
                     try {
                         if (isCancelled())
                             break;
 
-                        if (!exchFut.dummy() && !exchFut.forcePreload()) {
+                        if (task instanceof RebalanceReassignExchangeTask) {
+                            exchId = ((RebalanceReassignExchangeTask) 
task).exchangeId();
+                        }
+                        else if (task instanceof ForceRebalanceExchangeTask) {
+                            forcePreload = true;
+
+                            timeout = 0; // Force refresh.
+
+                            exchId = 
((ForceRebalanceExchangeTask)task).exchangeId();
+                        }
+                        else {
+                            assert task instanceof 
GridDhtPartitionsExchangeFuture : task;
+
+                            exchFut = (GridDhtPartitionsExchangeFuture)task;
+
+                            exchId = exchFut.exchangeId();
+
                             lastInitializedFut = exchFut;
 
                             exchFut.init();
@@ -1961,18 +1990,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             if (!cctx.kernalContext().clientNode() && changed 
&& !hasPendingExchange())
                                 refreshPartitions();
                         }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Got dummy exchange (will 
reassign)");
 
-                            if (!dummyReassign) {
-                                timeout = 0; // Force refresh.
-
-                                continue;
-                            }
-                        }
-
-                        if (!exchFut.skipPreload() ) {
+                        if (!cctx.kernalContext().clientNode()) {
                             assignsMap = new HashMap<>();
 
                             for (CacheGroupContext grp : 
cctx.cache().cacheGroups()) {
@@ -1982,7 +2001,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                                 // Don't delay for dummy reassigns to avoid 
infinite recursion.
                                 if (delay == 0 || forcePreload)
-                                    assigns = grp.preloader().assign(exchFut);
+                                    assigns = grp.preloader().assign(exchId, 
exchFut);
 
                                 assignsMap.put(grp.groupId(), assigns);
                             }
@@ -2017,6 +2036,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                         boolean assignsCancelled = false;
 
+                        GridCompoundFuture<Boolean, Boolean> forcedRebFut = 
null;
+
+                        if (task instanceof ForceRebalanceExchangeTask)
+                            forcedRebFut = 
((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
+
                         for (Integer order : orderMap.descendingKeySet()) {
                             for (Integer grpId : orderMap.get(order)) {
                                 CacheGroupContext grp = 
cctx.cache().cacheGroup(grpId);
@@ -2035,7 +2059,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                     forcePreload,
                                     cnt,
                                     r,
-                                    exchFut.forcedRebalanceFuture());
+                                    forcedRebFut);
 
                                 if (cur != null) {
                                     rebList.add(grp.cacheOrGroupName());
@@ -2045,13 +2069,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             }
                         }
 
-                        if (exchFut.forcedRebalanceFuture() != null)
-                            exchFut.forcedRebalanceFuture().markInitialized();
+                        if (forcedRebFut != null)
+                            forcedRebFut.markInitialized();
 
                         if (assignsCancelled) { // Pending exchange.
                             U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                                "[top=" + exchId.topologyVersion() + ", evt=" 
+ exchId.discoveryEventName() +
+                                ", node=" + exchId.nodeId() + ']');
                         }
                         else if (r != null) {
                             Collections.reverse(rebList);
@@ -2060,20 +2084,20 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                             if (!hasPendingExchange()) {
                                 U.log(log, "Rebalancing started " +
-                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                                    "[top=" + exchId.topologyVersion() + ", 
evt=" + exchId.discoveryEventName() +
+                                    ", node=" + exchId.nodeId() + ']');
 
                                 r.run(); // Starts rebalancing routine.
                             }
                             else
                                 U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                                    "[top=" + exchId.topologyVersion() + ", 
evt=" + exchId.discoveryEventName() +
+                                    ", node=" + exchId.nodeId() + ']');
                         }
                         else
                             U.log(log, "Skipping rebalancing (nothing 
scheduled) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                                "[top=" + exchId.topologyVersion() + ", evt=" 
+ exchId.discoveryEventName() +
+                                ", node=" + exchId.nodeId() + ']');
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 4e74532..dc1624a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
@@ -63,10 +64,12 @@ public interface GridCachePreloader {
     public void onInitialExchangeComplete(@Nullable Throwable err);
 
     /**
-     * @param exchFut Exchange future to assign.
+     * @param exchId Exchange ID.
+     * @param exchFut Exchange future.
      * @return Assignments or {@code null} if detected that there are pending 
exchanges.
      */
-    @Nullable public GridDhtPreloaderAssignments 
assign(GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
@@ -75,6 +78,7 @@ public interface GridCachePreloader {
      * @param forcePreload Force preload flag.
      * @param cnt Counter.
      * @param next Runnable responsible for cache rebalancing start.
+     * @param forcedRebFut Rebalance future.
      * @return Rebalancing runnable.
      */
     public Runnable addAssignments(GridDhtPreloaderAssignments assignments,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index d2ca229..c0accf6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
@@ -152,7 +153,8 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
new file mode 100644
index 0000000..c820175
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+
+/**
+ * Exchange worker task that requests a forced rebalance for a given exchange ID.
+ */
+public class ForceRebalanceExchangeTask implements 
CachePartitionExchangeWorkerTask {
+    /** Exchange ID. */
+    private final GridDhtPartitionExchangeId exchId;
+
+    /** Forced rebalance future. */
+    private final GridCompoundFuture<Boolean, Boolean> forcedRebFut;
+
+    /**
+     * @param exchId Exchange ID.
+     * @param forcedRebFut Rebalance future.
+     */
+    public ForceRebalanceExchangeTask(GridDhtPartitionExchangeId exchId, 
GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+        assert exchId != null;
+        assert forcedRebFut != null;
+
+        this.exchId = exchId;
+        this.forcedRebFut = forcedRebFut;
+    }
+
+    /**
+     * @return Exchange ID.
+     */
+    public GridDhtPartitionExchangeId exchangeId() {
+        return exchId;
+    }
+
+    /**
+     * @return Rebalance future.
+     */
+    public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() {
+        return forcedRebFut;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 4f34aba..248b739 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -189,7 +189,7 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * Force Rebalance.
+     * @return Rebalance future.
      */
     IgniteInternalFuture<Boolean> forceRebalance() {
         GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
@@ -207,7 +207,7 @@ public class GridDhtPartitionDemander {
 
             exchFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                 @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    IgniteInternalFuture<Boolean> fut0 = 
ctx.exchange().forceRebalance(exchFut);
+                    IgniteInternalFuture<Boolean> fut0 = 
ctx.exchange().forceRebalance(exchFut.exchangeId());
 
                     fut0.listen(new 
IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                         @Override public void 
apply(IgniteInternalFuture<Boolean> future) {
@@ -363,7 +363,7 @@ public class GridDhtPartitionDemander {
                 @Override public void onTimeout() {
                     exchFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            ctx.exchange().forceRebalance(exchFut);
+                            
ctx.exchange().forceRebalance(exchFut.exchangeId());
                         }
                     });
                 }
@@ -861,9 +861,9 @@ public class GridDhtPartitionDemander {
         /** Missed. */
         private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
 
-        /** Exchange future. */
+        /** Exchange ID. */
         @GridToStringExclude
-        private final GridDhtPartitionsExchangeFuture exchFut;
+        private final GridDhtPartitionExchangeId exchId;
 
         /** Topology version. */
         private final AffinityTopologyVersion topVer;
@@ -884,7 +884,7 @@ public class GridDhtPartitionDemander {
             long updateSeq) {
             assert assigns != null;
 
-            exchFut = assigns.exchangeFuture();
+            exchId = assigns.exchangeId();
             topVer = assigns.topologyVersion();
 
             this.grp = grp;
@@ -898,7 +898,7 @@ public class GridDhtPartitionDemander {
          * Dummy future. Will be done by real one.
          */
         RebalanceFuture() {
-            this.exchFut = null;
+            this.exchId = null;
             this.topVer = null;
             this.ctx = null;
             this.grp = null;
@@ -1032,7 +1032,7 @@ public class GridDhtPartitionDemander {
                     return;
 
                 if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, 
exchFut.discoveryEvent());
+                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, 
exchId.discoveryEvent());
 
                 T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
@@ -1108,7 +1108,7 @@ public class GridDhtPartitionDemander {
 
                     onDone(false); //Finished but has missed partitions, will 
force dummy exchange
 
-                    ctx.exchange().forceDummyExchange(true, exchFut);
+                    ctx.exchange().forceReassign(exchId);
 
                     return;
                 }
@@ -1125,7 +1125,7 @@ public class GridDhtPartitionDemander {
          */
         private void sendRebalanceStartedEvent() {
             if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED))
-                rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, 
exchFut.discoveryEvent());
+                rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, 
exchId.discoveryEvent());
         }
 
         /**
@@ -1133,7 +1133,7 @@ public class GridDhtPartitionDemander {
          */
         private void sendRebalanceFinishedEvent() {
             if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED))
-                rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, 
exchFut.discoveryEvent());
+                rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, 
exchId.discoveryEvent());
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 012634e..0a49415 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -23,6 +23,9 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -55,20 +58,27 @@ public class GridDhtPartitionExchangeId implements Message, 
Comparable<GridDhtPa
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** Discovery event. */
+    @GridDirectTransient
+    private DiscoveryEvent discoEvt;
+
     /**
      * @param nodeId Node ID.
-     * @param evt Event.
+     * @param discoEvt Discovery event.
      * @param topVer Topology version.
      */
-    public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull 
AffinityTopologyVersion topVer) {
+    public GridDhtPartitionExchangeId(UUID nodeId, DiscoveryEvent discoEvt, 
AffinityTopologyVersion topVer) {
         assert nodeId != null;
-        assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == 
EVT_NODE_JOINED ||
-            evt == EVT_DISCOVERY_CUSTOM_EVT;
-        assert topVer.topologyVersion() > 0;
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+        assert discoEvt != null;
 
         this.nodeId = nodeId;
-        this.evt = evt;
+        this.evt = discoEvt.type();
         this.topVer = topVer;
+        this.discoEvt = discoEvt;
+
+        assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == 
EVT_NODE_JOINED ||
+            evt == EVT_DISCOVERY_CUSTOM_EVT;
     }
 
     /**
@@ -93,6 +103,45 @@ public class GridDhtPartitionExchangeId implements Message, 
Comparable<GridDhtPa
     }
 
     /**
+     * @return Discovery event timestamp.
+     */
+    long eventTimestamp() {
+        assert discoEvt != null;
+
+        return discoEvt.timestamp();
+    }
+
+    /**
+     * @param discoEvt Discovery event.
+     */
+    void discoveryEvent(DiscoveryEvent discoEvt) {
+        this.discoEvt = discoEvt;
+    }
+
+    /**
+     * @return Discovery event.
+     */
+    DiscoveryEvent discoveryEvent() {
+        assert discoEvt != null;
+
+        return discoEvt;
+    }
+
+    /**
+     * @return Discovery event node.
+     */
+    public ClusterNode eventNode() {
+        return discoEvt.eventNode();
+    }
+
+    /**
+     * @return Discovery event name.
+     */
+    public String discoveryEventName() {
+        return U.gridEventName(evt);
+    }
+
+    /**
      * @return Order.
      */
     public AffinityTopologyVersion topologyVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5760f87..8989aaa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -75,7 +75,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -110,14 +109,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     implements Comparable<GridDhtPartitionsExchangeFuture>, 
CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
     /** */
     public static final String EXCHANGE_LOG = 
"org.apache.ignite.internal.exchange.time";
-    /** Dummy flag. */
-    private final boolean dummy;
-
-    /** Force preload flag. */
-    private final boolean forcePreload;
-
-    /** Dummy reassign flag. */
-    private final boolean reassign;
 
     /** */
     @GridToStringExclude
@@ -196,9 +187,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** Skip preload flag. */
-    private boolean skipPreload;
-
     /** */
     private boolean clientOnlyExchange;
 
@@ -221,9 +209,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = 
new IgniteDhtPartitionHistorySuppliersMap();
 
-    /** Forced Rebalance future. */
-    private GridCompoundFuture<Boolean, Boolean> forcedRebFut;
-
     /** */
     private volatile Map<Integer, Map<Integer, Long>> partHistReserved;
 
@@ -235,62 +220,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private final AtomicBoolean done = new AtomicBoolean();
 
     /**
-     * Dummy future created to trigger reassignments if partition
-     * topology changed while preloading.
-     *
-     * @param cctx Cache context.
-     * @param reassign Dummy reassign flag.
-     * @param discoEvt Discovery event.
-     * @param exchId Exchange id.
-     */
-    public GridDhtPartitionsExchangeFuture(
-        GridCacheSharedContext cctx,
-        boolean reassign,
-        DiscoveryEvent discoEvt,
-        GridDhtPartitionExchangeId exchId
-    ) {
-        dummy = true;
-        forcePreload = false;
-
-        this.exchId = exchId;
-        this.reassign = reassign;
-        this.discoEvt = discoEvt;
-        this.cctx = cctx;
-
-        log = cctx.logger(getClass());
-        exchLog = cctx.logger(EXCHANGE_LOG);
-
-        onDone(exchId.topologyVersion());
-    }
-
-    /**
-     * Force preload future created to trigger reassignments if partition
-     * topology changed while preloading.
-     *
-     * @param cctx Cache context.
-     * @param discoEvt Discovery event.
-     * @param exchId Exchange id.
-     * @param forcedRebFut Forced Rebalance future.
-     */
-    public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, 
DiscoveryEvent discoEvt,
-        GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, 
Boolean> forcedRebFut) {
-        dummy = false;
-        forcePreload = true;
-
-        this.exchId = exchId;
-        this.discoEvt = discoEvt;
-        this.cctx = cctx;
-        this.forcedRebFut = forcedRebFut;
-
-        log = cctx.logger(getClass());
-        exchLog = cctx.logger(EXCHANGE_LOG);
-
-        reassign = true;
-
-        onDone(exchId.topologyVersion());
-    }
-
-    /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
@@ -309,10 +238,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         assert exchId.topologyVersion() != null;
         assert exchActions == null || !exchActions.empty();
 
-        dummy = false;
-        forcePreload = false;
-        reassign = false;
-
         this.cctx = cctx;
         this.busyLock = busyLock;
         this.exchId = exchId;
@@ -351,41 +276,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Skip preload flag.
-     */
-    public boolean skipPreload() {
-        return skipPreload;
-    }
-
-    /**
-     * @return Dummy flag.
-     */
-    public boolean dummy() {
-        return dummy;
-    }
-
-    /**
-     * @return Force preload flag.
-     */
-    public boolean forcePreload() {
-        return forcePreload;
-    }
-
-    /**
-     * @return Dummy reassign flag.
-     */
-    public boolean reassign() {
-        return reassign;
-    }
-
-    /**
-     * @return {@code True} if dummy reassign.
-     */
-    public boolean dummyReassign() {
-        return (dummy() || forcePreload()) && reassign();
-    }
-
-    /**
      * @param grpId Cache group ID.
      * @param partId Partition ID.
      * @return ID of history supplier node or null if it doesn't exist.
@@ -453,6 +343,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent 
discoEvt, DiscoCache discoCache) {
         assert exchId.equals(this.exchId);
 
+        this.exchId.discoveryEvent(discoEvt);
         this.discoEvt = discoEvt;
         this.discoCache = discoCache;
 
@@ -476,7 +367,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @return {@code True} if deactivate cluster exchange.
      */
-    boolean deactivateCluster() {
+    private boolean deactivateCluster() {
         return exchActions != null && exchActions.deactivate();
     }
 
@@ -495,13 +386,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Forced Rebalance future.
-     */
-    @Nullable public GridCompoundFuture<Boolean, Boolean> 
forcedRebalanceFuture() {
-        return forcedRebFut;
-    }
-
-    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -538,7 +422,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         assert discoEvt != null : this;
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
-        assert !dummy && !forcePreload : this;
 
         try {
             discoCache.updateAlives(cctx.discovery());
@@ -553,8 +436,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            skipPreload = cctx.kernalContext().clientNode();
-
             exchLog.info("Started exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
                 ", evt=" + discoEvt.type() +
@@ -1330,13 +1211,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, 
@Nullable Throwable err) {
-        boolean realExchange = !dummy && !forcePreload;
-
         if (!done.compareAndSet(false, true))
-            return dummy;
+            return false;
 
         if (err == null &&
-            realExchange &&
             !cctx.kernalContext().clientNode() &&
             (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1347,7 +1225,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
        }
 
-        if (err == null && realExchange) {
+        if (err == null) {
             if (centralizedAff) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
@@ -1417,14 +1295,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         cctx.database().releaseHistoryForExchange();
 
-        if (err == null && realExchange) {
+        if (err == null) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal())
                     
grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
             }
         }
 
-        if (super.onDone(res, err) && realExchange) {
+        if (super.onDone(res, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + 
cctx.localNodeId() + ", exchange= " + this +
                     ", durationFromInit=" + (U.currentTimeMillis() - initTs) + 
']');
@@ -1446,7 +1324,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             return true;
         }
 
-        return dummy;
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 2b18c24..7efd4aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -179,22 +179,21 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture 
exchFut) {
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = grp.topology();
 
         if (!grp.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, 
top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchId, 
top.topologyVersion());
 
         int partCnt = grp.affinity().partitions();
 
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+        assert exchFut == null || 
exchFut.topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchId +
                 ", grp=" + grp.name() +
                 ", topVer=" + top.topologyVersion() + ']';
 
-        GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+        GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchId, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 
@@ -204,7 +203,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
             if (ctx.exchange().hasPendingExchange()) {
                 if (log.isDebugEnabled())
                     log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
-                        exchFut.exchangeId());
+                        exchId);
 
                 assigns.cancelled(true);
 
@@ -220,7 +219,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
                 ClusterNode histSupplier = null;
 
-                if (ctx.database().persistenceEnabled()) {
+                if (ctx.database().persistenceEnabled() && exchFut != null) {
                     UUID nodeId = 
exchFut.partitionHistorySupplier(grp.groupId(), p);
 
                     if (nodeId != null)
@@ -243,7 +242,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     if (msg == null) {
                         assigns.put(histSupplier, msg = new 
GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
+                            exchId.topologyVersion(),
                             grp.groupId()));
                     }
 
@@ -292,14 +291,12 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                         top.own(part);
 
                         if 
(grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                            DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        grp.addRebalanceEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                            discoEvt.eventNode(),
-                            discoEvt.type(),
-                            discoEvt.timestamp());
-                    }
+                            grp.addRebalanceEvent(p,
+                                EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                exchId.eventNode(),
+                                exchId.event(),
+                                exchId.eventTimestamp());
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Owning partition as there are no other 
owners: " + part);
@@ -312,7 +309,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     if (msg == null) {
                         assigns.put(n, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
+                            exchId.topologyVersion(),
                             grp.groupId()));
                     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 3f82c9b..41dd076 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -30,25 +30,24 @@ public class GridDhtPreloaderAssignments extends 
ConcurrentHashMap<ClusterNode,
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Exchange future. */
-    @GridToStringExclude
-    private final GridDhtPartitionsExchangeFuture exchFut;
+    /** Exchange ID. */
+    private final GridDhtPartitionExchangeId exchangeId;
 
-    /** Last join order. */
+    /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
     /** */
     private boolean cancelled;
 
     /**
-     * @param exchFut Exchange future.
+     * @param exchangeId Exchange ID.
      * @param topVer Last join order.
      */
-    public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture 
exchFut, AffinityTopologyVersion topVer) {
-        assert exchFut != null;
+    public GridDhtPreloaderAssignments(GridDhtPartitionExchangeId exchangeId, 
AffinityTopologyVersion topVer) {
+        assert exchangeId != null;
         assert topVer.topologyVersion() > 0 : topVer;
 
-        this.exchFut = exchFut;
+        this.exchangeId = exchangeId;
         this.topVer = topVer;
     }
 
@@ -69,8 +68,8 @@ public class GridDhtPreloaderAssignments extends 
ConcurrentHashMap<ClusterNode,
     /**
-     * @return Exchange future.
+     * @return Exchange ID.
      */
-    GridDhtPartitionsExchangeFuture exchangeFuture() {
-        return exchFut;
+    GridDhtPartitionExchangeId exchangeId() {
+        return exchangeId;
     }
 
     /**
@@ -82,7 +81,7 @@ public class GridDhtPreloaderAssignments extends 
ConcurrentHashMap<ClusterNode,
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", 
exchFut.exchangeId(),
+        return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", 
exchangeId,
             "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
new file mode 100644
index 0000000..9a76a8e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+
+/**
+ * Exchange worker task for rebalance reassignment; it only carries the ID of the related exchange.
+ */
+public class RebalanceReassignExchangeTask implements 
CachePartitionExchangeWorkerTask {
+    /** Exchange ID this task was created for. */
+    private final GridDhtPartitionExchangeId exchId;
+
+    /**
+     * @param exchId Exchange ID, must not be {@code null}.
+     */
+    public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
+        assert exchId != null;
+
+        this.exchId = exchId;
+    }
+
+    /**
+     * @return Exchange ID passed to the constructor.
+     */
+    public GridDhtPartitionExchangeId exchangeId() {
+        return exchId;
+    }
+}

Reply via email to