http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java new file mode 100644 index 0000000..253a56a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -0,0 +1,1576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridIterator; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.util.deque.FastSizeDeque; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; +import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; + +/** + * Key partition. 
*/
public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable {
    /** Entry factory passed to the parent map; always creates {@link GridDhtCacheEntry} instances. */
    private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() {
        @Override public GridCacheMapEntry create(
            GridCacheContext ctx,
            AffinityTopologyVersion topVer,
            KeyCacheObject key
        ) {
            return new GridDhtCacheEntry(ctx, topVer, key);
        }
    };

    /**
     * Maximum size for delete queue (system property {@code IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE},
     * default {@code 200_000}). The constructor divides it among the group's partitions.
     */
    public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);

    /**
     * ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction
     * (system property {@code TEST_CHECKPOINT_ON_EVICTION}, default {@code false}).
     * NOTE(review): not final - presumably so tests can change it; confirm before making it final.
     */
    private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false);

    /** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */
    static volatile Integer partWhereTestCheckpointEnforced;

    /** Maximum size for {@link #rmvQueue}; computed in the constructor via {@code U.ceilPow2}. */
    private final int rmvQueueMaxSize;

    /**
     * Removed items TTL (system property {@code IGNITE_CACHE_REMOVED_ENTRIES_TTL}, default {@code 10_000}).
     * Presumably milliseconds, since expiry is compared with {@code U.currentTimeMillis()} - TODO confirm.
     */
    private final long rmvdEntryTtl;

    /** Static logger to avoid re-creation. */
    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();

    /** Logger. Static, and re-assigned by every constructor call. */
    private static volatile IgniteLogger log;

    /** Partition ID. */
    private final int id;

    /**
     * Packed state. 32 bits - size, 16 bits - reservations, 13 bits - reserved, 3 bits - GridDhtPartitionState.
     * Decoded and encoded by getSize/getReservations/getPartState and the matching setters (not visible here).
     * NOTE(review): the initial value stores MOVING's ordinal shifted left by 32, which does not obviously match
     * the bit order described above - verify against the get/set helpers.
     */
    @GridToStringExclude
    private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32);

    /**
     * Evict guard: 0 when idle, positive while a thread clears the partition (addEvicting/clearEvicting),
     * -1 once marked for destroy. Must be CASed to -1 only when partition state is EVICTED.
     */
    @GridToStringExclude
    private final AtomicInteger evictGuard = new AtomicInteger();

    /** Rent future, returned by rent() and completed by destroy(). */
    @GridToStringExclude
    private final GridFutureAdapter<?> rent;

    /** Clear future; exposed through onClearFinished() and isClearing(). */
    @GridToStringExclude
    private final ClearFuture clearFuture;

    /** Shared cache context. */
    @GridToStringExclude
    private final GridCacheSharedContext ctx;

    /** Cache group this partition belongs to. */
    @GridToStringExclude
    private final CacheGroupContext grp;

    /** Create time, taken from {@code U.currentTimeMillis()}. */
    @GridToStringExclude
    private final long createTime = U.currentTimeMillis();

    /** Partition lock, acquired and released by lock() and unlock(). */
    @GridToStringExclude
    private final ReentrantLock lock = new ReentrantLock();

    /** Entry maps keyed by cache ID; non-null only for shared cache groups (holders are created on demand). */
    @GridToStringExclude
    private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps;

    /** Entry map of the group's only cache; non-null only for non-shared cache groups. */
    @GridToStringExclude
    private final CacheMapHolder singleCacheEntryMap;

    /** Deferred-delete records; cleanupRemoveQueue() trims it below {@link #rmvQueueMaxSize} before each add. */
    @GridToStringExclude
    private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());

    /** Group reservations. */
    @GridToStringExclude
    private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();

    /** Partition data store, created in the constructor. */
    @GridToStringExclude
    private volatile CacheDataStore store;

    /**
     * Set if failed to move partition to RENTING state due to reservations, to be checked when
     * reservation is released.
     */
    private volatile boolean delayedRenting;

    /** Set if partition must be cleared in MOVING state. */
    private volatile boolean clear;

    /**
     * Set if topology update sequence should be updated on partition destroy.
     * NOTE(review): not volatile although it is written in clearAsync0/finishEviction and read in destroy();
     * confirm those calls are ordered by another happens-before edge.
     */
    private boolean updateSeqOnDestroy;

    /**
     * Creates local partition {@code id} of cache group {@code grp} together with its data store.
     *
     * @param ctx Context.
     * @param grp Cache group.
     * @param id Partition ID.
* @throws IgniteException If creating the data store or writing the WAL record fails.
     */
    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
    public GridDhtLocalPartition(GridCacheSharedContext ctx,
        CacheGroupContext grp,
        int id) {
        super(ENTRY_FACTORY);

        this.id = id;
        this.ctx = ctx;
        this.grp = grp;

        // The logger field is static; it is refreshed by every partition created.
        log = U.logger(ctx.kernalContext(), logRef, this);

        // Shared groups keep one entry map per cache (created on demand by cacheMapHolder());
        // a non-shared group gets its single map eagerly.
        if (grp.sharedGroup()) {
            singleCacheEntryMap = null;
            cacheMaps = new ConcurrentHashMap<>();
        }
        else {
            singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap());
            cacheMaps = null;
        }

        rent = new GridFutureAdapter<Object>() {
            @Override public String toString() {
                return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ']';
            }
        };

        clearFuture = new ClearFuture();

        // Deferred-delete capacity: fixed 100 for system caches, otherwise the global limit split across
        // partitions (at least 20); U.ceilPow2 presumably rounds it up to a power of two.
        int delQueueSize = grp.systemCache() ? 100 :
            Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);

        rmvQueueMaxSize = U.ceilPow2(delQueueSize);

        rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);

        try {
            store = grp.offheap().createCacheDataStore(id);

            // Log partition creation for further crash recovery purposes.
            if (grp.walEnabled())
                ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), updateCounter()));

            // Inject row cache cleaner on store creation
            // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group
            if (ctx.kernalContext().query().moduleEnabled()) {
                GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing()
                    .rowCacheCleaner(grp.groupId());

                // NOTE(review): when WAL is enabled, updateCounter() above already dereferenced store,
                // so this null check does not protect that path.
                if (store != null && cleaner != null)
                    store.setRowCacheCleaner(cleaner);
            }
        }
        catch (IgniteCheckedException e) {
            // TODO ignite-db
            throw new IgniteException(e);
        }

        if (log.isDebugEnabled())
            log.debug("Partition has been created [grp=" + grp.cacheOrGroupName() +
                ", p=" + id + ", state=" + state() + "]");
    }

    /**
     * Creates a concurrent map that holds the on-heap cache entries of one cache in this partition.
     *
     * @return Entries map.
+ */ + private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() { + return new ConcurrentHashMap<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()), + 0.75f, + Runtime.getRuntime().availableProcessors() * 2); + } + + /** {@inheritDoc} */ + @Override public int internalSize() { + if (grp.sharedGroup()) { + int size = 0; + + for (CacheMapHolder hld : cacheMaps.values()) + size += hld.map.size(); + + return size; + } + + return singleCacheEntryMap.map.size(); + } + + /** {@inheritDoc} */ + @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { + if (grp.sharedGroup()) + return cacheMapHolder(cctx); + + return singleCacheEntryMap; + } + + /** {@inheritDoc} */ + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { + return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; + } + + /** + * @param cctx Cache context. + * @return Map holder. + */ + private CacheMapHolder cacheMapHolder(GridCacheContext cctx) { + assert grp.sharedGroup(); + + CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed()); + + if (hld != null) + return hld; + + CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap())); + + if (old != null) + hld = old; + + return hld; + } + + /** + * @return Data store. + */ + public CacheDataStore dataStore() { + return store; + } + + /** + * Adds group reservation to this partition. + * + * @param r Reservation. + * @return {@code false} If such reservation already added. + */ + public boolean addReservation(GridDhtPartitionsReservation r) { + assert (getPartState(state.get())) != EVICTED : "we can reserve only active partitions"; + assert (getReservations(state.get())) != 0 : "partition must be already reserved before adding group reservation"; + + return reservations.addIfAbsent(r); + } + + /** + * @param r Reservation. 
+ */ + public void removeReservation(GridDhtPartitionsReservation r) { + if (!reservations.remove(r)) + throw new IllegalStateException("Reservation was already removed."); + } + + /** + * @return Partition ID. + */ + public int id() { + return id; + } + + /** + * @return Create time. + */ + public long createTime() { + return createTime; + } + + /** + * @return Partition state. + */ + public GridDhtPartitionState state() { + return getPartState(state.get()); + } + + /** + * @return Reservations. + */ + public int reservations() { + return getReservations(state.get()); + } + + /** + * @return {@code True} if partition is empty. + */ + public boolean isEmpty() { + return store.fullSize() == 0 && internalSize() == 0; + } + + /** + * @return If partition is moving or owning or renting. + */ + public boolean valid() { + GridDhtPartitionState state = state(); + + return state == MOVING || state == OWNING || state == RENTING; + } + + /** + * @param entry Entry to remove. + */ + public void onRemoved(GridDhtCacheEntry entry) { + assert entry.obsolete() : entry; + + // Make sure to remove exactly this entry. + removeEntry(entry); + + // Attempt to evict. + tryContinueClearing(); + } + + /** + * @param cacheId Cache ID. + * @param key Key. + * @param ver Version. + */ + private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) { + CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; + + GridCacheMapEntry entry = hld != null ? 
hld.map.get(key) : null; + + if (entry != null && entry.markObsoleteVersion(ver)) + removeEntry(entry); + } + + /** + * + */ + public void cleanupRemoveQueue() { + while (rmvQueue.sizex() >= rmvQueueMaxSize) { + RemovedEntryHolder item = rmvQueue.pollFirst(); + + if (item != null) + removeVersionedEntry(item.cacheId(), item.key(), item.version()); + } + + if (!grp.isDrEnabled()) { + RemovedEntryHolder item = rmvQueue.peekFirst(); + + while (item != null && item.expireTime() < U.currentTimeMillis()) { + item = rmvQueue.pollFirst(); + + if (item == null) + break; + + removeVersionedEntry(item.cacheId(), item.key(), item.version()); + + item = rmvQueue.peekFirst(); + } + } + } + + /** + * @param cacheId cacheId Cache ID. + * @param key Removed key. + * @param ver Removed version. + */ + public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) { + cleanupRemoveQueue(); + + rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl)); + } + + /** + * Locks partition. + */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) + public void lock() { + lock.lock(); + } + + /** + * Unlocks partition. + */ + public void unlock() { + lock.unlock(); + } + + /** + * Reserves a partition so it won't be cleared or evicted. + * + * @return {@code True} if reserved. + */ + @Override public boolean reserve() { + while (true) { + long state = this.state.get(); + + if (getPartState(state) == EVICTED) + return false; + + long newState = setReservations(state, getReservations(state) + 1); + + if (this.state.compareAndSet(state, newState)) + return true; + } + } + + /** + * Releases previously reserved partition. + */ + @Override public void release() { + release0(0); + } + + /** {@inheritDoc} */ + @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup() && sizeChange != 0) + hld.size.addAndGet(sizeChange); + + release0(sizeChange); + } + + /** + * @param sizeChange Size change delta. 
+ */ + private void release0(int sizeChange) { + while (true) { + long state = this.state.get(); + + int reservations = getReservations(state); + + if (reservations == 0) + return; + + assert getPartState(state) != EVICTED : getPartState(state); + + long newState = setReservations(state, --reservations); + newState = setSize(newState, getSize(newState) + sizeChange); + + assert getSize(newState) == getSize(state) + sizeChange; + + // Decrement reservations. + if (this.state.compareAndSet(state, newState)) { + // If no more reservations try to continue delayed renting or clearing process. + if (reservations == 0) { + if (delayedRenting) + rent(true); + else + tryContinueClearing(); + } + + break; + } + } + } + + /** + * @param stateToRestore State to restore. + */ + public void restoreState(GridDhtPartitionState stateToRestore) { + state.set(setPartState(state.get(), stateToRestore)); + } + + /** + * For testing purposes only. + * @param toState State to set. + */ + public void setState(GridDhtPartitionState toState) { + if (grp.persistenceEnabled() && grp.walEnabled()) { + synchronized (this) { + long state0 = state.get(); + + this.state.compareAndSet(state0, setPartState(state0, toState)); + + try { + ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter())); + } + catch (IgniteCheckedException e) { + U.error(log, "Error while writing to log", e); + } + } + } + else + restoreState(toState); + } + + /** + * @param state Current aggregated value. + * @param toState State to switch to. + * @return {@code true} if cas succeeds. 
+ */ + private boolean casState(long state, GridDhtPartitionState toState) { + if (grp.persistenceEnabled() && grp.walEnabled()) { + synchronized (this) { + GridDhtPartitionState prevState = state(); + + boolean update = this.state.compareAndSet(state, setPartState(state, toState)); + + if (update) { + try { + ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to log partition state change to WAL.", e); + } + + if (log.isDebugEnabled()) + log.debug("Partition changed state [grp=" + grp.cacheOrGroupName() + + ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]"); + } + + return update; + } + } + else { + GridDhtPartitionState prevState = state(); + + boolean update = this.state.compareAndSet(state, setPartState(state, toState)); + + if (update) { + if (log.isDebugEnabled()) + log.debug("Partition changed state [grp=" + grp.cacheOrGroupName() + + ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]"); + } + + return update; + } + } + + /** + * @return {@code True} if transitioned to OWNING state. + */ + public boolean own() { + while (true) { + long state = this.state.get(); + + GridDhtPartitionState partState = getPartState(state); + + if (partState == RENTING || partState == EVICTED) + return false; + + if (partState == OWNING) + return true; + + assert partState == MOVING || partState == LOST; + + if (casState(state, OWNING)) + return true; + } + } + + /** + * Forcibly moves partition to a MOVING state. + */ + public void moving() { + while (true) { + long state = this.state.get(); + + GridDhtPartitionState partState = getPartState(state); + + assert partState == OWNING || partState == RENTING : "Only partitions in state OWNING or RENTING can be moved to MOVING state"; + + if (casState(state, MOVING)) + break; + } + } + + /** + * @return {@code True} if partition state changed. 
*/
    public boolean markLost() {
        while (true) {
            long state = this.state.get();

            GridDhtPartitionState partState = getPartState(state);

            // Already lost: nothing to change.
            if (partState == LOST)
                return false;

            // Every other state is switched to LOST; retried if the packed word changed concurrently.
            // NOTE(review): there is no guard against EVICTED here - confirm callers never mark an evicted partition lost.
            if (casState(state, LOST))
                return true;
        }
    }

    /**
     * Initiates partition eviction process.
     *
     * If partition has reservations, eviction will be delayed and continued after all reservations will be released.
     *
     * @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished.
     * @return Future to signal that this node is no longer an owner or backup.
     */
    public IgniteInternalFuture<?> rent(boolean updateSeq) {
        long state0 = this.state.get();

        GridDhtPartitionState partState = getPartState(state0);

        // Renting already started or finished: hand out the same future.
        if (partState == RENTING || partState == EVICTED)
            return rent;

        // Set before the attempt. If the partition is reserved (or the CAS fails), the flag stays set,
        // and release0() calls rent(true) when the reservation count next drops to zero.
        // NOTE(review): if the CAS fails with zero reservations, nothing here retries - confirm another call follows.
        delayedRenting = true;

        if (getReservations(state0) == 0 && casState(state0, RENTING)) {
            delayedRenting = false;

            // Evict asynchronously, as the 'rent' method may be called
            // from within write locks on local partition.
            clearAsync0(updateSeq);
        }

        return rent;
    }

    /**
     * Starts clearing process asynchronously if it's requested and not running at the moment.
     * Method may finish clearing process ahead of time if partition is empty and doesn't have reservations.
     *
     * @param updateSeq Update sequence.
     */
    private void clearAsync0(boolean updateSeq) {
        long state = this.state.get();

        GridDhtPartitionState partState = getPartState(state);

        // Eviction: partition is RENTING, or renting was postponed by reservations (see rent()).
        boolean evictionRequested = partState == RENTING || delayedRenting;
        // Single clearing of a MOVING partition, requested through clearAsync().
        boolean clearingRequested = partState == MOVING && clear;

        if (!evictionRequested && !clearingRequested)
            return;

        boolean reinitialized = clearFuture.initialize(updateSeq, evictionRequested);

        // Clearing process is already running at the moment. No needs to run it again.
        if (!reinitialized)
            return;

        // Try fast eviction: partition is empty and unreserved, and queries are not enabled for the group.
        // groupReserved() also tries to invalidate every group reservation.
        if (freeAndEmpty(state) && !grp.queriesEnabled() && !groupReserved()) {
            // '&&' binds tighter than '||': this is (RENTING && CAS to EVICTED succeeded) || clearingRequested.
            if (partState == RENTING && casState(state, EVICTED) || clearingRequested) {
                clearFuture.finish();

                // Destroy only an EVICTED partition whose evict guard could be CASed from 0 to -1.
                if (state() == EVICTED && markForDestroy()) {
                    updateSeqOnDestroy = updateSeq;

                    destroy();
                }

                if (log.isDebugEnabled())
                    log.debug("Partition has been fast evicted [grp=" + grp.cacheOrGroupName() +
                        ", p=" + id + ", state=" + state() + "]");

                return;
            }
        }

        // Slow path: hand the partition over to the eviction manager.
        ctx.evict().evictPartitionAsync(grp, this);
    }

    /**
     * Initiates single clear process if partition is in MOVING state or continues cleaning for RENTING state.
     * Method does nothing if clear process is already running.
     */
    public void clearAsync() {
        GridDhtPartitionState state0 = state();

        // Only MOVING and RENTING partitions can be cleared.
        if (state0 != MOVING && state0 != RENTING)
            return;

        clear = true;

        clearAsync0(false);
    }

    /**
     * Continues delayed clearing of partition if possible.
     * Clearing may be delayed because of existing reservations.
     */
    public void tryContinueClearing() {
        clearAsync0(true);
    }

    /**
     * Calls {@code invalidate()} on each group reservation and stops at the first one that cannot be invalidated.
     *
     * @return {@code true} If there is a group reservation.
     */
    private boolean groupReserved() {
        for (GridDhtPartitionsReservation reservation : reservations) {
            if (!reservation.invalidate())
                return true; // Failed to invalidate reservation -> we are reserved.
        }

        return false;
    }

    /**
     * @param state State.
     * @return {@code True} if partition has no reservations and is empty: the store, the on-heap maps and the
     *      packed size are all zero.
     */
    private boolean freeAndEmpty(long state) {
        return isEmpty() && getSize(state) == 0 && getReservations(state) == 0;
    }

    /**
     * Takes the evict guard. This succeeds only while the guard is 0, so it fails when another thread is
     * clearing or the partition is marked for destroy (-1).
     *
     * @return {@code true} if evicting thread was added.
     */
    private boolean addEvicting() {
        while (true) {
            int cnt = evictGuard.get();

            if (cnt != 0)
                return false;

            if (evictGuard.compareAndSet(cnt, cnt + 1))
                return true;
        }
    }

    /**
     * Releases the evict guard taken by addEvicting().
     *
     * @return {@code true} if no thread evicting partition at the moment.
+ */ + private boolean clearEvicting() { + boolean free; + + while (true) { + int cnt = evictGuard.get(); + + assert cnt > 0; + + if (evictGuard.compareAndSet(cnt, cnt - 1)) { + free = cnt == 1; + + break; + } + } + + return free; + } + + /** + * @return {@code True} if partition is safe to destroy. + */ + public boolean markForDestroy() { + while (true) { + int cnt = evictGuard.get(); + + if (cnt != 0) + return false; + + if (evictGuard.compareAndSet(0, -1)) + return true; + } + } + + /** + * Moves partition state to {@code EVICTED} if possible. + * and initiates partition destroy process after successful moving partition state to {@code EVICTED} state. + * + * @param updateSeq If {@code true} increment update sequence on cache group topology after successful eviction. + */ + private void finishEviction(boolean updateSeq) { + long state0 = this.state.get(); + + GridDhtPartitionState state = getPartState(state0); + + if (state == EVICTED || (freeAndEmpty(state0) && state == RENTING && casState(state0, EVICTED))) + updateSeqOnDestroy = updateSeq; + } + + /** + * Destroys partition data store and invokes appropriate callbacks. + */ + public void destroy() { + assert state() == EVICTED : this; + assert evictGuard.get() == -1; + + grp.onPartitionEvicted(id); + + destroyCacheDataStore(); + + rent.onDone(); + + ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeqOnDestroy); + + clearDeferredDeletes(); + } + + /** + * Awaits completion of partition destroy process in case of {@code EVICTED} partition state. 
*/
    public void awaitDestroy() {
        if (state() != EVICTED)
            return;

        // Wait slice for rent.get(); presumably milliseconds - TODO confirm against GridFutureAdapter.get(long).
        final long timeout = 10_000;

        // Never gives up: each timed-out slice only logs a warning, then the wait is retried.
        for (;;) {
            try {
                rent.get(timeout);

                break;
            }
            catch (IgniteFutureTimeoutCheckedException ignored) {
                U.warn(log, "Failed to await partition destroy within timeout " + this);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to await partition destroy " + this, e);
            }
        }
    }

    /**
     * Adds listener on {@link #clearFuture} finish.
     *
     * @param lsnr Listener.
     */
    public void onClearFinished(IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
        clearFuture.listen(lsnr);
    }

    /**
     * @return {@code True} if clearing process is running at the moment on the partition.
     */
    public boolean isClearing() {
        return !clearFuture.isDone();
    }

    /**
     * Tries to start partition clear process ({@link GridDhtLocalPartition#clearAll(EvictionContext)}).
     * Only one thread is allowed to do such process concurrently.
     * At the end of clearing method completes {@code clearFuture}.
     *
     * @param evictionCtx Eviction context passed to clearAll(), which polls it to stop early.
     * @return {@code false} if clearing is not started due to existing reservations; {@code true} otherwise,
     *      including when the clear future is already done or another thread holds the evict guard.
     * @throws NodeStoppingException If node is stopping.
     */
    public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException {
        // Clearing already finished.
        if (clearFuture.isDone())
            return true;

        long state = this.state.get();

        // Local or group reservations postpone clearing; groupReserved() also tries to invalidate group ones.
        if (getReservations(state) != 0 || groupReserved())
            return false;

        // addEvicting() succeeds only when the evict guard is 0, so at most one thread clears at a time.
        if (addEvicting()) {
            try {
                // Attempt to evict partition entries from cache.
                long clearedEntities = clearAll(evictionCtx);

                if (log.isDebugEnabled())
                    log.debug("Partition has been cleared [grp=" + grp.cacheOrGroupName() +
                        ", p=" + id + ", state=" + state() + ", clearedCnt=" + clearedEntities + "]");
            }
            catch (NodeStoppingException e) {
                // Fail the clear future with the stop cause before propagating it.
                clearFuture.finish(e);

                throw e;
            }
            finally {
                // Release the guard; the future is completed when no thread holds it any more.
                // NOTE(review): after a NodeStoppingException, finish() is called a second time here;
                // how ClearFuture handles that is defined outside this view.
                boolean free = clearEvicting();

                if (free)
                    clearFuture.finish();
            }
        }

        return true;
    }

    /**
     * Destroys this partition's data store through the group's offheap manager.
     * Failures are logged and not rethrown.
+ */ + private void destroyCacheDataStore() { + try { + grp.offheap().destroyCacheDataStore(dataStore()); + } + catch (IgniteCheckedException e) { + log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e); + } + } + + /** + * On partition unlock callback. + * Tries to continue delayed partition clearing. + */ + public void onUnlock() { + tryContinueClearing(); + } + + /** + * @param topVer Topology version. + * @return {@code True} if local node is primary for this partition. + */ + public boolean primary(AffinityTopologyVersion topVer) { + List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); + + return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0)); + } + + /** + * @param topVer Topology version. + * @return {@code True} if local node is backup for this partition. + */ + public boolean backup(AffinityTopologyVersion topVer) { + List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); + + return nodes.indexOf(ctx.localNode()) > 0; + } + + /** + * @param cacheId ID of cache initiated counter update. + * @param topVer Topology version for current operation. + * @return Next update index. + */ + public long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) { + long nextCntr = store.nextUpdateCounter(); + + if (grp.sharedGroup()) + grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary); + + return nextCntr; + } + + /** + * @return Current update counter. + */ + public long updateCounter() { + return store.updateCounter(); + } + + /** + * @param val Update counter value. + */ + public void updateCounter(long val) { + store.updateCounter(val); + } + + /** + * @return Initial update counter. + */ + public long initialUpdateCounter() { + return store.initialUpdateCounter(); + } + + /** + * @param val Initial update counter value. 
*/
    public void initialUpdateCounter(long val) {
        store.updateInitialCounter(val);
    }

    /**
     * Updates MVCC cache update counter on primary node.
     *
     * @param delta Value to be added to update counter.
     * @return Update counter value before update.
     */
    public long getAndIncrementUpdateCounter(long delta) {
        return store.getAndIncrementUpdateCounter(delta);
    }

    /**
     * Updates MVCC cache update counter on backup node.
     *
     * @param start Start position.
     * @param delta Delta.
     */
    public void updateCounter(long start, long delta) {
        store.updateCounter(start, delta);
    }

    /**
     * @return Total size of all caches, as reported by the data store.
     */
    public long fullSize() {
        return store.fullSize();
    }

    /**
     * Removes all entries and rows from this partition.
     * <p>
     * First clears the entries already present in the on-heap maps. Then it walks every row of the partition
     * in the offheap store, one checkpoint read lock per row.
     *
     * @param evictionCtx Context polled (via {@code shouldStop()}) whenever the cleared count is a multiple of 1000;
     *      when it asks to stop, the method returns early with the count so far.
     * @return Number of rows whose entries were cleared ({@code clearInternal} returned {@code true}).
     * @throws NodeStoppingException If node stopping.
     */
    private long clearAll(EvictionContext evictionCtx) throws NodeStoppingException {
        GridCacheVersion clearVer = ctx.versions().next();

        GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);

        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);

        // Step 1: entries already present on-heap.
        if (grp.sharedGroup()) {
            for (CacheMapHolder hld : cacheMaps.values())
                clear(hld.map, extras, rec);
        }
        else
            clear(singleCacheEntryMap.map, extras, rec);

        long cleared = 0;

        final int stopCheckingFreq = 1000;

        // For shared groups the holder is resolved per row (see below).
        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;

        try {
            // Step 2: every row stored in the partition.
            GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);

            while (it0.hasNext()) {
                ctx.database().checkpointReadLock();

                try {
                    CacheDataRow row = it0.next();

                    // Do not clear fresh rows in case of single partition clearing.
                    // ('continue' still runs the finally block below, so the checkpoint lock is released.)
                    if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
                        continue;

                    // Shared group: switch holder when the row belongs to another cache.
                    if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
                        hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));

                    assert hld != null;

                    // Presumably returns an existing or newly created on-heap entry for the row key, so that it
                    // can be cleared through the regular entry path - the helper is defined outside this view.
                    GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
                        hld,
                        hld.cctx,
                        grp.affinity().lastVersion(),
                        row.key(),
                        true,
                        false);

                    if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
                        removeEntry(cached);

                        if (rec && !hld.cctx.config().isEventsDisabled()) {
                            hld.cctx.events().addEvent(cached.partition(),
                                cached.key(),
                                ctx.localNodeId(),
                                (IgniteUuid)null,
                                null,
                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
                                null,
                                false,
                                cached.rawGet(),
                                cached.hasValue(),
                                null,
                                null,
                                null,
                                false);
                        }

                        cleared++;
                    }

                    // For each 'stopCheckingFreq' cleared entities check clearing process to stop.
                    // NOTE(review): the condition is also true while 'cleared' is 0 or stays on a multiple of 1000
                    // across rows that were not cleared, so shouldStop() may be polled more often than that.
                    if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop())
                        return cleared;
                }
                catch (GridDhtInvalidPartitionException e) {
                    assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';

                    break; // Partition is already concurrently cleared and evicted.
                }
                finally {
                    ctx.database().checkpointReadUnlock();
                }
            }

            // Test hook: force one checkpoint after the first partition whose rows were all cleared.
            if (forceTestCheckpointOnEviction) {
                if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
                    ctx.database().forceCheckpoint("test").finishFuture().get();

                    log.warning("Forced checkpoint by test reasons for partition: " + this);

                    partWhereTestCheckpointEnforced = id;
                }
            }
        }
        catch (NodeStoppingException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to get iterator for evicted partition: " + id);

            throw e;
        }
        catch (IgniteCheckedException e) {
            // Any other failure is logged and the count reached so far is returned.
            U.error(log, "Failed to get iterator for evicted partition: " + id, e);
        }

        return cleared;
    }

    /**
     * Removes all cache entries from specified {@code map}: every {@link GridDhtCacheEntry} that can be cleared
     * is removed from the partition, optionally recording an unload event.
     *
     * @param map Map to clear.
+ * @param extras Obsolete extras. + * @param evt Unload event flag. + * @throws NodeStoppingException If current node is stopping. + */ + private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map, + GridCacheObsoleteEntryExtras extras, + boolean evt) throws NodeStoppingException { + Iterator<GridCacheMapEntry> it = map.values().iterator(); + + while (it.hasNext()) { + GridCacheMapEntry cached = null; + + ctx.database().checkpointReadLock(); + + try { + cached = it.next(); + + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) { + removeEntry(cached); + + if (!cached.isInternal()) { + if (evt) { + grp.addCacheEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + false); + } + } + } + } + catch (GridDhtInvalidPartitionException e) { + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; + + break; // Partition is already concurrently cleared and evicted. + } + catch (NodeStoppingException e) { + if (log.isDebugEnabled()) + log.debug("Failed to clear cache entry for evicted partition: " + cached.partition()); + + throw e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + } + + /** + * Removes all deferred delete requests from {@code rmvQueue}. 
+ */ + private void clearDeferredDeletes() { + for (RemovedEntryHolder e : rmvQueue) + removeVersionedEntry(e.cacheId(), e.key(), e.version()); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"OverlyStrongTypeCast"}) + @Override public boolean equals(Object obj) { + return obj instanceof GridDhtLocalPartition && (obj == this || ((GridDhtLocalPartition)obj).id() == id); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridDhtLocalPartition part) { + if (part == null) + return 1; + + return Integer.compare(id, part.id()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtLocalPartition.class, this, + "grp", grp.cacheOrGroupName(), + "state", state(), + "reservations", reservations(), + "empty", isEmpty(), + "createTime", U.format(createTime)); + } + + /** {@inheritDoc} */ + @Override public int publicSize(int cacheId) { + if (grp.sharedGroup()) { + CacheMapHolder hld = cacheMaps.get(cacheId); + + return hld != null ? hld.size.get() : 0; + } + + return getSize(state.get()); + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context()); + + hld.size.incrementAndGet(); + } + + while (true) { + long state = this.state.get(); + + if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1))) + return; + } + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context()); + + hld.size.decrementAndGet(); + } + + while (true) { + long state = this.state.get(); + + assert getPartState(state) != EVICTED; + + if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1))) + return; + } + } + + /** + * Returns group context. 
+ * + * @return Group context. + */ + public CacheGroupContext group() { + return grp; + } + + /** + * @param cacheId Cache ID. + */ + public void onCacheStopped(int cacheId) { + assert grp.sharedGroup() : grp.cacheOrGroupName(); + + for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) { + RemovedEntryHolder e = it.next(); + + if (e.cacheId() == cacheId) + it.remove(); + } + + cacheMaps.remove(cacheId); + } + + /** + * @param state Composite state. + * @return Partition state. + */ + private static GridDhtPartitionState getPartState(long state) { + return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L))); + } + + /** + * @param state Composite state to update. + * @param partState Partition state. + * @return Updated composite state. + */ + private static long setPartState(long state, GridDhtPartitionState partState) { + return (state & (~0x0000000000000007L)) | partState.ordinal(); + } + + /** + * @param state Composite state. + * @return Reservations. + */ + private static int getReservations(long state) { + return (int)((state & 0x00000000FFFF0000L) >> 16); + } + + /** + * @param state Composite state to update. + * @param reservations Reservations to set. + * @return Updated composite state. + */ + private static long setReservations(long state, int reservations) { + return (state & (~0x00000000FFFF0000L)) | (reservations << 16); + } + + /** + * @param state Composite state. + * @return Size. + */ + private static int getSize(long state) { + return (int)((state & 0xFFFFFFFF00000000L) >> 32); + } + + /** + * @param state Composite state to update. + * @param size Size to set. + * @return Updated composite state. + */ + private static long setSize(long state, int size) { + return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32); + } + + /** + * Removed entry holder. 
+     */
+    private static class RemovedEntryHolder {
+        /** Cache ID. */
+        private final int cacheId;
+
+        /** Cache key. */
+        private final KeyCacheObject key;
+
+        /** Entry version. */
+        private final GridCacheVersion ver;
+
+        /** Entry expire time: {@code U.currentTimeMillis()} at construction plus the TTL. */
+        private final long expireTime;
+
+        /**
+         * Captures a removed entry and computes its expire time from the current time.
+         *
+         * @param cacheId Cache ID.
+         * @param key Key.
+         * @param ver Entry version.
+         * @param ttl TTL, added to {@code U.currentTimeMillis()} to compute the expire time.
+         */
+        private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) {
+            this.cacheId = cacheId;
+            this.key = key;
+            this.ver = ver;
+
+            expireTime = U.currentTimeMillis() + ttl;
+        }
+
+        /**
+         * @return Cache ID.
+         */
+        int cacheId() {
+            return cacheId;
+        }
+
+        /**
+         * @return Key.
+         */
+        KeyCacheObject key() {
+            return key;
+        }
+
+        /**
+         * @return Version.
+         */
+        GridCacheVersion version() {
+            return ver;
+        }
+
+        /**
+         * @return Expire time (construction time plus TTL).
+         */
+        long expireTime() {
+            return expireTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemovedEntryHolder.class, this);
+        }
+    }
+
+    /**
+     * Future is needed to control partition clearing process.
+     * Future can be used both for single clearing and eviction processes.
+     * <p>
+     * The same instance is reused: {@link #initialize(boolean, boolean)} resets it once it is done and
+     * registers the callback for the requested operation.
+     */
+    class ClearFuture extends GridFutureAdapter<Boolean> {
+        /** Flag indicates that eviction callback was registered on the current future. */
+        private volatile boolean evictionCbRegistered;
+
+        /** Flag indicates that clearing callback was registered on the current future. */
+        private volatile boolean clearingCbRegistered;
+
+        /** Flag indicates that future with all callbacks was finished. */
+        private volatile boolean finished;
+
+        /**
+         * Creates the future in an already completed and finished state, so that the first
+         * {@link #initialize(boolean, boolean)} call resets it before registering callbacks.
+         */
+        ClearFuture() {
+            onDone();
+            finished = true;
+        }
+
+        /**
+         * Registers finish eviction callback on the future.
+         * <p>
+         * The volatile flag is checked without locking first and re-checked under the future's monitor,
+         * so concurrent callers do not register duplicate eviction listeners while one is pending.
+         * The listener clears the flag after it has run.
+         *
+         * @param updateSeq If {@code true} update topology sequence after successful eviction.
+         */
+        private void registerEvictionCallback(boolean updateSeq) {
+            if (evictionCbRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (evictionCbRegistered)
+                    return;
+
+                evictionCbRegistered = true;
+
+                // Initiates partition eviction and destroy once the future completes.
+                listen(f -> {
+                    try {
+                        // Check for errors: get() throws if the future completed with an error.
+                        f.get();
+
+                        finishEviction(updateSeq);
+                    }
+                    catch (Exception e) {
+                        // An error from clearing or from finishEviction completes 'rent' with that error.
+                        rent.onDone(e);
+                    }
+
+                    evictionCbRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Registers clearing callback on the future.
+         * <p>
+         * Uses the same unlocked check and locked re-check of {@code clearingCbRegistered} as
+         * {@link #registerEvictionCallback(boolean)}.
+         */
+        private void registerClearingCallback() {
+            if (clearingCbRegistered)
+                return;
+
+            synchronized (this) {
+                // Double check
+                if (clearingCbRegistered)
+                    return;
+
+                clearingCbRegistered = true;
+
+                // Once clearing completes, reset the partition's 'clear' flag and allow the callback to be registered again.
+                listen(f -> {
+                    clear = false;
+
+                    clearingCbRegistered = false;
+                });
+            }
+        }
+
+        /**
+         * Successfully finishes the future and marks it as finished, under the future's monitor.
+         */
+        public void finish() {
+            synchronized (this) {
+                onDone();
+                finished = true;
+            }
+        }
+
+        /**
+         * Finishes the future with error and marks it as finished, under the future's monitor.
+         *
+         * @param t Error.
+         */
+        public void finish(Throwable t) {
+            synchronized (this) {
+                onDone(t);
+                finished = true;
+            }
+        }
+
+        /**
+         * Reuses future if it's done.
+         * Adds appropriate callbacks to the future in case of eviction or single clearing.
+         * <p>
+         * While {@code finished} is {@code false}, only the missing callback is registered and {@code false}
+         * is returned. Otherwise, under the future's monitor, a done future is reset and all flags are cleared
+         * before the requested callback is registered.
+         *
+         * @param updateSeq Passed to the eviction callback: if {@code true}, update topology sequence after
+         *      successful eviction.
+         * @param evictionRequested If {@code true} adds eviction callback, in other case adds single clearing callback.
+         * @return {@code true} if future has been reinitialized.
+         */
+        public boolean initialize(boolean updateSeq, boolean evictionRequested) {
+            // In case of running clearing just try to add missing callbacks to avoid extra synchronization.
+            // NOTE(review): finish() sets 'finished' only after onDone(), so a caller may take this path on a
+            // future that has just completed; listeners registered then presumably run immediately - confirm.
+            if (!finished) {
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return false;
+            }
+
+            synchronized (this) {
+                boolean done = isDone();
+
+                if (done) {
+                    reset();
+
+                    finished = false;
+                    evictionCbRegistered = false;
+                    clearingCbRegistered = false;
+                }
+
+                if (evictionRequested)
+                    registerEvictionCallback(updateSeq);
+                else
+                    registerClearingCallback();
+
+                return done;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java new file mode 100644 index 0000000..4cbce4b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionState.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import org.jetbrains.annotations.Nullable; + +/** + * Partition states. + */ +public enum GridDhtPartitionState { + /** Partition is being loaded from another node. */ + MOVING, + + /** This node is either a primary or backup owner. */ + OWNING, + + /** This node is neither primary or back up owner. */ + RENTING, + + /** Partition has been evicted from cache. 
*/ + EVICTED, + + /** Partition state is invalid, partition should not be used. */ + LOST; + + /** Enum values. */ + private static final GridDhtPartitionState[] VALS = values(); + + /** + * @param ord Ordinal value. + * @return Enum value. + */ + @Nullable public static GridDhtPartitionState fromOrdinal(int ord) { + return ord < 0 || ord >= VALS.length ? null : VALS[ord]; + } + + /** + * @return {@code True} if state is active or owning. + */ + public boolean active() { + return this != EVICTED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java new file mode 100644 index 0000000..b6cb5bb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.jetbrains.annotations.Nullable; + +/** + * DHT partition topology. + */ +@GridToStringExclude +public interface GridDhtPartitionTopology { + /** + * @return Total cache partitions. 
+ */ + public int partitions(); + + /** + * Locks the topology, usually during mapping on locks or transactions. + */ + public void readLock(); + + /** + * Unlocks topology locked by {@link #readLock()} method. + */ + public void readUnlock(); + + /** + * @return {@code True} if locked by current thread. + */ + public boolean holdsLock(); + /** + * Updates topology version. + * + * @param exchFut Exchange future. + * @param discoCache Discovery data cache. + * @param updateSeq Update sequence. + * @param stopping Stopping flag. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + public void updateTopologyVersion( + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, + MvccCoordinator mvccCrd, + long updateSeq, + boolean stopping + ) throws IgniteInterruptedCheckedException; + + /** + * @return Result topology version of last finished exchange. + */ + public AffinityTopologyVersion readyTopologyVersion(); + + /** + * @return Start topology version of last exchange. + */ + public AffinityTopologyVersion lastTopologyChangeVersion(); + + /** + * Gets a future that will be completed when partition exchange map for this + * particular topology version is done. + * + * @return Topology version ready future. + */ + public GridDhtTopologyFuture topologyVersionFuture(); + + /** + * @return {@code True} if cache is being stopped. + */ + public boolean stopping(); + + /** + * @return Cache group ID. + */ + public int groupId(); + + /** + * Pre-initializes this topology. + * + * @param exchFut Exchange future. + * @param affReady Affinity ready flag. + * @param updateMoving + * @throws IgniteCheckedException If failed. + */ + public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, + boolean affReady, + boolean updateMoving) + throws IgniteCheckedException; + + /** + * @param affVer Affinity version. + * @param exchFut Exchange future. + * @return {@code True} if partitions must be refreshed. 
+ * @throws IgniteInterruptedCheckedException If interrupted. + */ + public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut) + throws IgniteInterruptedCheckedException; + + /** + * Initializes local data structures after partitions are restored from persistence. + * + * @param topVer Topology version. + */ + public void afterStateRestored(AffinityTopologyVersion topVer); + + /** + * Post-initializes this topology. + * + * @param exchFut Exchange future. + * @return {@code True} if mapping was changed. + * @throws IgniteCheckedException If failed. + */ + public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException; + + /** + * @param topVer Topology version at the time of creation. + * @param p Partition ID. + * @param create If {@code true}, then partition will be created if it's not there. + * @return Local partition. + * @throws GridDhtInvalidPartitionException If partition is evicted or absent and + * does not belong to this node. + */ + @Nullable public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) + throws GridDhtInvalidPartitionException; + + /** + * Unconditionally creates partition during restore of persisted partition state. + * + * @param p Partition ID. + * @return Partition. + * @throws IgniteCheckedException If failed. + */ + public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException; + + /** + * @param topVer Topology version at the time of creation. + * @param p Partition ID. + * @param create If {@code true}, then partition will be created if it's not there. + * @return Local partition. + * @throws GridDhtInvalidPartitionException If partition is evicted or absent and + * does not belong to this node. 
+ */ + @Nullable public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, + boolean showRenting) + throws GridDhtInvalidPartitionException; + + /** + * @param parts Partitions to release (should be reserved before). + */ + public void releasePartitions(int... parts); + + /** + * @param part Partition number. + * @return Local partition. + * @throws GridDhtInvalidPartitionException If partition is evicted or absent and + * does not belong to this node. + */ + @Nullable public GridDhtLocalPartition localPartition(int part) + throws GridDhtInvalidPartitionException; + + /** + * @return All local partitions by copying them into another list. + */ + public List<GridDhtLocalPartition> localPartitions(); + + /** + * + * @return All current active local partitions. + */ + public Iterable<GridDhtLocalPartition> currentLocalPartitions(); + + /** + * @return Local IDs. + */ + public GridDhtPartitionMap localPartitionMap(); + + /** + * @param nodeId Node ID. + * @param part Partition. + * @return Partition state. + */ + public GridDhtPartitionState partitionState(UUID nodeId, int part); + + /** + * @return Current update sequence. + */ + public long updateSequence(); + + /** + * @param p Partition ID. + * @param topVer Topology version. + * @return Collection of all nodes responsible for this partition with primary node being first. + */ + public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer); + + /** + * @param p Partition ID. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Collection of all nodes responsible for this partition with primary node being first. The first N + * elements of this collection (with N being 1 + backups) are actual DHT affinity nodes, other nodes + * are current additional owners of the partition after topology change. 
+ */ + @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes); + + /** + * @param p Partition ID. + * @return Collection of all nodes who {@code own} this partition. + */ + public List<ClusterNode> owners(int p); + + /** + * @return List indexed by partition number, each list element is collection of all nodes who + * owns corresponding partition. + */ + public List<List<ClusterNode>> allOwners(); + + /** + * @param p Partition ID. + * @param topVer Topology version. + * @return Collection of all nodes who {@code own} this partition. + */ + public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer); + + /** + * @param p Partition ID. + * @return Collection of all nodes who {@code are preloading} this partition. + */ + public List<ClusterNode> moving(int p); + + /** + * @param onlyActive If {@code true}, then only {@code active} partitions will be returned. + * @return Node IDs mapped to partitions. + */ + public GridDhtPartitionFullMap partitionMap(boolean onlyActive); + + /** + * @return {@code True} If one of cache nodes has partitions in {@link GridDhtPartitionState#MOVING} state. + */ + public boolean hasMovingPartitions(); + + /** + * @param e Entry removed from cache. + */ + public void onRemoved(GridDhtCacheEntry e); + + /** + * @param exchangeResVer Result topology version for exchange. Value should be greater than previously passed. Null value + * means full map received is not related to exchange + * @param partMap Update partition map. + * @param cntrMap Partition update counters. + * @param partsToReload Set of partitions that need to be reloaded. + * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not + * related to exchange. Value should be not less than previous 'Topology version from exchange'. + * @return {@code True} if local state was changed. 
+ */ + public boolean update( + @Nullable AffinityTopologyVersion exchangeResVer, + GridDhtPartitionFullMap partMap, + @Nullable CachePartitionFullCountersMap cntrMap, + Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, + @Nullable AffinityTopologyVersion msgTopVer); + + /** + * @param exchId Exchange ID. + * @param parts Partitions. + * @param force {@code True} to skip stale update check. + * @return {@code True} if local state was changed. + */ + public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts, + boolean force); + + /** + * Collects update counters collected during exchange. Called on coordinator. + * + * @param cntrMap Counters map. + */ + public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap); + + /** + * Applies update counters collected during exchange on coordinator. Called on coordinator. + */ + public void applyUpdateCounters(); + + /** + * Checks if there is at least one owner for each partition in the cache topology. + * If not, marks such a partition as LOST. + * <p> + * This method should be called on topology coordinator after all partition messages are received. + * + * @param resTopVer Exchange result version. + * @param discoEvt Discovery event for which we detect lost partitions. + * @return {@code True} if partitions state got updated. + */ + public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt); + + /** + * Resets the state of all LOST partitions to OWNING. + * + * @param resTopVer Exchange result version. + */ + public void resetLostPartitions(AffinityTopologyVersion resTopVer); + + /** + * @return Collection of lost partitions, if any. + */ + public Collection<Integer> lostPartitions(); + + /** + * @return Partition update counters. + */ + public CachePartitionFullCountersMap fullUpdateCounters(); + + /** + * @return Partition update counters. 
+ */ + public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros); + + /** + * @return Partition cache sizes. + */ + public Map<Integer, Long> partitionSizes(); + + /** + * @param part Partition to own. + * @return {@code True} if owned. + */ + public boolean own(GridDhtLocalPartition part); + + /** + * Owns all moving partitions for the given topology version. + * + * @param topVer Topology version. + */ + public void ownMoving(AffinityTopologyVersion topVer); + + /** + * @param part Evicted partition. + * @param updateSeq Update sequence increment flag. + */ + public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); + + /** + * @param nodeId Node to get partitions for. + * @return Partitions for node. + */ + @Nullable public GridDhtPartitionMap partitions(UUID nodeId); + + /** + * Prints memory stats. + * + * @param threshold Threshold for number of entries. + */ + public void printMemoryStats(int threshold); + + /** + * @return Sizes of up-to-date partition versions in topology. + */ + Map<Integer, Long> globalPartSizes(); + + /** + * @param partSizes Sizes of up-to-date partition versions in topology. + */ + void globalPartSizes(@Nullable Map<Integer, Long> partSizes); + + /** + * @param topVer Topology version. + * @return {@code True} if rebalance process finished. + */ + public boolean rebalanceFinished(AffinityTopologyVersion topVer); + + /** + * Calculates nodes and partitions which have non-actual state and must be rebalanced. + * State of all current owners that aren't contained in the given {@code ownersByUpdCounters} will be reset to MOVING. + * + * @param ownersByUpdCounters Map (partition, set of node IDs that have most actual state about partition + * (update counter is maximal) and should hold OWNING state for such partition). + * @param haveHistory Set of partitions which have WAL history to rebalance. + * @return Map (nodeId, set of partitions that should be rebalanced <b>fully</b> by this node). 
+ */ + public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory); + + /** + * Callback on exchange done. + * + * @param assignment New affinity assignment. + * @param updateRebalanceVer {@code True} if need check rebalance state. + */ + public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer); + + public MvccCoordinator mvccCoordinator(); +}