http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
new file mode 100644
index 0000000..253a56a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -0,0 +1,1576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.util.deque.FastSizeDeque;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
+import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
+import static 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+
+/**
+ * Key partition.
+ */
+public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl 
implements Comparable<GridDhtLocalPartition>, GridReservable {
+    /** Entry factory passed to the superclass constructor; creates {@link GridDhtCacheEntry} instances. */
+    private static final GridCacheMapEntryFactory ENTRY_FACTORY = new 
GridCacheMapEntryFactory() {
+        @Override public GridCacheMapEntry create(
+            GridCacheContext ctx,
+            AffinityTopologyVersion topVer,
+            KeyCacheObject key
+        ) {
+            return new GridDhtCacheEntry(ctx, topVer, key);
+        }
+    };
+
+    /**
+     * Maximum size for delete queue. Read from the {@code IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE}
+     * system property, {@code 200_000} if the property is not set.
+     */
+    public static final int MAX_DELETE_QUEUE_SIZE = 
Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
+
+    /** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction. */
+    private static boolean forceTestCheckpointOnEviction = 
IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false);
+
+    /** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */
+    static volatile Integer partWhereTestCheckpointEnforced;
+
+    /** Maximum size for {@link #rmvQueue}. */
+    private final int rmvQueueMaxSize;
+
+    /** Removed items TTL. Read from the {@code IGNITE_CACHE_REMOVED_ENTRIES_TTL} system property in the constructor. */
+    private final long rmvdEntryTtl;
+
+    /** Static logger to avoid re-creation. */
+    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+
+    /** Logger. Static field, (re)assigned by every constructor call. */
+    private static volatile IgniteLogger log;
+
+    /** Partition ID. */
+    private final int id;
+
+    /** State. 32 bits - size, 16 bits - reservations, 13 bits - reserved, 3 bits - GridDhtPartitionState. */
+    @GridToStringExclude
+    private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 
32);
+
+    /**
+     * Evict guard. Must be CASed to -1 only when partition state is EVICTED.
+     * Values used in this class: 0 - free, 1 - a thread is clearing the partition, -1 - marked for destroy.
+     */
+    @GridToStringExclude
+    private final AtomicInteger evictGuard = new AtomicInteger();
+
+    /** Rent future. Completed in {@link #destroy()}. */
+    @GridToStringExclude
+    private final GridFutureAdapter<?> rent;
+
+    /** Clear future. */
+    @GridToStringExclude
+    private final ClearFuture clearFuture;
+
+    /** Shared cache context. */
+    @GridToStringExclude
+    private final GridCacheSharedContext ctx;
+
+    /** Cache group context this partition belongs to. */
+    @GridToStringExclude
+    private final CacheGroupContext grp;
+
+    /** Create time. */
+    @GridToStringExclude
+    private final long createTime = U.currentTimeMillis();
+
+    /** Lock. */
+    @GridToStringExclude
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Entry map holders by cache ID. Created only for shared cache groups, {@code null} otherwise. */
+    @GridToStringExclude
+    private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps;
+
+    /** Entry map holder of the group's single cache. {@code null} for shared cache groups. */
+    @GridToStringExclude
+    private final CacheMapHolder singleCacheEntryMap;
+
+    /** Remove queue. */
+    @GridToStringExclude
+    private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new 
FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+
+    /** Group reservations. */
+    @GridToStringExclude
+    private final CopyOnWriteArrayList<GridDhtPartitionsReservation> 
reservations = new CopyOnWriteArrayList<>();
+
+    /** Partition data store, created in the constructor through {@code grp.offheap()}. */
+    @GridToStringExclude
+    private volatile CacheDataStore store;
+
+    /**
+     * Set if failed to move partition to RENTING state due to reservations, to be checked when
+     * reservation is released.
+     */
+    private volatile boolean delayedRenting;
+
+    /** Set if partition must be cleared in MOVING state. */
+    private volatile boolean clear;
+
+    /** Set if topology update sequence should be updated on partition destroy. */
+    private boolean updateSeqOnDestroy;
+
+    /**
+     * Creates the partition: allocates the entry map holder(s), the rent and clear futures,
+     * computes the remove queue limits and creates the partition data store. If WAL is enabled
+     * for the group, partition creation is also logged to WAL.
+     *
+     * @param ctx Context.
+     * @param grp Cache group.
+     * @param id Partition ID.
+     * @throws IgniteException Wrapping any {@link IgniteCheckedException} thrown inside the data store setup block.
+     */
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+    public GridDhtLocalPartition(GridCacheSharedContext ctx,
+        CacheGroupContext grp,
+        int id) {
+        super(ENTRY_FACTORY);
+
+        this.id = id;
+        this.ctx = ctx;
+        this.grp = grp;
+
+        log = U.logger(ctx.kernalContext(), logRef, this); // Assigns the static logger field.
+
+        if (grp.sharedGroup()) {
+            singleCacheEntryMap = null;
+            cacheMaps = new ConcurrentHashMap<>(); // Holders are added lazily by cacheMapHolder().
+        }
+        else {
+            singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), 
createEntriesMap());
+            cacheMaps = null;
+        }
+
+        rent = new GridFutureAdapter<Object>() {
+            @Override public String toString() {
+                return "PartitionRentFuture [part=" + 
GridDhtLocalPartition.this + ']';
+            }
+        };
+
+        clearFuture = new ClearFuture();
+
+        int delQueueSize = grp.systemCache() ? 100 :
+            Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20); // Per-partition share, at least 20.
+
+        rmvQueueMaxSize = U.ceilPow2(delQueueSize);
+
+        rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
+
+        try {
+            store = grp.offheap().createCacheDataStore(id);
+
+            // Log partition creation for further crash recovery purposes.
+            if (grp.walEnabled())
+                ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, 
state(), updateCounter()));
+
+            // Inject row cache cleaner on store creation.
+            // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group.
+            if (ctx.kernalContext().query().moduleEnabled()) {
+                GridQueryRowCacheCleaner cleaner = 
ctx.kernalContext().query().getIndexing()
+                    .rowCacheCleaner(grp.groupId());
+
+                if (store != null && cleaner != null)
+                    store.setRowCacheCleaner(cleaner);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            // TODO ignite-db
+            throw new IgniteException(e);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Partition has been created [grp=" + 
grp.cacheOrGroupName()
+                + ", p=" + id + ", state=" + state() + "]");
+    }
+
+    /**
+     * Allocates an empty entries map. The initial capacity is {@code DFLT_START_CACHE_SIZE} divided by
+     * the number of partitions in the group (but not less than 10).
+     *
+     * @return Entries map.
+     */
+    private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() {
+        int initCap = Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions());
+        int concurrencyLevel = Runtime.getRuntime().availableProcessors() * 2;
+
+        return new ConcurrentHashMap<>(initCap, 0.75f, concurrencyLevel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int internalSize() {
+        // Non-shared group: the only holder's map is the whole partition.
+        if (!grp.sharedGroup())
+            return singleCacheEntryMap.map.size();
+
+        int total = 0;
+
+        for (CacheMapHolder holder : cacheMaps.values())
+            total += holder.map.size();
+
+        return total;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
+        // Shared groups keep one holder per cache (created on demand), otherwise there is a single holder.
+        return grp.sharedGroup() ? cacheMapHolder(cctx) : singleCacheEntryMap;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
+        if (grp.sharedGroup())
+            return cacheMaps.get(cacheId);
+
+        return singleCacheEntryMap;
+    }
+
+    /**
+     * Returns the entry map holder of the given cache, registering a new one if none exists yet.
+     * Must be called for shared cache groups only.
+     *
+     * @param cctx Cache context.
+     * @return Map holder.
+     */
+    private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
+        assert grp.sharedGroup();
+
+        Integer cacheId = cctx.cacheIdBoxed();
+
+        CacheMapHolder existing = cacheMaps.get(cacheId);
+
+        if (existing != null)
+            return existing;
+
+        CacheMapHolder created = new CacheMapHolder(cctx, createEntriesMap());
+
+        // Another thread may have registered a holder in between; the first registered one wins.
+        CacheMapHolder raced = cacheMaps.putIfAbsent(cacheId, created);
+
+        return raced != null ? raced : created;
+    }
+
+    /**
+     * Gets the data store backing this partition.
+     *
+     * @return Data store.
+     */
+    public CacheDataStore dataStore() {
+        return store;
+    }
+
+    /**
+     * Adds group reservation to this partition.
+     *
+     * @param r Reservation.
+     * @return {@code false} If such reservation already added.
+     */
+    public boolean addReservation(GridDhtPartitionsReservation r) {
+        assert (getPartState(state.get())) != EVICTED : "we can reserve only 
active partitions";
+        assert (getReservations(state.get())) != 0 : "partition must be 
already reserved before adding group reservation";
+
+        return reservations.addIfAbsent(r);
+    }
+
+    /**
+     * @param r Reservation.
+     */
+    public void removeReservation(GridDhtPartitionsReservation r) {
+        if (!reservations.remove(r))
+            throw new IllegalStateException("Reservation was already 
removed.");
+    }
+
+    /**
+     * Gets the identifier of this partition.
+     *
+     * @return Partition ID.
+     */
+    public int id() {
+        return id;
+    }
+
+    /**
+     * Gets the timestamp captured when this partition object was instantiated.
+     *
+     * @return Create time.
+     */
+    public long createTime() {
+        return createTime;
+    }
+
+    /**
+     * @return Partition state.
+     */
+    public GridDhtPartitionState state() {
+        return getPartState(state.get());
+    }
+
+    /**
+     * @return Reservations.
+     */
+    public int reservations() {
+        return getReservations(state.get());
+    }
+
+    /**
+     * Checks that both the data store and the entry map(s) of this partition hold no entries.
+     *
+     * @return {@code True} if partition is empty.
+     */
+    public boolean isEmpty() {
+        if (store.fullSize() != 0)
+            return false;
+
+        return internalSize() == 0;
+    }
+
+    /**
+     * Checks whether the partition is in one of the {@code MOVING}, {@code OWNING} or {@code RENTING} states.
+     *
+     * @return If partition is moving or owning or renting.
+     */
+    public boolean valid() {
+        GridDhtPartitionState cur = state();
+
+        if (cur == MOVING || cur == OWNING)
+            return true;
+
+        return cur == RENTING;
+    }
+
+    /**
+     * Callback for an obsolete entry: drops it from the entry map and then tries to continue
+     * a pending clearing process.
+     *
+     * @param entry Entry to remove.
+     */
+    public void onRemoved(GridDhtCacheEntry entry) {
+        assert entry.obsolete() : entry;
+
+        // Removes exactly this entry instance.
+        removeEntry(entry);
+
+        // Removal may allow a pending clearing to proceed.
+        tryContinueClearing();
+    }
+
+    /**
+     * Removes the entry with the given key from the entry map if it can be marked obsolete with
+     * the given version. Does nothing if the cache has no map holder or the key is not in the map.
+     *
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Version.
+     */
+    private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
+        CacheMapHolder holder;
+
+        if (grp.sharedGroup())
+            holder = cacheMaps.get(cacheId);
+        else
+            holder = singleCacheEntryMap;
+
+        if (holder == null)
+            return;
+
+        GridCacheMapEntry found = holder.map.get(key);
+
+        if (found == null)
+            return;
+
+        if (found.markObsoleteVersion(ver))
+            removeEntry(found);
+    }
+
+    /**
+     * Trims the remove queue: first polls items while the queue size is not below
+     * {@link #rmvQueueMaxSize}, then (only if DR is not enabled for the group) polls items
+     * while the head of the queue is expired. Every polled item is removed from the entry map
+     * through {@code removeVersionedEntry}.
+     */
+    public void cleanupRemoveQueue() {
+        while (rmvQueue.sizex() >= rmvQueueMaxSize) {
+            RemovedEntryHolder polled = rmvQueue.pollFirst();
+
+            if (polled != null)
+                removeVersionedEntry(polled.cacheId(), polled.key(), polled.version());
+        }
+
+        if (grp.isDrEnabled())
+            return;
+
+        for (RemovedEntryHolder head = rmvQueue.peekFirst();
+            head != null && head.expireTime() < U.currentTimeMillis();
+            head = rmvQueue.peekFirst()) {
+            // The polled item is processed even if it is not the one that was peeked.
+            RemovedEntryHolder polled = rmvQueue.pollFirst();
+
+            if (polled == null)
+                break;
+
+            removeVersionedEntry(polled.cacheId(), polled.key(), polled.version());
+        }
+    }
+
+    /**
+     * @param cacheId cacheId Cache ID.
+     * @param key Removed key.
+     * @param ver Removed version.
+     */
+    public void onDeferredDelete(int cacheId, KeyCacheObject key, 
GridCacheVersion ver) {
+        cleanupRemoveQueue();
+
+        rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
+    }
+
+    /**
+     * Acquires the partition lock. The caller is responsible for releasing it with {@link #unlock()}.
+     */
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+    public void lock() {
+        lock.lock();
+    }
+
+    /**
+     * Releases the partition lock previously acquired with {@link #lock()}.
+     */
+    public void unlock() {
+        lock.unlock();
+    }
+
+    /**
+     * Increments the reservation counter so the partition won't be cleared or evicted.
+     * Fails if the partition is already in {@code EVICTED} state.
+     *
+     * @return {@code True} if reserved.
+     */
+    @Override public boolean reserve() {
+        for (;;) {
+            long cur = state.get();
+
+            if (getPartState(cur) == EVICTED)
+                return false;
+
+            int incremented = getReservations(cur) + 1;
+
+            // Retry if the aggregated state was changed concurrently.
+            if (state.compareAndSet(cur, setReservations(cur, incremented)))
+                return true;
+        }
+    }
+
+    /**
+     * Releases a reservation taken with {@link #reserve()} without changing the partition size.
+     */
+    @Override public void release() {
+        release0(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
+        // Per-cache size is tracked on the holder for shared groups only.
+        if (grp.sharedGroup()) {
+            if (sizeChange != 0)
+                hld.size.addAndGet(sizeChange);
+        }
+
+        release0(sizeChange);
+    }
+
+    /**
+     * Decrements the reservation counter and applies the size delta with a single CAS on {@link #state}.
+     * Returns without any change (the size delta is not applied either) if there are no reservations.
+     * When the last reservation is released, continues delayed renting or clearing.
+     *
+     * @param sizeChange Size change delta.
+     */
+    private void release0(int sizeChange) {
+        while (true) {
+            long state = this.state.get();
+
+            int reservations = getReservations(state);
+
+            if (reservations == 0)
+                return;
+
+            assert getPartState(state) != EVICTED : getPartState(state);
+
+            long newState = setReservations(state, --reservations); // 'reservations' now holds the count after release.
+            newState = setSize(newState, getSize(newState) + sizeChange);
+
+            assert getSize(newState) == getSize(state) + sizeChange;
+
+            // Decrement reservations.
+            if (this.state.compareAndSet(state, newState)) {
+                // If no more reservations try to continue delayed renting or clearing process.
+                if (reservations == 0) {
+                    if (delayedRenting)
+                        rent(true);
+                    else
+                        tryContinueClearing();
+                }
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * @param stateToRestore State to restore.
+     */
+    public void restoreState(GridDhtPartitionState stateToRestore) {
+        state.set(setPartState(state.get(), stateToRestore));
+    }
+
+    /**
+     * For testing purposes only.
+     * <p>
+     * Forcibly switches the partition to the given state. If persistence and WAL are enabled for
+     * the group, the change is made under the monitor of this partition and logged to WAL;
+     * a WAL failure is logged and not propagated. Otherwise delegates to
+     * {@link #restoreState(GridDhtPartitionState)}.
+     *
+     * @param toState State to set.
+     */
+    public void setState(GridDhtPartitionState toState) {
+        if (grp.persistenceEnabled() && grp.walEnabled()) {
+            synchronized (this) {
+                long state0;
+
+                // reserve()/release0() CAS the same value without taking this monitor, so a single CAS
+                // attempt may fail; retry, otherwise WAL would record a state that was never applied.
+                do {
+                    state0 = state.get();
+                }
+                while (!state.compareAndSet(state0, setPartState(state0, toState)));
+
+                try {
+                    ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Error while writing to log", e);
+                }
+            }
+        }
+        else
+            restoreState(toState);
+    }
+
+    /**
+     * Atomically replaces the partition state bits of the given aggregated value with {@code toState}.
+     * If persistence and WAL are enabled for the group, the CAS and the WAL record are done under
+     * the monitor of this partition; a WAL failure is logged and does not revert the state change.
+     *
+     * @param state Current aggregated value.
+     * @param toState State to switch to.
+     * @return {@code true} if cas succeeds.
+     */
+    private boolean casState(long state, GridDhtPartitionState toState) {
+        if (grp.persistenceEnabled() && grp.walEnabled()) {
+            synchronized (this) {
+                GridDhtPartitionState prevState = state(); // Read for the debug message only.
+
+                boolean update = this.state.compareAndSet(state, 
setPartState(state, toState));
+
+                if (update) {
+                    try {
+                        ctx.wal().log(new 
PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to log partition state change to 
WAL.", e);
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Partition changed state [grp=" + 
grp.cacheOrGroupName()
+                            + ", p=" + id + ", prev=" + prevState + ", to=" + 
toState + "]");
+                }
+
+                return update;
+            }
+        }
+        else {
+            GridDhtPartitionState prevState = state(); // Read for the debug message only.
+
+            boolean update = this.state.compareAndSet(state, 
setPartState(state, toState));
+
+            if (update) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition changed state [grp=" + 
grp.cacheOrGroupName()
+                        + ", p=" + id + ", prev=" + prevState + ", to=" + 
toState + "]");
+            }
+
+            return update;
+        }
+    }
+
+    /**
+     * Tries to move the partition to {@code OWNING} state. Fails for {@code RENTING} and
+     * {@code EVICTED} partitions, succeeds immediately if the partition is already {@code OWNING}.
+     *
+     * @return {@code True} if transitioned to OWNING state.
+     */
+    public boolean own() {
+        for (;;) {
+            long cur = state.get();
+
+            GridDhtPartitionState curState = getPartState(cur);
+
+            if (curState == OWNING)
+                return true;
+
+            if (curState == RENTING || curState == EVICTED)
+                return false;
+
+            assert curState == MOVING || curState == LOST;
+
+            // Retry if the aggregated state was changed concurrently.
+            if (casState(cur, OWNING))
+                return true;
+        }
+    }
+
+    /**
+     * Forcibly moves partition to a MOVING state.
+     */
+    public void moving() {
+        while (true) {
+            long state = this.state.get();
+
+            GridDhtPartitionState partState = getPartState(state);
+
+            assert partState == OWNING || partState == RENTING : "Only 
partitions in state OWNING or RENTING can be moved to MOVING state";
+
+            if (casState(state, MOVING))
+                break;
+        }
+    }
+
+    /**
+     * Switches the partition to {@code LOST} state unless it is already there.
+     *
+     * @return {@code True} if partition state changed.
+     */
+    public boolean markLost() {
+        for (;;) {
+            long cur = state.get();
+
+            if (getPartState(cur) == LOST)
+                return false;
+
+            // Retry if the aggregated state was changed concurrently.
+            if (casState(cur, LOST))
+                return true;
+        }
+    }
+
+    /**
+     * Initiates partition eviction process.
+     *
+     * If partition has reservations, eviction will be delayed and continued after all reservations will be released.
+     * If the partition is already {@code RENTING} or {@code EVICTED}, only the rent future is returned.
+     *
+     * @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished.
+     * @return Future to signal that this node is no longer an owner or backup.
+     */
+    public IgniteInternalFuture<?> rent(boolean updateSeq) {
+        long state0 = this.state.get();
+
+        GridDhtPartitionState partState = getPartState(state0);
+
+        if (partState == RENTING || partState == EVICTED)
+            return rent;
+
+        delayedRenting = true; // Checked by release0() when the last reservation is released.
+
+        if (getReservations(state0) == 0 && casState(state0, RENTING)) {
+            delayedRenting = false; // Switched to RENTING right away, nothing is delayed.
+
+            // Evict asynchronously, as the 'rent' method may be called
+            // from within write locks on local partition.
+            clearAsync0(updateSeq);
+        }
+
+        return rent;
+    }
+
+    /**
+     * Starts clearing process asynchronously if it's requested and not running at the moment.
+     * Method may finish clearing process ahead of time if partition is empty and doesn't have reservations.
+     * <p>
+     * Clearing is considered requested when the partition is {@code RENTING} or has delayed renting,
+     * or when it is {@code MOVING} and the {@link #clear} flag is set. Otherwise the method does nothing.
+     *
+     * @param updateSeq Update sequence.
+     */
+    private void clearAsync0(boolean updateSeq) {
+        long state = this.state.get();
+
+        GridDhtPartitionState partState = getPartState(state);
+
+        boolean evictionRequested = partState == RENTING || delayedRenting;
+        boolean clearingRequested = partState == MOVING && clear;
+
+        if (!evictionRequested && !clearingRequested)
+            return;
+
+        boolean reinitialized = clearFuture.initialize(updateSeq, 
evictionRequested);
+
+        // Clearing process is already running at the moment. No need to run it again.
+        if (!reinitialized)
+            return;
+
+        // Try fast eviction.
+        if (freeAndEmpty(state) && !grp.queriesEnabled() && !groupReserved()) {
+            if (partState == RENTING && casState(state, EVICTED) || 
clearingRequested) {
+                clearFuture.finish();
+
+                if (state() == EVICTED && markForDestroy()) {
+                    updateSeqOnDestroy = updateSeq;
+
+                    destroy();
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Partition has been fast evicted [grp=" + 
grp.cacheOrGroupName()
+                        + ", p=" + id + ", state=" + state() + "]");
+
+                return;
+            }
+        }
+
+        ctx.evict().evictPartitionAsync(grp,this); // Fast path was not taken: hand the partition over to the evictor.
+    }
+
+    /**
+     * Requests clearing of the partition: raises the {@link #clear} flag and starts (or continues)
+     * the asynchronous clearing process. Does nothing unless the partition is {@code MOVING} or
+     * {@code RENTING}, or if a clear process is already running.
+     */
+    public void clearAsync() {
+        GridDhtPartitionState cur = state();
+
+        if (cur == MOVING || cur == RENTING) {
+            clear = true;
+
+            clearAsync0(false);
+        }
+    }
+
+    /**
+     * Resumes clearing of the partition if it was requested earlier and can proceed now
+     * (clearing may have been postponed because of reservations).
+     */
+    public void tryContinueClearing() {
+        clearAsync0(true);
+    }
+
+    /**
+     * Tries to invalidate registered group reservations one by one and stops at the first
+     * reservation that cannot be invalidated.
+     *
+     * @return {@code true} If there is a group reservation.
+     */
+    private boolean groupReserved() {
+        for (GridDhtPartitionsReservation grpReservation : reservations) {
+            boolean invalidated = grpReservation.invalidate();
+
+            // Failed to invalidate reservation -> we are reserved.
+            if (!invalidated)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks that the partition holds no data and the given aggregated state value carries
+     * zero size and zero reservations.
+     *
+     * @param state State.
+     * @return {@code True} if partition has no reservations and empty.
+     */
+    private boolean freeAndEmpty(long state) {
+        if (!isEmpty())
+            return false;
+
+        if (getSize(state) != 0)
+            return false;
+
+        return getReservations(state) == 0;
+    }
+
+    /**
+     * Registers the calling thread as the evicting one by moving the evict guard from 0 to 1.
+     * Fails if the guard is not 0.
+     *
+     * @return {@code true} if evicting thread was added.
+     */
+    private boolean addEvicting() {
+        for (;;) {
+            if (evictGuard.get() != 0)
+                return false;
+
+            if (evictGuard.compareAndSet(0, 1))
+                return true;
+        }
+    }
+
+    /**
+     * Decrements the evict guard, which must be positive (checked by an assertion).
+     *
+     * @return {@code true} if no thread evicting partition at the moment.
+     */
+    private boolean clearEvicting() {
+        for (;;) {
+            int cur = evictGuard.get();
+
+            assert cur > 0;
+
+            // The guard becomes free when it was decremented from 1.
+            if (evictGuard.compareAndSet(cur, cur - 1))
+                return cur == 1;
+        }
+    }
+
+    /**
+     * Marks the partition for destroy by moving the evict guard from 0 to -1.
+     * Fails if the guard is not 0.
+     *
+     * @return {@code True} if partition is safe to destroy.
+     */
+    public boolean markForDestroy() {
+        for (;;) {
+            if (evictGuard.get() != 0)
+                return false;
+
+            if (evictGuard.compareAndSet(0, -1))
+                return true;
+        }
+    }
+
+    /**
+     * Moves a free and empty {@code RENTING} partition to {@code EVICTED} state if possible.
+     * If the partition is (or has just become) {@code EVICTED}, remembers the update sequence flag
+     * to be used on partition destroy.
+     *
+     * @param updateSeq If {@code true} increment update sequence on cache group topology after successful eviction.
+     */
+    private void finishEviction(boolean updateSeq) {
+        long cur = state.get();
+
+        GridDhtPartitionState partState = getPartState(cur);
+
+        boolean evicted = partState == EVICTED;
+
+        if (!evicted)
+            evicted = freeAndEmpty(cur) && partState == RENTING && casState(cur, EVICTED);
+
+        if (evicted)
+            updateSeqOnDestroy = updateSeq;
+    }
+
+    /**
+     * Destroys partition data store and invokes appropriate callbacks.
+     * Must be called only for an {@code EVICTED} partition whose evict guard was set to -1
+     * (see {@link #markForDestroy()}); both conditions are checked by assertions.
+     */
+    public void destroy() {
+        assert state() == EVICTED : this;
+        assert evictGuard.get() == -1;
+
+        grp.onPartitionEvicted(id);
+
+        destroyCacheDataStore();
+
+        rent.onDone(); // Completes the future returned by rent().
+
+        ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, 
updateSeqOnDestroy);
+
+        clearDeferredDeletes();
+    }
+
+    /**
+     * Blocks until the rent future completes if the partition is in {@code EVICTED} state;
+     * returns immediately otherwise. A warning is logged each time the wait times out,
+     * after which the wait is repeated.
+     *
+     * @throws IgniteException If waiting for the rent future failed.
+     */
+    public void awaitDestroy() {
+        if (state() != EVICTED)
+            return;
+
+        final long timeout = 10_000;
+
+        while (true) {
+            try {
+                rent.get(timeout);
+
+                return;
+            }
+            catch (IgniteFutureTimeoutCheckedException ignored) {
+                // Keep waiting, only report that destroy takes long.
+                U.warn(log, "Failed to await partition destroy within timeout " + this);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to await partition destroy " + this, e);
+            }
+        }
+    }
+
+    /**
+     * Subscribes the given listener to {@link #clearFuture}.
+     *
+     * @param lsnr Listener.
+     */
+    public void onClearFinished(IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
+        clearFuture.listen(lsnr);
+    }
+
+    /**
+     * @return {@code True} if clearing process is running at the moment on 
the partition.
+     */
+    public boolean isClearing() {
+        return !clearFuture.isDone();
+    }
+
+    /**
+     * Tries to start partition clear process {@link GridDhtLocalPartition#clearAll(EvictionContext)}.
+     * Only one thread is allowed to do such process concurrently.
+     * At the end of clearing method completes {@code clearFuture}.
+     *
+     * @param evictionCtx Eviction context passed to {@code clearAll}.
+     * @return {@code false} if clearing is not started due to existing reservations.
+     * @throws NodeStoppingException If node is stopping.
+     */
+    public boolean tryClear(EvictionContext evictionCtx) throws 
NodeStoppingException {
+        if (clearFuture.isDone())
+            return true;
+
+        long state = this.state.get();
+
+        if (getReservations(state) != 0 || groupReserved())
+            return false;
+
+        if (addEvicting()) { // Skipped (and true is returned) when the evict guard is not 0.
+            try {
+                // Attempt to evict partition entries from cache.
+                long clearedEntities = clearAll(evictionCtx);
+
+                if (log.isDebugEnabled())
+                    log.debug("Partition has been cleared [grp=" + 
grp.cacheOrGroupName()
+                        + ", p=" + id + ", state=" + state() + ", clearedCnt=" 
+ clearedEntities + "]");
+            }
+            catch (NodeStoppingException e) {
+                clearFuture.finish(e);
+
+                throw e;
+            }
+            finally {
+                boolean free = clearEvicting();
+
+                if (free)
+                    clearFuture.finish();
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Asks the group's offheap manager to destroy the data store of this partition.
+     * A failure is logged and not propagated.
+     */
+    private void destroyCacheDataStore() {
+        try {
+            CacheDataStore partStore = dataStore();
+
+            grp.offheap().destroyCacheDataStore(partStore);
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
+        }
+    }
+
+    /**
+     * Callback invoked when the partition gets unlocked.
+     * Delegates to {@code tryContinueClearing()} so that a postponed clearing can proceed.
+     */
+    public void onUnlock() {
+        this.tryContinueClearing();
+    }
+
+    /**
+     * Checks whether the local node comes first in the cached affinity assignment of this partition.
+     *
+     * @param topVer Topology version.
+     * @return {@code True} if local node is primary for this partition.
+     */
+    public boolean primary(AffinityTopologyVersion topVer) {
+        List<ClusterNode> owners = grp.affinity().cachedAffinity(topVer).get(id);
+
+        if (owners.isEmpty())
+            return false;
+
+        return ctx.localNode().equals(owners.get(0));
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if local node is backup for this partition.
+     */
+    public boolean backup(AffinityTopologyVersion topVer) {
+        List<ClusterNode> nodes = 
grp.affinity().cachedAffinity(topVer).get(id);
+
+        return nodes.indexOf(ctx.localNode()) > 0;
+    }
+
+    /**
+     * @param cacheId ID of cache initiated counter update.
+     * @param topVer Topology version for current operation.
+     * @return Next update index.
+     */
+    public long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, 
boolean primary, @Nullable Long primaryCntr) {
+        long nextCntr = store.nextUpdateCounter();
+
+        if (grp.sharedGroup())
+            grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? 
primaryCntr : nextCntr, topVer, primary);
+
+        return nextCntr;
+    }
+
+    /**
+     * @return Current update counter.
+     */
+    public long updateCounter() {
+        return store.updateCounter();
+    }
+
+    /**
+     * Passes the given update counter value to the partition store.
+     *
+     * @param val Update counter value.
+     */
+    public void updateCounter(long val) {
+        this.store.updateCounter(val);
+    }
+
+    /**
+     * @return Initial update counter.
+     */
+    public long initialUpdateCounter() {
+        return store.initialUpdateCounter();
+    }
+
+    /**
+     * Passes the given initial update counter value to the partition store.
+     *
+     * @param val Initial update counter value.
+     */
+    public void initialUpdateCounter(long val) {
+        this.store.updateInitialCounter(val);
+    }
+
+    /**
+     * Updates MVCC cache update counter on primary node.
+     *
+     * @param delta Value to be added to update counter.
+     * @return Update counter value before update.
+     */
+    public long getAndIncrementUpdateCounter(long delta) {
+        return store.getAndIncrementUpdateCounter(delta);
+    }
+
+    /**
+     * Updates MVCC cache update counter on backup node by delegating to the partition store.
+     *
+     * @param start Start position.
+     * @param delta Delta.
+     */
+    public void updateCounter(long start, long delta) {
+        this.store.updateCounter(start, delta);
+    }
+
+    /**
+     * @return Total size of all caches.
+     */
+    public long fullSize() {
+        return store.fullSize();
+    }
+
+    /**
+     * Removes all entries and rows from this partition.
+     * <p>
+     * Works in two passes: first the entries held in the cache entry map(s) are cleared through
+     * {@code clear(...)}, then every row returned by the offheap partition iterator is cleared one by one.
+     * An {@link IgniteCheckedException} other than {@link NodeStoppingException} is logged and not rethrown;
+     * in that case the number of rows cleared so far is returned.
+     *
+     * @param evictionCtx Eviction context; its {@code shouldStop()} is polled to interrupt clearing early.
+     * @return Number of rows cleared from page memory.
+     * @throws NodeStoppingException If node stopping.
+     */
+    private long clearAll(EvictionContext evictionCtx) throws 
NodeStoppingException {
+        GridCacheVersion clearVer = ctx.versions().next();
+
+        GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
+
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
+
+        // First pass: clear the entries held in the cache entry map(s).
+        if (grp.sharedGroup()) {
+            for (CacheMapHolder hld : cacheMaps.values())
+                clear(hld.map, extras, rec);
+        }
+        else
+            clear(singleCacheEntryMap.map, extras, rec);
+
+        long cleared = 0;
+
+        final int stopCheckingFreq = 1000;
+
+        // For a shared group the holder is resolved lazily from the cache ID of each row.
+        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+
+        try {
+            // Second pass: walk the rows returned by the offheap partition iterator.
+            GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);
+
+            while (it0.hasNext()) {
+                // The checkpoint read lock is acquired and released for every single row.
+                ctx.database().checkpointReadLock();
+
+                try {
+                    CacheDataRow row = it0.next();
+
+                    // Do not clear fresh rows in case of single partition clearing.
+                    if (row.version().compareTo(clearVer) >= 0 && (state() == 
MOVING && clear))
+                        continue;
+
+                    if (grp.sharedGroup() && (hld == null || 
hld.cctx.cacheId() != row.cacheId()))
+                        hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+                    assert hld != null;
+
+                    GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                        hld,
+                        hld.cctx,
+                        grp.affinity().lastVersion(),
+                        row.key(),
+                        true,
+                        false);
+
+                    if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                        removeEntry(cached);
+
+                        if (rec && !hld.cctx.config().isEventsDisabled()) {
+                            hld.cctx.events().addEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                (IgniteUuid)null,
+                                null,
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                null,
+                                null,
+                                null,
+                                false);
+                        }
+
+                        cleared++;
+                    }
+
+                    // For each 'stopCheckingFreq' cleared entities check clearing process to stop.
+                    if (cleared % stopCheckingFreq == 0 && 
evictionCtx.shouldStop())
+                        return cleared;
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    assert isEmpty() && state() == EVICTED : "Invalid error 
[e=" + e + ", part=" + this + ']';
+
+                    break; // Partition is already concurrently cleared and evicted.
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
+            }
+
+            // Test-only hook: forces a single checkpoint once a partition has been cleared completely.
+            if (forceTestCheckpointOnEviction) {
+                if (partWhereTestCheckpointEnforced == null && cleared >= 
fullSize()) {
+                    
ctx.database().forceCheckpoint("test").finishFuture().get();
+
+                    log.warning("Forced checkpoint by test reasons for 
partition: " + this);
+
+                    partWhereTestCheckpointEnforced = id;
+                }
+            }
+        }
+        catch (NodeStoppingException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get iterator for evicted partition: " + 
id);
+
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            // Logged only: the method still returns the number of rows cleared so far.
+            U.error(log, "Failed to get iterator for evicted partition: " + 
id, e);
+        }
+
+        return cleared;
+    }
+
+    /**
+     * Walks over the given entry map and clears every DHT entry found there, holding the checkpoint
+     * read lock while each single entry is processed.
+     *
+     * @param map Map to clear.
+     * @param extras Obsolete extras.
+     * @param evt Unload event flag.
+     * @throws NodeStoppingException If current node is stopping.
+     */
+    private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
+        GridCacheObsoleteEntryExtras extras,
+        boolean evt) throws NodeStoppingException {
+        for (Iterator<GridCacheMapEntry> iter = map.values().iterator(); iter.hasNext(); ) {
+            GridCacheMapEntry entry = null;
+
+            ctx.database().checkpointReadLock();
+
+            try {
+                entry = iter.next();
+
+                // Only DHT entries can be cleared here.
+                if (!(entry instanceof GridDhtCacheEntry))
+                    continue;
+
+                if (!((GridDhtCacheEntry)entry).clearInternal(extras.obsoleteVersion(), extras))
+                    continue;
+
+                removeEntry(entry);
+
+                // Unload event is recorded for non-internal entries only.
+                if (!entry.isInternal() && evt) {
+                    grp.addCacheEvent(entry.partition(),
+                        entry.key(),
+                        ctx.localNodeId(),
+                        EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                        null,
+                        false,
+                        entry.rawGet(),
+                        entry.hasValue(),
+                        false);
+                }
+            }
+            catch (GridDhtInvalidPartitionException err) {
+                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + err + ", part=" + this + ']';
+
+                break; // Partition is already concurrently cleared and evicted.
+            }
+            catch (NodeStoppingException err) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to clear cache entry for evicted partition: " + entry.partition());
+
+                throw err;
+            }
+            catch (IgniteCheckedException err) {
+                U.error(log, "Failed to clear cache entry for evicted partition: " + entry, err);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * Iterates over {@code rmvQueue} and calls {@code removeVersionedEntry} for every queued holder.
+     */
+    private void clearDeferredDeletes() {
+        Iterator<RemovedEntryHolder> iter = rmvQueue.iterator();
+
+        while (iter.hasNext()) {
+            RemovedEntryHolder holder = iter.next();
+
+            removeVersionedEntry(holder.cacheId(), holder.key(), holder.version());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // The hash is the partition ID itself.
+        return Integer.hashCode(id);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"OverlyStrongTypeCast"})
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (!(obj instanceof GridDhtLocalPartition))
+            return false;
+
+        // Partitions are compared by ID only.
+        return ((GridDhtLocalPartition)obj).id() == id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull GridDhtLocalPartition part) {
+        // A null argument is ordered before this partition; otherwise partitions are ordered by ID.
+        return part == null ? 1 : Integer.compare(id, part.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // Besides the class itself, the group name, state, reservations, emptiness flag
+        // and formatted creation time are passed to S.toString as explicit name/value pairs.
+        return S.toString(GridDhtLocalPartition.class, this,
+            "grp", grp.cacheOrGroupName(),
+            "state", state(),
+            "reservations", reservations(),
+            "empty", isEmpty(),
+            "createTime", U.format(createTime));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int publicSize(int cacheId) {
+        // Non-shared group: the size is kept inside the composite partition state.
+        if (!grp.sharedGroup())
+            return getSize(state.get());
+
+        CacheMapHolder holder = cacheMaps.get(cacheId);
+
+        if (holder == null)
+            return 0;
+
+        return holder.size.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, 
GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.incrementAndGet();
+        }
+
+        while (true) {
+            long state = this.state.get();
+
+            if (this.state.compareAndSet(state, setSize(state, getSize(state) 
+ 1)))
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, 
GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.decrementAndGet();
+        }
+
+        while (true) {
+            long state = this.state.get();
+
+            assert getPartState(state) != EVICTED;
+
+            if (this.state.compareAndSet(state, setSize(state, getSize(state) 
- 1)))
+                return;
+        }
+    }
+
+    /**
+     * Gives access to the cache group context this partition was created for.
+     *
+     * @return Group context.
+     */
+    public CacheGroupContext group() {
+        return this.grp;
+    }
+
+    /**
+     * Drops the per-cache data of a stopped cache: its holders queued in {@code rmvQueue}
+     * and its entry in {@code cacheMaps}. Expected to be called for shared groups only.
+     *
+     * @param cacheId Cache ID.
+     */
+    public void onCacheStopped(int cacheId) {
+        assert grp.sharedGroup() : grp.cacheOrGroupName();
+
+        Iterator<RemovedEntryHolder> iter = rmvQueue.iterator();
+
+        while (iter.hasNext()) {
+            RemovedEntryHolder holder = iter.next();
+
+            if (holder.cacheId() != cacheId)
+                continue;
+
+            iter.remove();
+        }
+
+        cacheMaps.remove(cacheId);
+    }
+
+    /**
+     * Decodes the partition state stored in the three lowest bits of the composite state.
+     *
+     * @param state Composite state.
+     * @return Partition state.
+     */
+    private static GridDhtPartitionState getPartState(long state) {
+        int ord = (int)(state & 0x7L);
+
+        return GridDhtPartitionState.fromOrdinal(ord);
+    }
+
+    /**
+     * Writes the ordinal of the given partition state into the three lowest bits of the composite state.
+     *
+     * @param state Composite state to update.
+     * @param partState Partition state.
+     * @return Updated composite state.
+     */
+    private static long setPartState(long state, GridDhtPartitionState partState) {
+        long withoutPartState = state & ~0x7L;
+
+        return withoutPartState | partState.ordinal();
+    }
+
+    /**
+     * Decodes the reservation counter stored in bits 16-31 of the composite state.
+     *
+     * @param state Composite state.
+     * @return Reservations.
+     */
+    private static int getReservations(long state) {
+        long shifted = state >>> 16;
+
+        return (int)(shifted & 0xFFFFL);
+    }
+
+    /**
+     * @param state Composite state to update.
+     * @param reservations Reservations to set.
+     * @return Updated composite state.
+     */
+    private static long setReservations(long state, int reservations) {
+        return (state & (~0x00000000FFFF0000L)) | (reservations << 16);
+    }
+
+    /**
+     * Decodes the size stored in the upper 32 bits of the composite state.
+     *
+     * @param state Composite state.
+     * @return Size.
+     */
+    private static int getSize(long state) {
+        return (int)(state >>> 32);
+    }
+
+    /**
+     * Writes the given size into the upper 32 bits of the composite state, keeping the lower 32 bits.
+     *
+     * @param state Composite state to update.
+     * @param size Size to set.
+     * @return Updated composite state.
+     */
+    private static long setSize(long state, int size) {
+        long lowerBits = state & 0x00000000FFFFFFFFL;
+        long sizeBits = (long)size << 32;
+
+        return lowerBits | sizeBits;
+    }
+
+    /**
+     * Removed entry holder.
+     * Immutable record of a removed entry: cache ID, key, version and the expire time
+     * computed when the holder is created.
+     */
+    private static class RemovedEntryHolder {
+        /** Cache ID. */
+        private final int cacheId;
+
+        /** Cache key. */
+        private final KeyCacheObject key;
+
+        /** Entry version. */
+        private final GridCacheVersion ver;
+
+        /** Entry expire time. */
+        private final long expireTime;
+
+        /**
+         * @param cacheId Cache ID.
+         * @param key Key.
+         * @param ver Entry version.
+         * @param ttl TTL, added to {@code U.currentTimeMillis()} to compute the expire time
+         *      (presumably milliseconds - TODO confirm).
+         */
+        private RemovedEntryHolder(int cacheId, KeyCacheObject key, 
GridCacheVersion ver, long ttl) {
+            this.cacheId = cacheId;
+            this.key = key;
+            this.ver = ver;
+
+            expireTime = U.currentTimeMillis() + ttl;
+        }
+
+        /**
+         * @return Cache ID.
+         */
+        int cacheId() {
+            return cacheId;
+        }
+
+        /**
+         * @return Key.
+         */
+        KeyCacheObject key() {
+            return key;
+        }
+
+        /**
+         * @return Version.
+         */
+        GridCacheVersion version() {
+            return ver;
+        }
+
+        /**
+         * @return Expire time computed at construction.
+         */
+        long expireTime() {
+            return expireTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemovedEntryHolder.class, this);
+        }
+    }
+
+    /**
+     * Future is needed to control partition clearing process.
+     * Future can be used both for single clearing or eviction processes.
+     * <p>
+     * The future is created in the completed state and is re-armed by {@link #initialize(boolean, boolean)}.
+     */
+    class ClearFuture extends GridFutureAdapter<Boolean> {
+        /** Flag indicates that eviction callback was registered on the current future. */
+        private volatile boolean evictionCbRegistered;
+
+        /** Flag indicates that clearing callback was registered on the current future. */
+        private volatile boolean clearingCbRegistered;
+
+        /** Flag indicates that future with all callbacks was finished. */
+        private volatile boolean finished;
+
+        /**
+         * Constructor.
+         * Completes the future right away and marks it as finished.
+         */
+        ClearFuture() {
+            onDone();
+            finished = true;
+        }
+
+        /**
+         * Registers finish eviction callback on the future.
+         * Does nothing if the callback is already registered. The callback checks the future result,
+         * calls {@code finishEviction(updateSeq)} on success and completes {@code rent} with the error otherwise.
+         *
+         * @param updateSeq If {@code true} update topology sequence after successful eviction.
+         */
+        private void registerEvictionCallback(boolean updateSeq) {
+            if (evictionCbRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (evictionCbRegistered)
+                    return;
+
+                evictionCbRegistered = true;
+
+                // Initiates partition eviction and destroy.
+                listen(f -> {
+                    try {
+                        // Check for errors.
+                        f.get();
+
+                        finishEviction(updateSeq);
+                    }
+                    catch (Exception e) {
+                        rent.onDone(e);
+                    }
+
+                    // Allow the callback to be registered again.
+                    evictionCbRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Registers clearing callback on the future.
+         * Does nothing if the callback is already registered.
+         */
+        private void registerClearingCallback() {
+            if (clearingCbRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (clearingCbRegistered)
+                    return;
+
+                clearingCbRegistered = true;
+
+                // Reset the clear flag once the future completes.
+                listen(f -> {
+                    clear = false;
+
+                    clearingCbRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Successfully finishes the future.
+         */
+        public void finish() {
+            synchronized (this) {
+                onDone();
+                finished = true;
+            }
+        }
+
+        /**
+         * Finishes the future with error.
+         *
+         * @param t Error.
+         */
+        public void finish(Throwable t) {
+            synchronized (this) {
+                onDone(t);
+                finished = true;
+            }
+        }
+
+        /**
+         * Reuses future if it's done.
+         * Adds appropriate callbacks to the future in case of eviction or single clearing.
+         *
+         * @param updateSeq Update sequence.
+         * @param evictionRequested If {@code true} adds eviction callback, in other case adds single clearing
+         *      callback.
+         * @return {@code true} if future has been reinitialized.
+         */
+        public boolean initialize(boolean updateSeq, boolean 
evictionRequested) {
+            // In case of running clearing just try to add missing callbacks to avoid extra synchronization.
+            if (!finished) {
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return false;
+            }
+
+            synchronized (this) {
+                boolean done = isDone();
+
+                // A completed future is reset together with all its flags before callbacks are registered.
+                if (done) {
+                    reset();
+
+                    finished = false;
+                    evictionCbRegistered = false;
+                    clearingCbRegistered = false;
+                }
+
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return done;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java
new file mode 100644
index 0000000..4cbce4b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * States a DHT partition can be in.
+ * The declaration order matters: {@link #fromOrdinal(int)} maps ordinals back to constants.
+ */
+public enum GridDhtPartitionState {
+    /** Partition is being loaded from another node. */
+    MOVING,
+
+    /** This node is either a primary or backup owner. */
+    OWNING,
+
+    /** This node is neither primary nor backup owner. */
+    RENTING,
+
+    /** Partition has been evicted from cache. */
+    EVICTED,
+
+    /** Partition state is invalid, partition should not be used. */
+    LOST;
+
+    /** Enum values. */
+    private static final GridDhtPartitionState[] VALS = values();
+
+    /**
+     * Resolves a constant by its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enum value or {@code null} if the ordinal is out of range.
+     */
+    @Nullable public static GridDhtPartitionState fromOrdinal(int ord) {
+        if (ord < 0 || ord >= VALS.length)
+            return null;
+
+        return VALS[ord];
+    }
+
+    /**
+     * @return {@code True} for every state except {@link #EVICTED}.
+     */
+    public boolean active() {
+        return EVICTED != this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
new file mode 100644
index 0000000..b6cb5bb
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * DHT partition topology.
+ */
+@GridToStringExclude
+public interface GridDhtPartitionTopology {
+    /**
+     * @return  Total cache partitions.
+     */
+    public int partitions();
+
+    /**
+     * Locks the topology, usually during mapping on locks or transactions.
+     */
+    public void readLock();
+
+    /**
+     * Unlocks topology locked by {@link #readLock()} method.
+     */
+    public void readUnlock();
+
+    /**
+     * @return {@code True} if locked by current thread.
+     */
+    public boolean holdsLock();
+    /**
+     * Updates topology version.
+     *
+     * @param exchFut Exchange future.
+     * @param discoCache Discovery data cache.
+     * @param updateSeq Update sequence.
+     * @param stopping Stopping flag.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public void updateTopologyVersion(
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
+        long updateSeq,
+        boolean stopping
+    ) throws IgniteInterruptedCheckedException;
+
+    /**
+     * @return Result topology version of last finished exchange.
+     */
+    public AffinityTopologyVersion readyTopologyVersion();
+
+    /**
+     * @return Start topology version of last exchange.
+     */
+    public AffinityTopologyVersion lastTopologyChangeVersion();
+
+    /**
+     * Gets a future that will be completed when partition exchange map for 
this
+     * particular topology version is done.
+     *
+     * @return Topology version ready future.
+     */
+    public GridDhtTopologyFuture topologyVersionFuture();
+
+    /**
+     * @return {@code True} if cache is being stopped.
+     */
+    public boolean stopping();
+
+    /**
+     * @return Cache group ID.
+     */
+    public int groupId();
+
+    /**
+     * Pre-initializes this topology.
+     *
+     * @param exchFut Exchange future.
+     * @param affReady Affinity ready flag.
+     * @param updateMoving
+     * @throws IgniteCheckedException If failed.
+     */
+    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean affReady,
+        boolean updateMoving)
+        throws IgniteCheckedException;
+
+    /**
+     * @param affVer Affinity version.
+     * @param exchFut Exchange future.
+     * @return {@code True} if partitions must be refreshed.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion 
affVer, GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException;
+
+    /**
+     * Initializes local data structures after partitions are restored from 
persistence.
+     *
+     * @param topVer Topology version.
+     */
+    public void afterStateRestored(AffinityTopologyVersion topVer);
+
+    /**
+     * Post-initializes this topology.
+     *
+     * @param exchFut Exchange future.
+     * @return {@code True} if mapping was changed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) 
throws IgniteCheckedException;
+
+    /**
+     * @param topVer Topology version at the time of creation.
+     * @param p Partition ID.
+     * @param create If {@code true}, then partition will be created if it's 
not there.
+     * @return Local partition.
+     * @throws GridDhtInvalidPartitionException If partition is evicted or 
absent and
+     *      does not belong to this node.
+     */
+    @Nullable public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer, boolean create)
+        throws GridDhtInvalidPartitionException;
+
+    /**
+     * Unconditionally creates partition during restore of persisted partition 
state.
+     *
+     * @param p Partition ID.
+     * @return Partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridDhtLocalPartition forceCreatePartition(int p) throws 
IgniteCheckedException;
+
+    /**
+     * @param topVer Topology version at the time of creation.
+     * @param p Partition ID.
+     * @param create If {@code true}, then partition will be created if it's 
not there.
+     * @return Local partition.
+     * @throws GridDhtInvalidPartitionException If partition is evicted or 
absent and
+     *      does not belong to this node.
+     */
+    @Nullable public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer, boolean create,
+        boolean showRenting)
+        throws GridDhtInvalidPartitionException;
+
+    /**
+     * @param parts Partitions to release (should be reserved before).
+     */
+    public void releasePartitions(int... parts);
+
+    /**
+     * @param part Partition number.
+     * @return Local partition.
+     * @throws GridDhtInvalidPartitionException If partition is evicted or 
absent and
+     *      does not belong to this node.
+     */
+    @Nullable public GridDhtLocalPartition localPartition(int part)
+        throws GridDhtInvalidPartitionException;
+
+    /**
+     * @return All local partitions by copying them into another list.
+     */
+    public List<GridDhtLocalPartition> localPartitions();
+
+    /**
+     *
+     * @return All current active local partitions.
+     */
+    public Iterable<GridDhtLocalPartition> currentLocalPartitions();
+
+    /**
+     * @return Local IDs.
+     */
+    public GridDhtPartitionMap localPartitionMap();
+
+    /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
+     * @return Current update sequence.
+     */
+    public long updateSequence();
+
+    /**
+     * @param p Partition ID.
+     * @param topVer Topology version.
+     * @return Collection of all nodes responsible for this partition with 
primary node being first.
+     */
+    public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer);
+
+    /**
+     * Gets the nodes responsible for the given partition, based on the given affinity assignment.
+     *
+     * @param p Partition ID.
+     * @param affAssignment Assignments.
+     * @param affNodes Nodes assigned for given partition by affinity.
+     * @return Collection of all nodes responsible for this partition with primary node being first. The first N
+     *      elements of this collection (with N being 1 + backups) are actual DHT affinity nodes, other nodes
+     *      are current additional owners of the partition after topology change. May be {@code null}.
+     */
+    @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment 
affAssignment, List<ClusterNode> affNodes);
+
+    /**
+     * Gets the owners of the given partition.
+     *
+     * @param p Partition ID.
+     * @return Collection of all nodes who {@code own} this partition.
+     */
+    public List<ClusterNode> owners(int p);
+
+    /**
+     * Gets the owners of every partition.
+     *
+     * @return List indexed by partition number; each list element is the collection of all nodes that
+     *      own the corresponding partition.
+     */
+    public List<List<ClusterNode>> allOwners();
+
+    /**
+     * Gets the owners of the given partition, taking a topology version into account.
+     *
+     * @param p Partition ID.
+     * @param topVer Topology version.
+     * @return Collection of all nodes who {@code own} this partition.
+     */
+    public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer);
+
+    /**
+     * Gets the nodes that are preloading the given partition.
+     *
+     * @param p Partition ID.
+     * @return Collection of all nodes who {@code are preloading} this partition.
+     */
+    public List<ClusterNode> moving(int p);
+
+    /**
+     * Gets the full partition map.
+     *
+     * @param onlyActive If {@code true}, then only {@code active} partitions will be returned.
+     * @return Node IDs mapped to partitions.
+     */
+    public GridDhtPartitionFullMap partitionMap(boolean onlyActive);
+
+    /**
+     * Checks whether any cache node has moving partitions.
+     *
+     * @return {@code True} if one of cache nodes has partitions in {@link GridDhtPartitionState#MOVING} state.
+     */
+    public boolean hasMovingPartitions();
+
+    /**
+     * Invoked for an entry that has been removed from cache.
+     *
+     * @param e Entry removed from cache.
+     */
+    public void onRemoved(GridDhtCacheEntry e);
+
+    /**
+     * Updates the topology from a full partition map.
+     *
+     * @param exchangeResVer Result topology version for exchange. Value should be greater than previously passed.
+     *      Null value means full map received is not related to exchange.
+     * @param partMap Update partition map.
+     * @param cntrMap Partition update counters; may be {@code null}.
+     * @param partsToReload Set of partitions that need to be reloaded.
+     * @param partSizes Partition sizes; may be {@code null}. NOTE(review): this parameter was undocumented;
+     *      presumably it maps partition ID to partition size - confirm against the implementations.
+     * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
+     *      related to exchange. Value should be not less than previous 'Topology version from exchange'.
+     * @return {@code True} if local state was changed.
+     */
+    public boolean update(
+        @Nullable AffinityTopologyVersion exchangeResVer,
+        GridDhtPartitionFullMap partMap,
+        @Nullable CachePartitionFullCountersMap cntrMap,
+        Set<Integer> partsToReload,
+        @Nullable Map<Integer, Long> partSizes,
+        @Nullable AffinityTopologyVersion msgTopVer);
+
+    /**
+     * Updates the topology from a single partition map.
+     *
+     * @param exchId Exchange ID; may be {@code null}.
+     * @param parts Partitions.
+     * @param force {@code True} to skip stale update check.
+     * @return {@code True} if local state was changed.
+     */
+    public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionMap parts,
+        boolean force);
+
+    /**
+     * Collects update counters gathered during exchange. Called on coordinator.
+     *
+     * @param cntrMap Counters map.
+     */
+    public void collectUpdateCounters(CachePartitionPartialCountersMap 
cntrMap);
+
+    /**
+     * Applies update counters collected during exchange on coordinator. Called on coordinator.
+     */
+    public void applyUpdateCounters();
+
+    /**
+     * Checks if there is at least one owner for each partition in the cache topology.
+     * If not, marks such a partition as {@code LOST}.
+     * <p>
+     * This method should be called on topology coordinator after all partition messages are received.
+     *
+     * @param resTopVer Exchange result version.
+     * @param discoEvt Discovery event for which we detect lost partitions.
+     * @return {@code True} if partitions state got updated.
+     */
+    public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, 
DiscoveryEvent discoEvt);
+
+    /**
+     * Resets the state of all {@code LOST} partitions to {@code OWNING}.
+     *
+     * @param resTopVer Exchange result version.
+     */
+    public void resetLostPartitions(AffinityTopologyVersion resTopVer);
+
+    /**
+     * Gets the lost partitions.
+     *
+     * @return Collection of lost partitions, if any.
+     */
+    public Collection<Integer> lostPartitions();
+
+    /**
+     * @return Partition update counters as a full counters map.
+     */
+    public CachePartitionFullCountersMap fullUpdateCounters();
+
+    /**
+     * @param skipZeros NOTE(review): this parameter was undocumented; presumably, when {@code true}, partitions
+     *      whose update counter is zero are left out of the result - confirm against the implementations.
+     * @return Partition update counters as a partial counters map.
+     */
+    public CachePartitionPartialCountersMap localUpdateCounters(boolean 
skipZeros);
+
+    /**
+     * Gets partition cache sizes.
+     *
+     * @return Partition cache sizes (presumably keyed by partition ID - confirm against the implementations).
+     */
+    public Map<Integer, Long> partitionSizes();
+
+    /**
+     * Owns the given partition.
+     *
+     * @param part Partition to own.
+     * @return {@code True} if owned.
+     */
+    public boolean own(GridDhtLocalPartition part);
+
+    /**
+     * Owns all moving partitions for the given topology version.
+     *
+     * @param topVer Topology version for which moving partitions are owned.
+     */
+    public void ownMoving(AffinityTopologyVersion topVer);
+
+    /**
+     * Invoked for a partition that has been evicted.
+     *
+     * @param part Evicted partition.
+     * @param updateSeq Update sequence increment flag.
+     */
+    public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
+
+    /**
+     * Gets the partition map of the given node.
+     *
+     * @param nodeId Node to get partitions for.
+     * @return Partitions for node; may be {@code null}.
+     */
+    @Nullable public GridDhtPartitionMap partitions(UUID nodeId);
+
+    /**
+     * Prints memory statistics.
+     *
+     * @param threshold Threshold for number of entries.
+     */
+    public void printMemoryStats(int threshold);
+
+    /**
+     * Gets the global partition sizes.
+     *
+     * @return Sizes of up-to-date partition versions in topology.
+     */
+    Map<Integer, Long> globalPartSizes();
+
+    /**
+     * @param partSizes Sizes of up-to-date partition versions in topology; may be {@code null}.
+     */
+    void globalPartSizes(@Nullable Map<Integer, Long> partSizes);
+
+    /**
+     * Checks whether the rebalance process has finished.
+     *
+     * @param topVer Topology version.
+     * @return {@code True} if rebalance process finished.
+     */
+    public boolean rebalanceFinished(AffinityTopologyVersion topVer);
+
+    /**
+     * Calculates nodes and partitions which have an outdated (non-actual) state and must be rebalanced.
+     * State of all current owners that aren't contained in the given {@code ownersByUpdCounters}
+     * will be reset to {@code MOVING}.
+     *
+     * @param ownersByUpdCounters Map (partition, set of node IDs that have most actual state about partition
+     *      (update counter is maximal) and should hold {@code OWNING} state for such partition).
+     * @param haveHistory Set of partitions which have WAL history to rebalance.
+     * @return Map (nodeId, set of partitions that should be rebalanced <b>fully</b> by this node).
+     */
+    public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> 
ownersByUpdCounters, Set<Integer> haveHistory);
+
+    /**
+     * Callback on exchange done.
+     *
+     * @param fut Exchange future. NOTE(review): this parameter was undocumented; presumably the future of the
+     *      exchange that has just completed - confirm against the implementations.
+     * @param assignment New affinity assignment.
+     * @param updateRebalanceVer {@code True} if the rebalance state needs to be checked.
+     */
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, 
AffinityAssignment assignment, boolean updateRebalanceVer);
+
+    /**
+     * @return MVCC coordinator. NOTE(review): this method was undocumented; which coordinator is returned and
+     *      whether the result may be {@code null} is not visible here - confirm against the implementations.
+     */
+    public MvccCoordinator mvccCoordinator();
+}

Reply via email to