http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
new file mode 100644
index 0000000..7035e37
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -0,0 +1,2999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+
+/**
+ * Partition topology.
+ */
+@GridToStringExclude
+public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+    /** */
+    private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING};
+
+    /** Flag to control amount of output for full map. */
+    private static final boolean FULL_MAP_DEBUG = false;
+
+    /** */
+    private static final boolean FAST_DIFF_REBUILD = false;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext ctx;
+
+    /** Cache group. */
+    private final CacheGroupContext grp;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Time logger. */
+    private final IgniteLogger timeLog;
+
+    /** Local partitions. */
+    private final AtomicReferenceArray<GridDhtLocalPartition> locParts;
+
+    /** Node to partition map. */
+    private GridDhtPartitionFullMap node2part;
+
+    /** Partitions map for left nodes. */
+    private GridDhtPartitionFullMap leftNode2Part = new GridDhtPartitionFullMap();
+
+    /** Per-partition sets of node IDs whose partition state (OWNING, MOVING or RENTING) differs from the current affinity assignment. */
+    private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
+
+    /** Topology version for which {@link #diffFromAffinity} was calculated. */
+    private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
+    /** Last started exchange version (always >= readyTopVer). */
+    private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE;
+
+    /** Last finished exchange version. */
+    private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE;
+
+    /** Discovery cache. */
+    private volatile DiscoCache discoCache;
+
+    /** */
+    private volatile boolean stopping;
+
+    /** A future that will be completed when topology with version topVer will be ready to use. */
+    private volatile GridDhtTopologyFuture topReadyFut;
+
+    /** */
+    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
+
+    /** Lock. */
+    private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
+
+    /** Partition update counter. */
+    private final CachePartitionFullCountersMap cntrMap;
+
+    /** */
+    private volatile Map<Integer, Long> globalPartSizes;
+
+    /** */
+    private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+    /** */
+    private volatile MvccCoordinator mvccCrd;
+
+    /**
+     * @param ctx Cache shared context.
+     * @param grp Cache group.
+     */
+    public GridDhtPartitionTopologyImpl(
+        GridCacheSharedContext ctx,
+        CacheGroupContext grp
+    ) {
+        assert ctx != null;
+        assert grp != null;
+
+        this.ctx = ctx;
+        this.grp = grp;
+
+        log = ctx.logger(getClass());
+
+        timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
+
+        locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
+
+        cntrMap = new CachePartitionFullCountersMap(locParts.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        return grp.affinityFunction().partitions();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int groupId() {
+        return grp.groupId();
+    }
+
+    /**
+     * Resets the topology state when the local node reconnects to the cluster.
+     */
+    public void onReconnected() {
+        lock.writeLock().lock();
+
+        try {
+            node2part = null;
+
+            diffFromAffinity.clear();
+
+            updateSeq.set(1);
+
+            topReadyFut = null;
+
+            diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            readyTopVer = AffinityTopologyVersion.NONE;
+
+            lastTopChangeVer = AffinityTopologyVersion.NONE;
+
+            discoCache = ctx.discovery().discoCache();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @return Full map string representation.
+     */
+    @SuppressWarnings({"ConstantConditions"})
+    private String fullMapString() {
+        return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
+    }
+
+    /**
+     * @param map Map to get string for.
+     * @return Full map string representation.
+     */
+    @SuppressWarnings({"ConstantConditions"})
+    private String mapString(GridDhtPartitionMap map) {
+        return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+    @Override public void readLock() {
+        lock.readLock().lock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean holdsLock() {
+        return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateTopologyVersion(
+        GridDhtTopologyFuture exchFut,
+        @NotNull DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
+        long updSeq,
+        boolean stopping
+    ) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
+
+        try {
+            AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
+
+            assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
+                ", topVer=" + readyTopVer +
+                ", exchTopVer=" + exchTopVer +
+                ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
+                ", exchDiscoCacheVer=" + discoCache.version() +
+                ", fut=" + exchFut + ']';
+
+            this.stopping = stopping;
+
+            updateSeq.setIfGreater(updSeq);
+
+            topReadyFut = exchFut;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            lastTopChangeVer = exchTopVer;
+
+            this.discoCache = discoCache;
+            this.mvccCrd = mvccCrd;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion readyTopologyVersion() {
+        AffinityTopologyVersion topVer = this.readyTopVer;
+
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", group=" + grp.cacheOrGroupName() + ']';
+
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
+        AffinityTopologyVersion topVer = this.lastTopChangeVer;
+
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", group=" + grp.cacheOrGroupName() + ']';
+
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtTopologyFuture topologyVersionFuture() {
+        assert topReadyFut != null;
+
+        return topReadyFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopping() {
+        return stopping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+        GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException
+    {
+        boolean needRefresh;
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            U.writeLock(lock);
+
+            try {
+                if (stopping)
+                    return false;
+
+                long updateSeq = this.updateSeq.incrementAndGet();
+
+                needRefresh = initPartitions(affVer, grp.affinity().readyAssignments(affVer), exchFut, updateSeq);
+
+                consistencyCheck();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+
+        return needRefresh;
+    }
+
+    /**
+     * Creates and initializes partitions using given {@code affVer} and {@code affAssignment}.
+     *
+     * @param affVer Affinity version to use.
+     * @param affAssignment Affinity assignment to use.
+     * @param exchFut Exchange future.
+     * @param updateSeq Update sequence.
+     * @return {@code True} if partitions must be refreshed.
+     */
+    private boolean initPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> affAssignment, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+        boolean needRefresh = false;
+
+        if (grp.affinityNode()) {
+            ClusterNode loc = ctx.localNode();
+
+            ClusterNode oldest = discoCache.oldestAliveServerNode();
+
+            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+
+            int partitions = grp.affinity().partitions();
+
+            if (grp.rebalanceEnabled()) {
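+                // The node may own partitions right away (without rebalancing) only when this is the very
+                // first topology for the group: the group was just added, this is the first node in the
+                // cluster, or the cluster is being activated.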
+                boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+
+                boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || exchFut.activateCluster();
+
+                if (first) {
+                    assert exchId.isJoined() || added || exchFut.activateCluster();
+
+                    if (log.isDebugEnabled()) {
+                        String reason;
+
+                        if (exchId.isJoined())
+                            reason = "First node in cluster";
+                        else if (added)
+                            reason = "Cache group added";
+                        else
+                            reason = "Cluster activate";
+
+                        log.debug("Initialize partitions (" + reason + ")" + " [grp=" + grp.cacheOrGroupName() + "]");
+                    }
+
+                    for (int p = 0; p < partitions; p++) {
+                        if (localNode(p, affAssignment)) {
+                            // Partition is created first time, so it's safe to own it.
+                            boolean shouldOwn = locParts.get(p) == null;
+
+                            GridDhtLocalPartition locPart = getOrCreatePartition(p);
+
+                            if (shouldOwn) {
+                                locPart.own();
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Partition has been owned (created first time) " +
+                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']');
+                            }
+
+                            needRefresh = true;
+
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
+                        }
+                    }
+                }
+                else
+                    createPartitions(affVer, affAssignment, updateSeq);
+            }
+            else {
+                // If preloader is disabled, then we simply clear out
+                // the partitions this node is not responsible for.
+                for (int p = 0; p < partitions; p++) {
+                    GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true);
+
+                    boolean belongs = localNode(p, affAssignment);
+
+                    if (locPart != null) {
+                        if (!belongs) {
+                            GridDhtPartitionState state = locPart.state();
+
+                            if (state.active()) {
+                                locPart.rent(false);
+
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Evicting partition with rebalancing disabled (it does not belong to " +
+                                        "affinity) [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']');
+                                }
+                            }
+                        }
+                        else
+                            locPart.own();
+                    }
+                    else if (belongs) {
+                        locPart = getOrCreatePartition(p);
+
+                        locPart.own();
+
+                        updateLocal(p, locPart.state(), updateSeq, affVer);
+                    }
+                }
+            }
+        }
+
+        updateRebalanceVersion(affVer, affAssignment);
+
+        return needRefresh;
+    }
+
+    /**
+     * Creates non-existing partitions belonging to given affinity {@code aff}.
+     *
+     * @param affVer Affinity version.
+     * @param aff Affinity assignments.
+     * @param updateSeq Update sequence.
+     */
+    private void createPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff, long updateSeq) {
+        if (!grp.affinityNode())
+            return;
+
+        int partitions = grp.affinity().partitions();
+
+        if (log.isDebugEnabled())
+            log.debug("Create non-existing partitions [grp=" + grp.cacheOrGroupName() + "]");
+
+        for (int p = 0; p < partitions; p++) {
+            if (node2part != null && node2part.valid()) {
+                if (localNode(p, aff)) {
+                    // This will make sure that all non-existing partitions
+                    // will be created in MOVING state.
+                    GridDhtLocalPartition locPart = getOrCreatePartition(p);
+
+                    updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
+                }
+            }
+            // If this node's map is empty, we pre-create local partitions,
+            // so local map will be sent correctly during exchange.
+            else if (localNode(p, aff))
+                getOrCreatePartition(p);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeExchange(
+        GridDhtPartitionsExchangeFuture exchFut,
+        boolean affReady,
+        boolean updateMoving
+    ) throws IgniteCheckedException {
+        ctx.database().checkpointReadLock();
+
+        try {
+            synchronized (ctx.exchange().interruptLock()) {
+                if (Thread.currentThread().isInterrupted())
+                    throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
+
+                U.writeLock(lock);
+
+                try {
+                    if (stopping)
+                        return;
+
+                    assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
+                        ", exchId=" + exchFut.exchangeId() + ']';
+
+                    ExchangeDiscoveryEvents evts = exchFut.context().events();
+
+                    if (affReady) {
+                        assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
+                            "grp=" + grp.cacheOrGroupName() +
+                            ", affVer=" + grp.affinity().lastVersion() +
+                            ", evtsVer=" + evts.topologyVersion() + ']';
+
+                        lastTopChangeVer = readyTopVer = evts.topologyVersion();
+
+                        discoCache = evts.discoveryCache();
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
+                            ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
+                    }
+
+                    long updateSeq = this.updateSeq.incrementAndGet();
+
+                    cntrMap.clear();
+
+                    initializeFullMap(updateSeq);
+
+                    boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+
+                    if (evts.hasServerLeft()) {
+                        List<DiscoveryEvent> evts0 = evts.events();
+
+                        for (int i = 0; i < evts0.size(); i++) {
+                            DiscoveryEvent evt = evts0.get(i);
+
+                            if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+                                removeNode(evt.eventNode().id());
+                        }
+                    }
+
+                    if (grp.affinityNode()) {
+                        if (grpStarted ||
+                            exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+                            exchFut.serverNodeDiscoveryEvent()) {
+
+                            AffinityTopologyVersion affVer;
+                            List<List<ClusterNode>> affAssignment;
+
+                            if (affReady) {
+                                affVer = evts.topologyVersion();
+
+                                assert grp.affinity().lastVersion().equals(affVer) :
+                                        "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                                                ", grp=" + grp.cacheOrGroupName() +
+                                                ", affVer=" + affVer +
+                                                ", fut=" + exchFut + ']';
+
+                                affAssignment = grp.affinity().readyAssignments(affVer);
+                            }
+                            else {
+                                assert !exchFut.context().mergeExchanges();
+
+                                affVer = exchFut.initialVersion();
+                                affAssignment = grp.affinity().idealAssignment();
+                            }
+
+                            initPartitions(affVer, affAssignment, exchFut, updateSeq);
+                        }
+                    }
+
+                    consistencyCheck();
+
+                    if (updateMoving) {
+                        assert grp.affinity().lastVersion().equals(evts.topologyVersion());
+
+                        createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
+                            "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
+                    }
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
+                            + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
+                    }
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /**
+     * Initializes the full map if the current full map is empty or invalid, in case of coordinator or cache group start.
+     *
+     * @param updateSeq Update sequence to initialize full map.
+     */
+    private void initializeFullMap(long updateSeq) {
+        if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture))
+            return;
+
+        GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture) topReadyFut;
+
+        boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+
+        ClusterNode oldest = discoCache.oldestAliveServerNode();
+
+        // If this is the oldest node.
+        if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) {
+            if (node2part == null) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Created brand new full topology map on oldest node [" +
+                        "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
+                        ", fullMap=" + fullMapString() + ']');
+                }
+            }
+            else if (!node2part.valid()) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(),
+                    oldest.order(),
+                    updateSeq,
+                    node2part,
+                    false);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Created new full topology map on oldest node [" +
+                        "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
+                        ", fullMap=" + node2part + ']');
+                }
+            }
+            else if (!node2part.nodeId().equals(ctx.localNode().id())) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(),
+                    oldest.order(),
+                    updateSeq,
+                    node2part,
+                    false);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Copied old map into new map on oldest node (previous oldest node left) [" +
+                        "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
+                        ", fullMap=" + fullMapString() + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * @param p Partition number.
+     * @param topVer Topology version.
+     * @return {@code True} if given partition belongs to local node.
+     */
+    private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+        return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterStateRestored(AffinityTopologyVersion topVer) {
+        lock.writeLock().lock();
+
+        try {
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            initializeFullMap(updateSeq);
+
+            for (int p = 0; p < grp.affinity().partitions(); p++) {
+                GridDhtLocalPartition locPart = locParts.get(p);
+
+                if (locPart == null)
+                    updateLocal(p, EVICTED, updateSeq, topVer);
+                else {
+                    GridDhtPartitionState state = locPart.state();
+
+                    updateLocal(p, state, updateSeq, topVer);
+
+                    // Restart cleaning.
+                    if (state == RENTING)
+                        locPart.clearAsync();
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) {
+        boolean changed = false;
+
+        int partitions = grp.affinity().partitions();
+
+        AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion();
+
+        assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " +
+            "[grp=" + grp.cacheOrGroupName() +
+            ", topVer=" + topVer +
+            ", affVer=" + grp.affinity().lastVersion() +
+            ", fut=" + exchFut + ']';
+
+        ctx.database().checkpointReadLock();
+
+        try {
+
+            lock.writeLock().lock();
+
+            try {
+                if (stopping)
+                    return false;
+
+                assert readyTopVer.initialized() : readyTopVer;
+                assert lastTopChangeVer.equals(readyTopVer);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Partition map before afterExchange [grp=" + grp.cacheOrGroupName() +
+                        ", exchId=" + exchFut.exchangeId() +
+                        ", fullMap=" + fullMapString() + ']');
+                }
+
+                if (log.isTraceEnabled()) {
+                    log.trace("Partition states before afterExchange [grp=" + grp.cacheOrGroupName()
+                        + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
+                }
+
+                long updateSeq = this.updateSeq.incrementAndGet();
+
+                for (int p = 0; p < partitions; p++) {
+                    GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true);
+
+                    if (partitionLocalNode(p, topVer)) {
+                        // Prepare the partition for rebalancing if this didn't happen during the full map update phase.
+                        if (locPart == null || locPart.state() == RENTING || locPart.state() == EVICTED)
+                            locPart = rebalancePartition(p, false);
+
+                        GridDhtPartitionState state = locPart.state();
+
+                        if (state == MOVING) {
+                            if (grp.rebalanceEnabled()) {
+                                Collection<ClusterNode> owners = owners(p);
+
+                                // If an owner node left during exchange, then new exchange should be started with detecting lost partitions.
+                                if (!F.isEmpty(owners)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Will not own partition (there are owners to rebalance from) " +
+                                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
+                                }
+                            }
+                            else
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
+                        }
+                    }
+                    else {
+                        if (locPart != null) {
+                            GridDhtPartitionState state = locPart.state();
+
+                            if (state == MOVING) {
+                                locPart.rent(false);
+
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
+
+                                changed = true;
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Evicting " + state + " partition (it does not belong to affinity) [" +
+                                        "grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']');
+                                }
+                            }
+                        }
+                    }
+                }
+
+                AffinityAssignment aff = grp.affinity().readyAffinity(topVer);
+
+                if (node2part != null && node2part.valid())
+                    changed |= checkEvictions(updateSeq, aff);
+
+                updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
+
+                consistencyCheck();
+
+                if (log.isTraceEnabled()) {
+                    log.trace("Partition states after afterExchange [grp=" + grp.cacheOrGroupName()
+                        + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
+                }
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+
+        return changed;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
+        boolean create)
+        throws GridDhtInvalidPartitionException {
+        return localPartition0(p, topVer, create, false);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
+        boolean create, boolean showRenting) throws GridDhtInvalidPartitionException {
+        return localPartition0(p, topVer, create, showRenting);
+    }
+
+    /**
+     * Creates partition with id {@code p} if it doesn't exist or was evicted.
+     * Otherwise returns the existing partition.
+     *
+     * @param p Partition number.
+     * @return Partition.
+     */
+    private GridDhtLocalPartition getOrCreatePartition(int p) {
+        assert lock.isWriteLockedByCurrentThread();
+
+        assert ctx.database().checkpointLockIsHeldByThread();
+
+        GridDhtLocalPartition loc = locParts.get(p);
+
+        if (loc == null || loc.state() == EVICTED) {
+            // Make sure that after eviction partition is destroyed.
+            if (loc != null)
+                loc.awaitDestroy();
+
+            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+
+            long updCntr = cntrMap.updateCounter(p);
+
+            if (updCntr != 0)
+                loc.updateCounter(updCntr);
+
+            if (ctx.pageStore() != null) {
+                try {
+                    ctx.pageStore().onPartitionCreated(grp.groupId(), p);
+                }
+                catch (IgniteCheckedException e) {
+                    // TODO ignite-db
+                    throw new IgniteException(e);
+                }
+            }
+        }
+
+        return loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+        lock.writeLock().lock();
+
+        try {
+            GridDhtLocalPartition part = locParts.get(p);
+
+            if (part != null && part.state() != EVICTED)
+                return part;
+
+            part = new GridDhtLocalPartition(ctx, grp, p);
+
+            locParts.set(p, part);
+
+            ctx.pageStore().onPartitionCreated(grp.groupId(), p);
+
+            return part;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param p Partition number.
+     * @param topVer Topology version.
+     * @param create If {@code true}, create the partition if it doesn't exist or was evicted.
+     * @param showRenting If {@code true}, return the partition in RENTING state if it exists.
+     * @return Local partition.
+     */
+    @SuppressWarnings("TooBroadScope")
+    private GridDhtLocalPartition localPartition0(int p,
+        AffinityTopologyVersion topVer,
+        boolean create,
+        boolean showRenting) {
+        GridDhtLocalPartition loc;
+
+        loc = locParts.get(p);
+
+        GridDhtPartitionState state = loc != null ? loc.state() : null;
+
+        if (loc != null && state != EVICTED && (state != RENTING || showRenting))
+            return loc;
+
+        if (!create)
+            return null;
+
+        boolean created = false;
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                loc = locParts.get(p);
+
+                state = loc != null ? loc.state() : null;
+
+                boolean belongs = partitionLocalNode(p, topVer);
+
+                if (loc != null && state == EVICTED) {
+                    // Make sure that after eviction partition is destroyed.
+                    loc.awaitDestroy();
+
+                    locParts.set(p, loc = null);
+
+                    if (!belongs) {
+                        throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
+                            "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
+                            "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
+                            ", this.topVer=" + this.readyTopVer + ']');
+                    }
+                }
+                else if (loc != null && state == RENTING && !showRenting) {
+                    throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+                        "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving="
+                        + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]");
+                }
+
+                if (loc == null) {
+                    if (!belongs)
+                        throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
+                            "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
+                            "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
+                            ", this.topVer=" + this.readyTopVer + ']');
+
+                    locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+
+                    this.updateSeq.incrementAndGet();
+
+                    created = true;
+                }
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+
+        if (created && ctx.pageStore() != null) {
+            try {
+                ctx.pageStore().onPartitionCreated(grp.groupId(), p);
+            }
+            catch (IgniteCheckedException e) {
+                // TODO ignite-db
+                throw new IgniteException(e);
+            }
+        }
+
+        return loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releasePartitions(int... parts) {
+        assert parts != null;
+        assert parts.length > 0;
+
+        for (int i = 0; i < parts.length; i++) {
+            GridDhtLocalPartition part = locParts.get(parts[i]);
+
+            if (part != null)
+                part.release();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition localPartition(int part) {
+        return locParts.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<GridDhtLocalPartition> localPartitions() {
+        List<GridDhtLocalPartition> list = new ArrayList<>(locParts.length());
+
+        for (int i = 0; i < locParts.length(); i++) {
+            GridDhtLocalPartition part = locParts.get(i);
+
+            if (part != null && part.state().active())
+                list.add(part);
+        }
+
+        return list;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() {
+        return new Iterable<GridDhtLocalPartition>() {
+            @Override public Iterator<GridDhtLocalPartition> iterator() {
+                return new CurrentPartitionsIterator();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRemoved(GridDhtCacheEntry e) {
+        /*
+         * Make sure not to acquire any locks here as this method
+         * may be called from sensitive synchronization blocks.
+         * ===================================================
+         */
+
+        GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false);
+
+        if (loc != null)
+            loc.onRemoved(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionMap localPartitionMap() {
+        GridPartitionStateMap map = new GridPartitionStateMap(locParts.length());
+
+        lock.readLock().lock();
+
+        try {
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part == null)
+                    continue;
+
+                map.put(i, part.state());
+            }
+
+            GridDhtPartitionMap locPartMap = node2part != null ? node2part.get(ctx.localNodeId()) : null;
+
+            return new GridDhtPartitionMap(ctx.localNodeId(),
+                updateSeq.get(),
+                locPartMap != null ? locPartMap.topologyVersion() : readyTopVer,
+                map,
+                true);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        return nodes0(p, affAssignment, affNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
+        AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
+
+        List<ClusterNode> affNodes = affAssignment.get(p);
+
+        List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
+
+        return nodes != null ? nodes : affNodes;
+    }
+
+    /**
+     * @param p Partition.
+     * @param affAssignment Assignments.
+     * @param affNodes Node assigned for given partition by affinity.
+     * @return Nodes responsible for given partition (primary is first).
+     */
+    @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) {
+        if (grp.isReplicated())
+            return affNodes;
+
+        AffinityTopologyVersion topVer = affAssignment.topologyVersion();
+
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
+                ", topVer2=" + this.readyTopVer +
+                ", node=" + ctx.igniteInstanceName() +
+                ", grp=" + grp.cacheOrGroupName() +
+                ", node2part=" + node2part + ']';
+
+            List<ClusterNode> nodes = null;
+
+            if (!topVer.equals(diffFromAffinityVer)) {
+                LT.warn(log, "Requested topology version does not match calculated diff, will require full iteration to " +
+                    "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
+                    ", diffVer=" + diffFromAffinityVer + "]");
+
+                nodes = new ArrayList<>();
+
+                nodes.addAll(affNodes);
+
+                for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) {
+                    GridDhtPartitionState state = entry.getValue().get(p);
+
+                    ClusterNode n = ctx.discovery().node(entry.getKey());
+
+                    if (n != null && state != null && (state == MOVING || state == OWNING || state == RENTING)
+                        && !nodes.contains(n) && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
+                        nodes.add(n);
+                    }
+
+                }
+
+                return nodes;
+            }
+
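+            // Fast path: the diff from affinity is up to date for this topology version, so only the nodes
+            // recorded in the per-partition diff need to be checked in addition to the affinity nodes.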
+            Collection<UUID> diffIds = diffFromAffinity.get(p);
+
+            if (!F.isEmpty(diffIds)) {
+                HashSet<UUID> affIds = affAssignment.getIds(p);
+
+                for (UUID nodeId : diffIds) {
+                    if (affIds.contains(nodeId)) {
+                        U.warn(log, "Node from diff is affinity node, skipping it [grp=" + grp.cacheOrGroupName() +
+                            ", node=" + nodeId + ']');
+
+                        continue;
+                    }
+
+                    if (hasState(p, nodeId, OWNING, MOVING, RENTING)) {
+                        ClusterNode n = ctx.discovery().node(nodeId);
+
+                        if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
+                            if (nodes == null) {
+                                nodes = new ArrayList<>(affNodes.size() + diffIds.size());
+
+                                nodes.addAll(affNodes);
+                            }
+
+                            nodes.add(n);
+                        }
+                    }
+                }
+            }
+
+            return nodes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version ({@code -1} for all nodes).
+     * @param state Partition state.
+     * @param states Additional partition states.
+     * @return List of nodes for the partition.
+     */
+    private List<ClusterNode> nodes(
+        int p,
+        AffinityTopologyVersion topVer,
+        GridDhtPartitionState state,
+        GridDhtPartitionState... states
+    ) {
+        Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId()));
+
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
+                ", grp=" + grp.cacheOrGroupName() +
+                ", allIds=" + allIds +
+                ", node2part=" + node2part + ']';
+
+            // Node IDs can be null if both primary and backup nodes disappear.
+            List<ClusterNode> nodes = new ArrayList<>();
+
+            for (UUID id : allIds) {
+                if (hasState(p, id, state, states)) {
+                    ClusterNode n = ctx.discovery().node(id);
+
+                    if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
+                        nodes.add(n);
+                }
+            }
+
+            return nodes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
+        if (!grp.rebalanceEnabled())
+            return ownersAndMoving(p, topVer);
+
+        return nodes(p, topVer, OWNING, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> owners(int p) {
+        return owners(p, AffinityTopologyVersion.NONE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> allOwners() {
+        lock.readLock().lock();
+
+        try {
+            int parts = partitions();
+
+            List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+            for (int i = 0; i < parts; i++)
+                res.add(new ArrayList<>());
+
+            List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grp.groupId());
+
+            for (int i = 0; i < allNodes.size(); i++) {
+                ClusterNode node = allNodes.get(i);
+
+                GridDhtPartitionMap nodeParts = node2part.get(node.id());
+
+                if (nodeParts != null) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            int part = e.getKey();
+
+                            List<ClusterNode> owners = res.get(part);
+
+                            owners.add(node);
+                        }
+                    }
+                }
+            }
+
+            return res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> moving(int p) {
+        if (!grp.rebalanceEnabled())
+            return ownersAndMoving(p, AffinityTopologyVersion.NONE);
+
+        return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return List of nodes in state OWNING or MOVING.
+     */
+    private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
+        return nodes(p, topVer, OWNING, MOVING_STATES);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long updateSequence() {
+        return updateSeq.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
+        lock.readLock().lock();
+
+        try {
+            if (node2part == null || stopping)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part=" + node2part +
+                ", grp=" + grp.cacheOrGroupName() +
+                ", stopping=" + stopping +
+                ", locNodeId=" + ctx.localNode().id() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
+
+            GridDhtPartitionFullMap m = node2part;
+
+            return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Checks whether the current partition map should be overwritten by the new partition map.
+     * Returns {@code true} if the topology version or update sequence of the new map is greater than that of the current map.
+     *
+     * @param currentMap Current partition map.
+     * @param newMap New partition map.
+     * @return {@code True} if the current partition map should be overwritten by the new partition map, {@code false} otherwise.
+     */
+    private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
+        return newMap != null &&
+                (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
+                 newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    @Override public boolean update(
+        @Nullable AffinityTopologyVersion exchangeVer,
+        GridDhtPartitionFullMap partMap,
+        @Nullable CachePartitionFullCountersMap incomeCntrMap,
+        Set<Integer> partsToReload,
+        @Nullable Map<Integer, Long> partSizes,
+        @Nullable AffinityTopologyVersion msgTopVer) {
+        if (log.isDebugEnabled()) {
+            log.debug("Updating full partition map " +
+                "[grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + ", fullMap=" + fullMapString() + ']');
+        }
+
+        assert partMap != null;
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                if (log.isTraceEnabled() && exchangeVer != null) {
+                    log.trace("Partition states before full update [grp=" + grp.cacheOrGroupName()
+                        + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']');
+                }
+
+                if (stopping || !lastTopChangeVer.initialized() ||
+                    // Ignore message not-related to exchange if exchange is in progress.
+                    (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer)))
+                    return false;
+
+                if (incomeCntrMap != null) {
+                    // update local counters in partitions
+                    for (int i = 0; i < locParts.length(); i++) {
+                        cntrMap.updateCounter(i, incomeCntrMap.updateCounter(i));
+
+                        GridDhtLocalPartition part = locParts.get(i);
+
+                        if (part == null)
+                            continue;
+
+                        if (part.state() == OWNING || part.state() == MOVING) {
+                            long updCntr = incomeCntrMap.updateCounter(part.id());
+                            long curCntr = part.updateCounter();
+
+                            if (updCntr != 0 && updCntr > curCntr) {
+                                part.updateCounter(updCntr);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Partition update counter has updated [grp=" + grp.cacheOrGroupName() + ", p=" + part.id()
+                                        + ", state=" + part.state() + ", prevCntr=" + curCntr + ", nextCntr=" + updCntr + "]");
+                            }
+                        }
+                    }
+                }
+
+                if (exchangeVer != null) {
+                    // Ignore if exchange already finished or new exchange started.
+                    if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) {
+                        U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
+                            "grp=" + grp.cacheOrGroupName() +
+                            ", lastTopChange=" + lastTopChangeVer +
+                            ", readTopVer=" + readyTopVer +
+                            ", exchVer=" + exchangeVer + ']');
+
+                        return false;
+                    }
+                }
+
+                if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
+                    U.warn(log, "Stale version for full partition map update message (will ignore) [" +
+                        "grp=" + grp.cacheOrGroupName() +
+                        ", lastTopChange=" + lastTopChangeVer +
+                        ", readTopVer=" + readyTopVer +
+                        ", msgVer=" + msgTopVer + ']');
+
+                    return false;
+                }
+
+                boolean fullMapUpdated = (node2part == null);
+
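+                // Merge the incoming full map with the current one: per-node maps are overridden only if the
+                // incoming map is newer (by topology version or update sequence), otherwise the newer local
+                // per-node map is kept.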
+                if (node2part != null) {
+                    for (GridDhtPartitionMap part : node2part.values()) {
+                        GridDhtPartitionMap newPart = partMap.get(part.nodeId());
+
+                        if (shouldOverridePartitionMap(part, newPart)) {
+                            fullMapUpdated = true;
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Overriding partition map in full update map [" +
+                                    "grp=" + grp.cacheOrGroupName() +
+                                    ", exchVer=" + exchangeVer +
+                                    ", curPart=" + mapString(part) +
+                                    ", newPart=" + mapString(newPart) + ']');
+                            }
+
+                            if (newPart.nodeId().equals(ctx.localNodeId()))
+                                updateSeq.setIfGreater(newPart.updateSequence());
+                        }
+                        else {
+                            // If for some nodes current partition has a newer map,
+                            // then we keep the newer value.
+                            partMap.put(part.nodeId(), part);
+                        }
+                    }
+
+                    // Check that we have new nodes.
+                    for (GridDhtPartitionMap part : partMap.values()) {
+                        if (fullMapUpdated)
+                            break;
+
+                        fullMapUpdated = !node2part.containsKey(part.nodeId());
+                    }
+
+                    // Remove entry if node left.
+                    for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
+                        UUID nodeId = it.next();
+
+                        if (!ctx.discovery().alive(nodeId)) {
+                            if (log.isTraceEnabled())
+                                log.trace("Removing left node from full map 
update [grp=" + grp.cacheOrGroupName() +
+                                    ", nodeId=" + nodeId + ", partMap=" + 
partMap + ']');
+
+                            if (node2part.containsKey(nodeId)) {
+                                GridDhtPartitionMap map = partMap.get(nodeId);
+
+                                if (map != null)
+                                    leftNode2Part.put(nodeId, map);
+                            }
+
+                            it.remove();
+                        }
+                    }
+                }
+                else {
+                    GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId());
+
+                    if (locNodeMap != null)
+                        updateSeq.setIfGreater(locNodeMap.updateSequence());
+                }
+
+                if (!fullMapUpdated) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("No updates for full partition map (will ignore) [" +
+                            "grp=" + grp.cacheOrGroupName() +
+                            ", lastExch=" + lastTopChangeVer +
+                            ", exchVer=" + exchangeVer +
+                            ", curMap=" + node2part +
+                            ", newMap=" + partMap + ']');
+                    }
+
+                    return false;
+                }
+
+                if (exchangeVer != null) {
+                    assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0;
+
+                    lastTopChangeVer = readyTopVer = exchangeVer;
+                }
+
+                node2part = partMap;
+
+                if (exchangeVer == null && !grp.isReplicated() &&
+                        (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
+                    AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                            int p = e0.getKey();
+
+                            Set<UUID> diffIds = diffFromAffinity.get(p);
+
+                            if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
+                                !affAssignment.getIds(p).contains(e.getKey())) {
+
+                                if (diffIds == null)
+                                    diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+                                diffIds.add(e.getKey());
+                            }
+                            else {
+                                if (diffIds != null && diffIds.remove(e.getKey())) {
+                                    if (diffIds.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
+                            }
+                        }
+                    }
+
+                    diffFromAffinityVer = readyTopVer;
+                }
+
+                boolean changed = false;
+
+                GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
+
+                // Only if a real exchange occurred.
+                if (exchangeVer != null &&
+                    nodeMap != null &&
+                    grp.persistenceEnabled() &&
+                    readyTopVer.initialized()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
+                        int p = e.getKey();
+                        GridDhtPartitionState state = e.getValue();
+
+                        if (state == OWNING) {
+                            GridDhtLocalPartition locPart = locParts.get(p);
+
+                            assert locPart != null : grp.cacheOrGroupName();
+
+                            if (locPart.state() == MOVING) {
+                                boolean success = locPart.own();
+
+                                assert success : locPart;
+
+                                changed |= success;
+                            }
+                        }
+                        else if (state == MOVING) {
+                            boolean haveHistory = !partsToReload.contains(p);
+
+                            rebalancePartition(p, haveHistory);
+
+                            changed = true;
+                        }
+                    }
+                }
+
+                long updateSeq = this.updateSeq.incrementAndGet();
+
+                if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                    AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
+
+                    if (exchangeVer == null)
+                        changed |= checkEvictions(updateSeq, aff);
+
+                    updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
+                }
+
+                if (partSizes != null)
+                    this.globalPartSizes = partSizes;
+
+                consistencyCheck();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() +
+                        ", map=" + fullMapString() + ']');
+                }
+
+                if (log.isTraceEnabled() && exchangeVer != null) {
+                    log.trace("Partition states after full update [grp=" + grp.cacheOrGroupName()
+                        + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']');
+                }
+
+                if (changed) {
+                    if (log.isDebugEnabled())
+                        log.debug("Partitions have been scheduled to resend [reason=" +
+                            "Full map update [grp=" + grp.cacheOrGroupName() + "]]");
+
+                    ctx.exchange().scheduleResendPartitions();
+                }
+
+                return changed;
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
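
A minimal, standalone sketch of the merge rule applied in the full-map update above: for every node, the entry with the newer update sequence wins, otherwise the current entry is kept. The NodeMap record and String node ids are simplified stand-ins for illustration only, not the Ignite GridDhtPartitionFullMap/GridDhtPartitionMap API.

    import java.util.HashMap;
    import java.util.Map;

    /** Simplified stand-in for a per-node partition map; its version is the update sequence. */
    record NodeMap(long updateSeq, Map<Integer, String> partStates) { }

    class FullMapMergeSketch {
        /** For every node keep whichever entry (current or incoming) has the higher update sequence. */
        static Map<String, NodeMap> merge(Map<String, NodeMap> current, Map<String, NodeMap> incoming) {
            Map<String, NodeMap> merged = new HashMap<>(incoming);

            for (Map.Entry<String, NodeMap> e : current.entrySet()) {
                NodeMap newer = incoming.get(e.getKey());

                // Keep the current entry when the incoming map misses it or carries an older version.
                if (newer == null || newer.updateSeq() < e.getValue().updateSeq())
                    merged.put(e.getKey(), e.getValue());
            }

            return merged;
        }
    }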
+
+    /** {@inheritDoc} */
+    @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
+        assert cntrMap != null;
+
+        long now = U.currentTimeMillis();
+
+        lock.writeLock().lock();
+
+        try {
+            long acquired = U.currentTimeMillis();
+
+            if (acquired - now >= 100) {
+                if (timeLog.isInfoEnabled())
+                    timeLog.info("Waited too long to acquire topology write lock " +
+                        "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']');
+            }
+
+            if (stopping)
+                return;
+
+            for (int i = 0; i < cntrMap.size(); i++) {
+                int pId = cntrMap.partitionAt(i);
+
+                long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+                long updateCntr = cntrMap.updateCounterAt(i);
+
+                if (this.cntrMap.updateCounter(pId) < updateCntr) {
+                    this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+                    this.cntrMap.updateCounter(pId, updateCntr);
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyUpdateCounters() {
+        long now = U.currentTimeMillis();
+
+        lock.writeLock().lock();
+
+        try {
+            long acquired = U.currentTimeMillis();
+
+            if (acquired - now >= 100) {
+                if (timeLog.isInfoEnabled())
+                    timeLog.info("Waited too long to acquire topology write lock " +
+                        "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']');
+            }
+
+            if (stopping)
+                return;
+
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part == null)
+                    continue;
+
+                boolean reserve = part.reserve();
+
+                try {
+                    GridDhtPartitionState state = part.state();
+
+                    if (!reserve || state == EVICTED || state == RENTING)
+                        continue;
+
+                    long updCntr = cntrMap.updateCounter(part.id());
+
+                    if (updCntr > part.updateCounter())
+                        part.updateCounter(updCntr);
+                    else if (part.updateCounter() > 0) {
+                        cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter());
+                        cntrMap.updateCounter(part.id(), part.updateCounter());
+                    }
+                }
+                finally {
+                    if (reserve)
+                        part.release();
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
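
The two counter methods above follow the same reconciliation rule: for each partition, the higher of the shared counter and the partition's own counter wins, and the lagging side is bumped to match. A simplified, standalone sketch of that rule with plain maps; CounterMergeSketch and its fields are illustrative stand-ins, not Ignite classes.

    import java.util.HashMap;
    import java.util.Map;

    class CounterMergeSketch {
        /** partId -> counter kept in the shared counters map (models cntrMap). */
        final Map<Integer, Long> globalCntrs = new HashMap<>();

        /** partId -> counter kept by the local partition itself. */
        final Map<Integer, Long> localCntrs = new HashMap<>();

        /** Reconciles both sides so that the higher counter wins for every partition. */
        void applyUpdateCounters() {
            for (Map.Entry<Integer, Long> e : localCntrs.entrySet()) {
                long globalCntr = globalCntrs.getOrDefault(e.getKey(), 0L);

                if (globalCntr > e.getValue())
                    e.setValue(globalCntr);                    // Partition catches up to the shared counter.
                else if (e.getValue() > 0)
                    globalCntrs.put(e.getKey(), e.getValue()); // Shared map catches up to the partition.
            }
        }
    }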
+
+    /**
+     * Checks whether the new partition map is more stale than the current partition map.
+     * The new partition map is stale if its topology version or update sequence is less than or equal to that of the current map.
+     *
+     * @param currentMap Current partition map.
+     * @param newMap New partition map.
+     * @return {@code True} if the new partition map is more stale than the current partition map, {@code false} otherwise.
+     */
+    private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
+        return currentMap != null && newMap.compareTo(currentMap) <= 0;
+    }
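
Per the Javadoc above, staleness is decided by comparing (topology version, update sequence) pairs; the real comparison lives in GridDhtPartitionMap.compareTo. A standalone sketch of the intended ordering with a simplified MapVersion record (illustrative only, assumes Java 16+ for records):

    import java.util.Comparator;

    /** Simplified stand-in for a per-node partition map version: (topology version, update sequence). */
    record MapVersion(long topVer, long updateSeq) { }

    class StaleCheckSketch {
        /** Orders versions by topology version first, then by update sequence. */
        static final Comparator<MapVersion> ORDER =
            Comparator.comparingLong(MapVersion::topVer).thenComparingLong(MapVersion::updateSeq);

        /** Mirrors isStaleUpdate(): an incoming map is stale if it does not strictly advance the current one. */
        static boolean isStaleUpdate(MapVersion current, MapVersion incoming) {
            return current != null && ORDER.compare(incoming, current) <= 0;
        }

        public static void main(String[] args) {
            MapVersion cur = new MapVersion(5, 10);

            System.out.println(isStaleUpdate(cur, new MapVersion(5, 10))); // true  - same version, ignored
            System.out.println(isStaleUpdate(cur, new MapVersion(5, 11))); // false - newer update sequence
            System.out.println(isStaleUpdate(cur, new MapVersion(4, 99))); // true  - older topology version
        }
    }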
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    @Override public boolean update(
+        @Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionMap parts,
+        boolean force
+    ) {
+        if (log.isDebugEnabled()) {
+            log.debug("Updating single partition map [grp=" + grp.cacheOrGroupName() + ", exchId=" + exchId +
+                ", parts=" + mapString(parts) + ']');
+        }
+
+        if (!ctx.discovery().alive(parts.nodeId())) {
+            if (log.isTraceEnabled()) {
+                log.trace("Received partition update for non-existing node (will ignore) [grp=" + grp.cacheOrGroupName() +
+                    ", exchId=" + exchId + ", parts=" + parts + ']');
+            }
+
+            return false;
+        }
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                if (stopping)
+                    return false;
+
+                if (!force) {
+                    if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
+                        U.warn(log, "Stale exchange id for single partition map update (will ignore) [" +
+                            "grp=" + grp.cacheOrGroupName() +
+                            ", lastTopChange=" + lastTopChangeVer +
+                            ", readTopVer=" + readyTopVer +
+                            ", exch=" + exchId.topologyVersion() + ']');
+
+                        return false;
+                    }
+                }
+
+                if (node2part == null)
+                    // Create invalid partition map.
+                    node2part = new GridDhtPartitionFullMap();
+
+                GridDhtPartitionMap cur = node2part.get(parts.nodeId());
+
+                if (force) {
+                    if (cur != null && cur.topologyVersion().initialized())
+                        parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
+                }
+                else if (isStaleUpdate(cur, parts)) {
+                    assert cur != null;
+
+                    String msg = "Stale update for single partition map update (will ignore) [" +
+                        "nodeId=" + parts.nodeId() +
+                        ", grp=" + grp.cacheOrGroupName() +
+                        ", exchId=" + exchId +
+                        ", curMap=" + cur +
+                        ", newMap=" + parts + ']';
+
+                    // This is a usual situation when partition maps are equal, so just print a debug message.
+                    if (cur.compareTo(parts) == 0) {
+                        if (log.isTraceEnabled())
+                            log.trace(msg);
+                    }
+                    else
+                        U.warn(log, msg);
+
+                    return false;
+                }
+
+                long updateSeq = this.updateSeq.incrementAndGet();
+
+                node2part.newUpdateSequence(updateSeq);
+
+                boolean changed = false;
+
+                if (cur == null || !cur.equals(parts))
+                    changed = true;
+
+                node2part.put(parts.nodeId(), parts);
+
+                // During exchange diff is calculated after all messages are received and affinity initialized.
+                if (exchId == null && !grp.isReplicated()) {
+                    if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                        AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+
+                        // Add new mappings.
+                        for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+                            int p = e.getKey();
+
+                            Set<UUID> diffIds = diffFromAffinity.get(p);
+
+                            if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
+                                && !affAssignment.getIds(p).contains(parts.nodeId())) {
+                                if (diffIds == null)
+                                    diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+                                if (diffIds.add(parts.nodeId()))
+                                    changed = true;
+                            }
+                            else {
+                                if (diffIds != null && diffIds.remove(parts.nodeId())) {
+                                    changed = true;
+
+                                    if (diffIds.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
+                            }
+                        }
+
+                        // Remove obsolete mappings.
+                        if (cur != null) {
+                            for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+                                Set<UUID> ids = diffFromAffinity.get(p);
+
+                                if (ids != null && ids.remove(parts.nodeId())) {
+                                    changed = true;
+
+                                    if (ids.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
+                            }
+                        }
+
+                        diffFromAffinityVer = readyTopVer;
+                    }
+                }
+
+                if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                    AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
+
+                    if (exchId == null)
+                        changed |= checkEvictions(updateSeq, aff);
+
+                    updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
+                }
+
+                consistencyCheck();
+
+                if (log.isDebugEnabled())
+                    log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']');
+
+                if (changed && exchId == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Partitions have been scheduled to resend [reason=" +
+                            "Single map update [grp=" + grp.cacheOrGroupName() + "]]");
+
+                    ctx.exchange().scheduleResendPartitions();
+                }
+
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut,
+        AffinityAssignment assignment,
+        boolean updateRebalanceVer) {
+        lock.writeLock().lock();
+
+        try {
+            assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) ||
+                assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
+
+            readyTopVer = lastTopChangeVer = assignment.topologyVersion();
+
+            if (fut != null)
+                discoCache = fut.events().discoveryCache();
+
+            if (!grp.isReplicated()) {
+                boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
+                    fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
+
+                if (rebuildDiff) {
+                    if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+                        rebuildDiff(assignment);
+                }
+                else
+                    diffFromAffinityVer = readyTopVer;
+
+                if (!updateRebalanceVer)
+                    updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
+            }
+
+            if (updateRebalanceVer)
+                updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param aff Affinity.
+     */
+    private void createMovingPartitions(AffinityAssignment aff) {
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+            GridDhtPartitionMap map = e.getValue();
+
+            addMoving(map, aff.backupPartitions(e.getKey()));
+            addMoving(map, aff.primaryPartitions(e.getKey()));
+        }
+    }
+
+    /**
+     * @param map Node partition state map.
+     * @param parts Partitions assigned to node.
+     */
+    private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
+        if (F.isEmpty(parts))
+            return;
+
+        for (Integer p : parts) {
+            GridDhtPartitionState state = map.get(p);
+
+            if (state == null || state == EVICTED)
+                map.put(p, MOVING);
+        }
+    }
+
+    /**
+     * Rebuilds {@link #diffFromAffinity} from given assignment.
+     *
+     * @param affAssignment New affinity assignment.
+     */
+    private void rebuildDiff(AffinityAssignment affAssignment) {
+        assert lock.isWriteLockedByCurrentThread();
+
+        if (node2part == null)
+            return;
+
+        if (FAST_DIFF_REBUILD) {
+            Collection<UUID> affNodes = F.nodeIds(ctx.discovery().cacheGroupAffinityNodes(grp.groupId(),
+                affAssignment.topologyVersion()));
+
+            for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
+                int p = e.getKey();
+
+                Iterator<UUID> iter = e.getValue().iterator();
+
+                while (iter.hasNext()) {
+                    UUID nodeId = iter.next();
+
+                    if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
+                        iter.remove();
+                }
+            }
+        }
+        else {
+            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                UUID nodeId = e.getKey();
+
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                    Integer p0 = e0.getKey();
+
+                    GridDhtPartitionState state = e0.getValue();
+
+                    Set<UUID> ids = diffFromAffinity.get(p0);
+
+                    if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
+                        if (ids == null)
+                            diffFromAffinity.put(p0, ids = U.newHashSet(3));
+
+                        ids.add(nodeId);
+                    }
+                    else {
+                        if (ids != null)
+                            ids.remove(nodeId);
+                    }
+                }
+            }
+        }
+
+        diffFromAffinityVer = affAssignment.topologyVersion();
+    }
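
The non-fast branch above rebuilds the partition-to-nodes diff by walking the full map and keeping every node that holds a partition (MOVING, OWNING or RENTING) outside its affinity assignment. A self-contained sketch of that computation with simplified types (String node ids and a local State enum instead of the Ignite classes):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class DiffFromAffinitySketch {
        enum State { MOVING, OWNING, RENTING, EVICTED, LOST }

        /**
         * @param node2part nodeId -> (partition -> state) as reported by each node.
         * @param affinity partition -> nodes that should own it according to affinity.
         * @return partition -> nodes that hold the partition but are not in its affinity.
         */
        static Map<Integer, Set<String>> rebuildDiff(
            Map<String, Map<Integer, State>> node2part,
            Map<Integer, Set<String>> affinity
        ) {
            Map<Integer, Set<String>> diff = new HashMap<>();

            for (Map.Entry<String, Map<Integer, State>> e : node2part.entrySet()) {
                String nodeId = e.getKey();

                for (Map.Entry<Integer, State> e0 : e.getValue().entrySet()) {
                    int p = e0.getKey();
                    State state = e0.getValue();

                    boolean holdsData = state == State.MOVING || state == State.OWNING || state == State.RENTING;

                    if (holdsData && !affinity.getOrDefault(p, Set.of()).contains(nodeId))
                        diff.computeIfAbsent(p, k -> new HashSet<>()).add(nodeId);
                }
            }

            return diff;
        }
    }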
+
+    /** {@inheritDoc} */
+    @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) {
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                if (node2part == null)
+                    return false;
+
+                int parts = grp.affinity().partitions();
+
+                Set<Integer> lost = new HashSet<>(parts);
+
+                for (int p = 0; p < parts; p++)
+                    lost.add(p);
+
+                for (GridDhtPartitionMap partMap : node2part.values()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            lost.remove(e.getKey());
+
+                            if (lost.isEmpty())
+                                break;
+                        }
+                    }
+                }
+
+                boolean changed = false;
+
+                if (!F.isEmpty(lost)) {
+                    PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
+
+                    assert plc != null;
+
+                    Set<Integer> recentlyLost = new HashSet<>();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) {
+                            if (entry.getValue() == OWNING)
+                                recentlyLost.add(entry.getKey());
+                        }
+                    }
+
+                    // Update partition state on all nodes.
+                    for (Integer part : lost) {
+                        long updSeq = updateSeq.incrementAndGet();
+
+                        GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
+
+                        if (locPart != null) {
+                            if (locPart.state() == LOST)
+                                continue;
+
+                            boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
+
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+
+                            changed |= marked;
+                        }
+                        // Update map for remote node.
+                        else if (plc != PartitionLossPolicy.IGNORE) {
+                            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                                if (e.getKey().equals(ctx.localNodeId()))
+                                    continue;
+
+                                if (e.getValue().get(part) != EVICTED)
+                                    e.getValue().put(part, LOST);
+                            }
+                        }
+
+                        if (recentlyLost.contains(part) && grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                            grp.addRebalanceEvent(part,
+                                EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                discoEvt.eventNode(),
+                                discoEvt.type(),
+                                discoEvt.timestamp());
+                        }
+                    }
+
+                    if (plc != PartitionLossPolicy.IGNORE)
+                        grp.needsRecovery(true);
+                }
+
+                leftNode2Part.clear();
+
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
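
Lost-partition detection above boils down to set subtraction: start from all partition ids and remove every partition for which at least one node still reports an OWNING copy. A minimal standalone sketch with simplified types, omitting the loss-policy and event handling:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class LostPartitionsSketch {
        enum State { MOVING, OWNING, RENTING, EVICTED, LOST }

        /** Returns the partitions for which no node in the full map reports an OWNING copy. */
        static Set<Integer> detectLost(int partitions, Map<String, Map<Integer, State>> node2part) {
            Set<Integer> lost = new HashSet<>();

            for (int p = 0; p < partitions; p++)
                lost.add(p);

            for (Map<Integer, State> partMap : node2part.values()) {
                for (Map.Entry<Integer, State> e : partMap.entrySet()) {
                    if (e.getValue() == State.OWNING)
                        lost.remove(e.getKey());
                }
            }

            return lost;
        }
    }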
+
+    /** {@inheritDoc} */
+    @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) {
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                long updSeq = updateSeq.incrementAndGet();
+
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                        if (e0.getValue() != LOST)
+                            continue;
+
+                        e0.setValue(OWNING);
+
+                        GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
+
+                        if (locPart != null && locPart.state() == LOST) {
+                            boolean marked = locPart.own();
+
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+                        }
+                    }
+                }
+
+                checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
+
+                grp.needsRecovery(false);
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> lostPartitions() {
+        if (grp.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE)
+            return Collections.emptySet();
+
+        lock.readLock().lock();
+
+        try {
+            Set<Integer> res = null;
+
+            int parts = grp.affinity().partitions();
+
+            for (GridDhtPartitionMap partMap : node2part.values()) {
+                for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+                    if (e.getValue() == LOST) {
+                        if (res == null)
+                            res = new HashSet<>(parts);
+
+                        res.add(e.getKey());
+                    }
+                }
+            }
+
+            return res == null ? Collections.<Integer>emptySet() : res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+        Map<UUID, Set<Integer>> result = new HashMap<>();
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                // First process local partitions.
+                for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
+                    int part = entry.getKey();
+                    Set<UUID> newOwners = entry.getValue();
+
+                    GridDhtLocalPartition locPart = localPartition(part);
+
+                    if (locPart == null || locPart.state() != OWNING)
+                        continue;
+
+                    if (!newOwners.contains(ctx.localNodeId())) {
+                        rebalancePartition(part, haveHistory.contains(part));
+
+                        result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>());
+                        result.get(ctx.localNodeId()).add(part);
+                    }
+                }
+
+                // Then process remote partitions.
+                for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
+                    int part = entry.getKey();
+                    Set<UUID> newOwners = entry.getValue();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
+                        UUID remoteNodeId = remotes.getKey();
+                        GridDhtPartitionMap partMap = remotes.getValue();
+
+                        GridDhtPartitionState state = partMap.get(part);
+
+                        if (state == null || state != OWNING)
+                            continue;
+
+                        if (!newOwners.contains(remoteNodeId)) {
+                            partMap.put(part, MOVING);
+
+                            partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
+
+                            if (partMap.nodeId().equals(ctx.localNodeId()))
+                                updateSeq.setIfGreater(partMap.updateSequence());
+
+                            result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
+                            result.get(remoteNodeId).add(part);
+                        }
+                    }
+                }
+
+                for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+                    UUID nodeId = entry.getKey();
+                    Set<Integer> rebalancedParts = entry.getValue();
+
+                    if (!rebalancedParts.isEmpty()) {
+                        Set<Integer> historical = rebalancedParts.stream()
+                            .filter(haveHistory::contains)
+                            .collect(Collectors.toSet());
+
+                        // Filter out partitions having WAL history.
+                        rebalancedParts.removeAll(historical);
+
+                        U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter "
+                            + "[grp=" + grp.cacheOrGroupName()
+                            + ", nodeId=" + nodeId
+                            + ", partsFull=" + S.compact(rebalancedParts)
+                            + ", partsHistorical=" + S.compact(historical) + "]");
+                    }
+                }
+
+                node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+
+        return result;
+    }
+
+    /**
+     * Prepares given partition {@code p} for rebalance.
+     * Changes partition state to MOVING and starts clearing if needed.
+     * Prevents ongoing renting if required.
+     *
+     * @param p Partition id.
+     * @param haveHistory If {@code true}, there is WAL history to rebalance the partition;
+     *                    otherwise the partition will be cleared for full rebalance.
+     * @return Local partition prepared for rebalance.
+     */
+    private GridDhtLocalPartition rebalancePartition(int p, boolean haveHistory) {
+        GridDhtLocalPartition part = getOrCreatePartition(p);
+
+        // Prevent renting.
+        if (part.state() == RENTING) {
+            if (part.reserve()) {
+                part.moving();
+                part.release();
+            }
+            else {
+                assert part.state() == EVICTED : part;
+
+                part = getOrCreatePartition(p);
+            }
+        }
+
+        if (part.state() != MOVING)
+            part.moving();
+
+        if (!haveHistory)
+            part.clearAsync();
+
+        assert part.state() == MOVING : part;
+
+        return part;
+    }
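
The haveHistory flag above selects between historical (WAL delta) rebalance and full rebalance preceded by clearing the partition. A tiny illustrative sketch of that decision; the Mode enum and helper are hypothetical, not Ignite API:

    import java.util.Set;

    class RebalanceModeSketch {
        enum Mode { HISTORICAL, FULL }

        /** Partitions with WAL history receive a delta; the rest are cleared and fully reloaded. */
        static Mode modeFor(int partId, Set<Integer> partsWithHistory) {
            return partsWithHistory.contains(partId) ? Mode.HISTORICAL : Mode.FULL;
        }
    }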
+
+    /**
+     * Finds local partitions which don't belong to affinity and runs eviction process for such partitions.
+     *
+     * @param updateSeq Update sequence.
+     * @param aff Affinity assignments.
+     * @return {@code True} if there are local partitions that need to be evicted.
+     */
+    private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
+        if (!ctx.kernalContext().state().evictionsAllowed())
+            return false;
+
+        boolean hasEvictedPartitions = false;
+
+        UUID locId = ctx.localNodeId();
+
+        List<IgniteInternalFuture<?>> rentingFutures = new ArrayLis

<TRUNCATED>
