http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java new file mode 100644 index 0000000..7035e37 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -0,0 +1,2999 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.util.F0; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridPartitionStateMap; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; + +/** + * Partition topology. + */ +@GridToStringExclude +public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { + /** */ + private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; + + /** Flag to control amount of output for full map. */ + private static final boolean FULL_MAP_DEBUG = false; + + /** */ + private static final boolean FAST_DIFF_REBUILD = false; + + /** */ + private final GridCacheSharedContext ctx; + + /** */ + private final CacheGroupContext grp; + + /** Logger. */ + private final IgniteLogger log; + + /** Time logger. */ + private final IgniteLogger timeLog; + + /** */ + private final AtomicReferenceArray<GridDhtLocalPartition> locParts; + + /** Node to partition map. */ + private GridDhtPartitionFullMap node2part; + + /** Partitions map for left nodes. */ + private GridDhtPartitionFullMap leftNode2Part = new GridDhtPartitionFullMap(); + + /** */ + private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>(); + + /** */ + private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; + + /** Last started exchange version (always >= readyTopVer). */ + private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE; + + /** Last finished exchange version. */ + private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; + + /** Discovery cache. 
*/ + private volatile DiscoCache discoCache; + + /** */ + private volatile boolean stopping; + + /** A future that will be completed when topology with version topVer will be ready to use. */ + private volatile GridDhtTopologyFuture topReadyFut; + + /** */ + private final GridAtomicLong updateSeq = new GridAtomicLong(1); + + /** Lock. */ + private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); + + /** Partition update counter. */ + private final CachePartitionFullCountersMap cntrMap; + + /** */ + private volatile Map<Integer, Long> globalPartSizes; + + /** */ + private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; + + /** */ + private volatile MvccCoordinator mvccCrd; + + /** + * @param ctx Cache shared context. + * @param grp Cache group. + */ + public GridDhtPartitionTopologyImpl( + GridCacheSharedContext ctx, + CacheGroupContext grp + ) { + assert ctx != null; + assert grp != null; + + this.ctx = ctx; + this.grp = grp; + + log = ctx.logger(getClass()); + + timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); + + locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); + + cntrMap = new CachePartitionFullCountersMap(locParts.length()); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return grp.affinityFunction().partitions(); + } + + /** {@inheritDoc} */ + @Override public int groupId() { + return grp.groupId(); + } + + /** + * + */ + public void onReconnected() { + lock.writeLock().lock(); + + try { + node2part = null; + + diffFromAffinity.clear(); + + updateSeq.set(1); + + topReadyFut = null; + + diffFromAffinityVer = AffinityTopologyVersion.NONE; + + rebalancedTopVer = AffinityTopologyVersion.NONE; + + readyTopVer = AffinityTopologyVersion.NONE; + + lastTopChangeVer = AffinityTopologyVersion.NONE; + + discoCache = ctx.discovery().discoCache(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @return Full map string representation. + */ + @SuppressWarnings({"ConstantConditions"}) + private String fullMapString() { + return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); + } + + /** + * @param map Map to get string for. + * @return Full map string representation. + */ + @SuppressWarnings({"ConstantConditions"}) + private String mapString(GridDhtPartitionMap map) { + return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) + @Override public void readLock() { + lock.readLock().lock(); + } + + /** {@inheritDoc} */ + @Override public void readUnlock() { + lock.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ + @Override public boolean holdsLock() { + return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0; + } + + /** {@inheritDoc} */ + @Override public void updateTopologyVersion( + GridDhtTopologyFuture exchFut, + @NotNull DiscoCache discoCache, + MvccCoordinator mvccCrd, + long updSeq, + boolean stopping + ) throws IgniteInterruptedCheckedException { + U.writeLock(lock); + + try { + AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); + + assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() + + ", topVer=" + readyTopVer + + ", exchTopVer=" + exchTopVer + + ", discoCacheVer=" + (this.discoCache != null ? 
this.discoCache.version() : "None") + + ", exchDiscoCacheVer=" + discoCache.version() + + ", fut=" + exchFut + ']'; + + this.stopping = stopping; + + updateSeq.setIfGreater(updSeq); + + topReadyFut = exchFut; + + rebalancedTopVer = AffinityTopologyVersion.NONE; + + lastTopChangeVer = exchTopVer; + + this.discoCache = discoCache; + this.mvccCrd = mvccCrd; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion readyTopologyVersion() { + AffinityTopologyVersion topVer = this.readyTopVer; + + assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + + ", group=" + grp.cacheOrGroupName() + ']'; + + return topVer; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion lastTopologyChangeVersion() { + AffinityTopologyVersion topVer = this.lastTopChangeVer; + + assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + + ", group=" + grp.cacheOrGroupName() + ']'; + + return topVer; + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyVersionFuture() { + assert topReadyFut != null; + + return topReadyFut; + } + + /** {@inheritDoc} */ + @Override public boolean stopping() { + return stopping; + } + + /** {@inheritDoc} */ + @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, + GridDhtPartitionsExchangeFuture exchFut) + throws IgniteInterruptedCheckedException + { + boolean needRefresh; + + ctx.database().checkpointReadLock(); + + try { + U.writeLock(lock); + + try { + if (stopping) + return false; + + long updateSeq = this.updateSeq.incrementAndGet(); + + needRefresh = initPartitions(affVer, grp.affinity().readyAssignments(affVer), exchFut, updateSeq); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + + return needRefresh; + } + + /** + * Creates and initializes partitions using given {@code affVer} and {@code affAssignment}. + * + * @param affVer Affinity version to use. + * @param affAssignment Affinity assignment to use. + * @param exchFut Exchange future. + * @param updateSeq Update sequence. + * @return {@code True} if partitions must be refreshed. + */ + private boolean initPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> affAssignment, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { + boolean needRefresh = false; + + if (grp.affinityNode()) { + ClusterNode loc = ctx.localNode(); + + ClusterNode oldest = discoCache.oldestAliveServerNode(); + + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + + int partitions = grp.affinity().partitions(); + + if (grp.rebalanceEnabled()) { + boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || exchFut.activateCluster(); + + if (first) { + assert exchId.isJoined() || added || exchFut.activateCluster(); + + if (log.isDebugEnabled()) { + String reason; + + if (exchId.isJoined()) + reason = "First node in cluster"; + else if (added) + reason = "Cache group added"; + else + reason = "Cluster activate"; + + log.debug("Initialize partitions (" + reason + ")" + " [grp=" + grp.cacheOrGroupName() + "]"); + } + + for (int p = 0; p < partitions; p++) { + if (localNode(p, affAssignment)) { + // Partition is created first time, so it's safe to own it. 
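+ // No other copy of this partition exists yet on the first exchange for the group, so owning it right away avoids a redundant MOVING -> rebalancing cycle. 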
+ boolean shouldOwn = locParts.get(p) == null; + + GridDhtLocalPartition locPart = getOrCreatePartition(p); + + if (shouldOwn) { + locPart.own(); + + if (log.isDebugEnabled()) + log.debug("Partition has been owned (created first time) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']'); + } + + needRefresh = true; + + updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); + } + } + } + else + createPartitions(affVer, affAssignment, updateSeq); + } + else { + // If preloader is disabled, then we simply clear out + // the partitions this node is not responsible for. + for (int p = 0; p < partitions; p++) { + GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true); + + boolean belongs = localNode(p, affAssignment); + + if (locPart != null) { + if (!belongs) { + GridDhtPartitionState state = locPart.state(); + + if (state.active()) { + locPart.rent(false); + + updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); + + if (log.isDebugEnabled()) { + log.debug("Evicting partition with rebalancing disabled (it does not belong to " + + "affinity) [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); + } + } + } + else + locPart.own(); + } + else if (belongs) { + locPart = getOrCreatePartition(p); + + locPart.own(); + + updateLocal(p, locPart.state(), updateSeq, affVer); + } + } + } + } + + updateRebalanceVersion(affVer, affAssignment); + + return needRefresh; + } + + /** + * Creates non-existing partitions belong to given affinity {@code aff}. + * + * @param affVer Affinity version. + * @param aff Affinity assignments. + * @param updateSeq Update sequence. + */ + private void createPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff, long updateSeq) { + if (!grp.affinityNode()) + return; + + int partitions = grp.affinity().partitions(); + + if (log.isDebugEnabled()) + log.debug("Create non-existing partitions [grp=" + grp.cacheOrGroupName() + "]"); + + for (int p = 0; p < partitions; p++) { + if (node2part != null && node2part.valid()) { + if (localNode(p, aff)) { + // This will make sure that all non-existing partitions + // will be created in MOVING state. + GridDhtLocalPartition locPart = getOrCreatePartition(p); + + updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); + } + } + // If this node's map is empty, we pre-create local partitions, + // so local map will be sent correctly during exchange. 
+ else if (localNode(p, aff)) + getOrCreatePartition(p); + } + } + + /** {@inheritDoc} */ + @Override public void beforeExchange( + GridDhtPartitionsExchangeFuture exchFut, + boolean affReady, + boolean updateMoving + ) throws IgniteCheckedException { + ctx.database().checkpointReadLock(); + + try { + synchronized (ctx.exchange().interruptLock()) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); + + U.writeLock(lock); + + try { + if (stopping) + return; + + assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer + + ", exchId=" + exchFut.exchangeId() + ']'; + + ExchangeDiscoveryEvents evts = exchFut.context().events(); + + if (affReady) { + assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" + + "grp=" + grp.cacheOrGroupName() + + ", affVer=" + grp.affinity().lastVersion() + + ", evtsVer=" + evts.topologyVersion() + ']'; + + lastTopChangeVer = readyTopVer = evts.topologyVersion(); + + discoCache = evts.discoveryCache(); + } + + if (log.isDebugEnabled()) { + log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + cntrMap.clear(); + + initializeFullMap(updateSeq); + + boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + + if (evts.hasServerLeft()) { + List<DiscoveryEvent> evts0 = evts.events(); + + for (int i = 0; i < evts0.size(); i++) { + DiscoveryEvent evt = evts0.get(i); + + if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) + removeNode(evt.eventNode().id()); + } + } + + if (grp.affinityNode()) { + if (grpStarted || + exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || + exchFut.serverNodeDiscoveryEvent()) { + + AffinityTopologyVersion affVer; + List<List<ClusterNode>> affAssignment; + + if (affReady) { + affVer = evts.topologyVersion(); + + assert grp.affinity().lastVersion().equals(affVer) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + + ", affVer=" + affVer + + ", fut=" + exchFut + ']'; + + affAssignment = grp.affinity().readyAssignments(affVer); + } + else { + assert !exchFut.context().mergeExchanges(); + + affVer = exchFut.initialVersion(); + affAssignment = grp.affinity().idealAssignment(); + } + + initPartitions(affVer, affAssignment, exchFut, updateSeq); + } + } + + consistencyCheck(); + + if (updateMoving) { + assert grp.affinity().lastVersion().equals(evts.topologyVersion()); + + createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion())); + } + + if (log.isDebugEnabled()) { + log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " + + "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); + } + + if (log.isTraceEnabled()) { + log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); + } + } + finally { + lock.writeLock().unlock(); + } + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + + /** + * Initializes full map if current full map is empty or invalid in case of coordinator or cache groups start. + * + * @param updateSeq Update sequence to initialize full map. 
+ */ + private void initializeFullMap(long updateSeq) { + if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture)) + return; + + GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture) topReadyFut; + + boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + + ClusterNode oldest = discoCache.oldestAliveServerNode(); + + // If this is the oldest node. + if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + + if (log.isDebugEnabled()) { + log.debug("Created brand new full topology map on oldest node [" + + "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + + ", fullMap=" + fullMapString() + ']'); + } + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), + oldest.order(), + updateSeq, + node2part, + false); + + if (log.isDebugEnabled()) { + log.debug("Created new full topology map on oldest node [" + + "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + + ", fullMap=" + node2part + ']'); + } + } + else if (!node2part.nodeId().equals(ctx.localNode().id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), + oldest.order(), + updateSeq, + node2part, + false); + + if (log.isDebugEnabled()) { + log.debug("Copied old map into new map on oldest node (previous oldest node left) [" + + "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + + ", fullMap=" + fullMapString() + ']'); + } + } + } + } + + /** + * @param p Partition number. + * @param topVer Topology version. + * @return {@code True} if given partition belongs to local node. + */ + private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { + return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); + } + + /** {@inheritDoc} */ + @Override public void afterStateRestored(AffinityTopologyVersion topVer) { + lock.writeLock().lock(); + + try { + long updateSeq = this.updateSeq.incrementAndGet(); + + initializeFullMap(updateSeq); + + for (int p = 0; p < grp.affinity().partitions(); p++) { + GridDhtLocalPartition locPart = locParts.get(p); + + if (locPart == null) + updateLocal(p, EVICTED, updateSeq, topVer); + else { + GridDhtPartitionState state = locPart.state(); + + updateLocal(p, state, updateSeq, topVer); + + // Restart cleaning. 
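+ // A partition that was persisted in RENTING state resumes its asynchronous clearing once the state is restored. 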
+ if (state == RENTING) + locPart.clearAsync(); + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) { + boolean changed = false; + + int partitions = grp.affinity().partitions(); + + AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion(); + + assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + + "[grp=" + grp.cacheOrGroupName() + + ", topVer=" + topVer + + ", affVer=" + grp.affinity().lastVersion() + + ", fut=" + exchFut + ']'; + + ctx.database().checkpointReadLock(); + + try { + + lock.writeLock().lock(); + + try { + if (stopping) + return false; + + assert readyTopVer.initialized() : readyTopVer; + assert lastTopChangeVer.equals(readyTopVer); + + if (log.isDebugEnabled()) { + log.debug("Partition map before afterExchange [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchFut.exchangeId() + + ", fullMap=" + fullMapString() + ']'); + } + + if (log.isTraceEnabled()) { + log.trace("Partition states before afterExchange [grp=" + grp.cacheOrGroupName() + + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + for (int p = 0; p < partitions; p++) { + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true); + + if (partitionLocalNode(p, topVer)) { + // Prepare partition to rebalance if it's not happened on full map update phase. + if (locPart == null || locPart.state() == RENTING || locPart.state() == EVICTED) + locPart = rebalancePartition(p, false); + + GridDhtPartitionState state = locPart.state(); + + if (state == MOVING) { + if (grp.rebalanceEnabled()) { + Collection<ClusterNode> owners = owners(p); + + // If an owner node left during exchange, then new exchange should be started with detecting lost partitions. 
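+ // If owners are still present, the MOVING partition is left as-is and will be filled by rebalancing from them. 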
+ if (!F.isEmpty(owners)) { + if (log.isDebugEnabled()) + log.debug("Will not own partition (there are owners to rebalance from) " + + "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']'); + } + } + else + updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); + } + } + else { + if (locPart != null) { + GridDhtPartitionState state = locPart.state(); + + if (state == MOVING) { + locPart.rent(false); + + updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); + + changed = true; + + if (log.isDebugEnabled()) { + log.debug("Evicting " + state + " partition (it does not belong to affinity) [" + + "grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']'); + } + } + } + } + } + + AffinityAssignment aff = grp.affinity().readyAffinity(topVer); + + if (node2part != null && node2part.valid()) + changed |= checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); + + consistencyCheck(); + + if (log.isTraceEnabled()) { + log.trace("Partition states after afterExchange [grp=" + grp.cacheOrGroupName() + + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); + } + } + finally { + lock.writeLock().unlock(); + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + + return changed; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create) + throws GridDhtInvalidPartitionException { + return localPartition0(p, topVer, create, false); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { + return localPartition0(p, topVer, create, showRenting); + } + + /** + * Creates partition with id {@code p} if it doesn't exist or evicted. + * In other case returns existing partition. + * + * @param p Partition number. + * @return Partition. + */ + private GridDhtLocalPartition getOrCreatePartition(int p) { + assert lock.isWriteLockedByCurrentThread(); + + assert ctx.database().checkpointLockIsHeldByThread(); + + GridDhtLocalPartition loc = locParts.get(p); + + if (loc == null || loc.state() == EVICTED) { + // Make sure that after eviction partition is destroyed. + if (loc != null) + loc.awaitDestroy(); + + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); + + long updCntr = cntrMap.updateCounter(p); + + if (updCntr != 0) + loc.updateCounter(updCntr); + + if (ctx.pageStore() != null) { + try { + ctx.pageStore().onPartitionCreated(grp.groupId(), p); + } + catch (IgniteCheckedException e) { + // TODO ignite-db + throw new IgniteException(e); + } + } + } + + return loc; + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { + lock.writeLock().lock(); + + try { + GridDhtLocalPartition part = locParts.get(p); + + if (part != null && part.state() != EVICTED) + return part; + + part = new GridDhtLocalPartition(ctx, grp, p); + + locParts.set(p, part); + + ctx.pageStore().onPartitionCreated(grp.groupId(), p); + + return part; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param p Partition number. + * @param topVer Topology version. + * @param create If {@code true} create partition if it doesn't exists or evicted. + * @param showRenting If {@code true} return partition in RENTING state if exists. + * @return Local partition. 
+ */ + @SuppressWarnings("TooBroadScope") + private GridDhtLocalPartition localPartition0(int p, + AffinityTopologyVersion topVer, + boolean create, + boolean showRenting) { + GridDhtLocalPartition loc; + + loc = locParts.get(p); + + GridDhtPartitionState state = loc != null ? loc.state() : null; + + if (loc != null && state != EVICTED && (state != RENTING || showRenting)) + return loc; + + if (!create) + return null; + + boolean created = false; + + ctx.database().checkpointReadLock(); + + try { + lock.writeLock().lock(); + + try { + loc = locParts.get(p); + + state = loc != null ? loc.state() : null; + + boolean belongs = partitionLocalNode(p, topVer); + + if (loc != null && state == EVICTED) { + // Make sure that after eviction partition is destroyed. + loc.awaitDestroy(); + + locParts.set(p, loc = null); + + if (!belongs) { + throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + + "(often may be caused by inconsistent 'key.hashCode()' implementation) " + + "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + + ", this.topVer=" + this.readyTopVer + ']'); + } + } + else if (loc != null && state == RENTING && !showRenting) { + throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " + + "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving=" + + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]"); + } + + if (loc == null) { + if (!belongs) + throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + + "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + + "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + + ", this.topVer=" + this.readyTopVer + ']'); + + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); + + this.updateSeq.incrementAndGet(); + + created = true; + } + } + finally { + lock.writeLock().unlock(); + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + + if (created && ctx.pageStore() != null) { + try { + ctx.pageStore().onPartitionCreated(grp.groupId(), p); + } + catch (IgniteCheckedException e) { + // TODO ignite-db + throw new IgniteException(e); + } + } + + return loc; + } + + /** {@inheritDoc} */ + @Override public void releasePartitions(int... parts) { + assert parts != null; + assert parts.length > 0; + + for (int i = 0; i < parts.length; i++) { + GridDhtLocalPartition part = locParts.get(parts[i]); + + if (part != null) + part.release(); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition localPartition(int part) { + return locParts.get(part); + } + + /** {@inheritDoc} */ + @Override public List<GridDhtLocalPartition> localPartitions() { + List<GridDhtLocalPartition> list = new ArrayList<>(locParts.length()); + + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part != null && part.state().active()) + list.add(part); + } + + return list; + } + + /** {@inheritDoc} */ + @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() { + return new Iterable<GridDhtLocalPartition>() { + @Override public Iterator<GridDhtLocalPartition> iterator() { + return new CurrentPartitionsIterator(); + } + }; + } + + /** {@inheritDoc} */ + @Override public void onRemoved(GridDhtCacheEntry e) { + /* + * Make sure not to acquire any locks here as this method + * may be called from sensitive synchronization blocks. 
+ * =================================================== + */ + + GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false); + + if (loc != null) + loc.onRemoved(e); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap localPartitionMap() { + GridPartitionStateMap map = new GridPartitionStateMap(locParts.length()); + + lock.readLock().lock(); + + try { + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; + + map.put(i, part.state()); + } + + GridDhtPartitionMap locPartMap = node2part != null ? node2part.get(ctx.localNodeId()) : null; + + return new GridDhtPartitionMap(ctx.localNodeId(), + updateSeq.get(), + locPartMap != null ? locPartMap.topologyVersion() : readyTopVer, + map, + true); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public List<ClusterNode> nodes(int p, + AffinityAssignment affAssignment, + List<ClusterNode> affNodes) { + return nodes0(p, affAssignment, affNodes); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { + AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); + + List<ClusterNode> affNodes = affAssignment.get(p); + + List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes); + + return nodes != null ? nodes : affNodes; + } + + /** + * @param p Partition. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Nodes responsible for given partition (primary is first). 
+ */ + @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) { + if (grp.isReplicated()) + return affNodes; + + AffinityTopologyVersion topVer = affAssignment.topologyVersion(); + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + + ", topVer2=" + this.readyTopVer + + ", node=" + ctx.igniteInstanceName() + + ", grp=" + grp.cacheOrGroupName() + + ", node2part=" + node2part + ']'; + + List<ClusterNode> nodes = null; + + if (!topVer.equals(diffFromAffinityVer)) { + LT.warn(log, "Requested topology version does not match calculated diff, will require full iteration to" + + "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer + + ", diffVer=" + diffFromAffinityVer + "]"); + + nodes = new ArrayList<>(); + + nodes.addAll(affNodes); + + for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) { + GridDhtPartitionState state = entry.getValue().get(p); + + ClusterNode n = ctx.discovery().node(entry.getKey()); + + if (n != null && state != null && (state == MOVING || state == OWNING || state == RENTING) + && !nodes.contains(n) && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { + nodes.add(n); + } + + } + + return nodes; + } + + Collection<UUID> diffIds = diffFromAffinity.get(p); + + if (!F.isEmpty(diffIds)) { + HashSet<UUID> affIds = affAssignment.getIds(p); + + for (UUID nodeId : diffIds) { + if (affIds.contains(nodeId)) { + U.warn(log, "Node from diff is affinity node, skipping it [grp=" + grp.cacheOrGroupName() + + ", node=" + nodeId + ']'); + + continue; + } + + if (hasState(p, nodeId, OWNING, MOVING, RENTING)) { + ClusterNode n = ctx.discovery().node(nodeId); + + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { + if (nodes == null) { + nodes = new ArrayList<>(affNodes.size() + diffIds.size()); + + nodes.addAll(affNodes); + } + + nodes.add(n); + } + } + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param p Partition. + * @param topVer Topology version ({@code -1} for all nodes). + * @param state Partition state. + * @param states Additional partition states. + * @return List of nodes for the partition. + */ + private List<ClusterNode> nodes( + int p, + AffinityTopologyVersion topVer, + GridDhtPartitionState state, + GridDhtPartitionState... states + ) { + Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())); + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", grp=" + grp.cacheOrGroupName() + + ", allIds=" + allIds + + ", node2part=" + node2part + ']'; + + // Node IDs can be null if both, primary and backup, nodes disappear. 
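+ // Walk all affinity nodes of the group and keep those whose reported state for this partition matches one of the requested states. 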
+ List<ClusterNode> nodes = new ArrayList<>(); + + for (UUID id : allIds) { + if (hasState(p, id, state, states)) { + ClusterNode n = ctx.discovery().node(id); + + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) + nodes.add(n); + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { + if (!grp.rebalanceEnabled()) + return ownersAndMoving(p, topVer); + + return nodes(p, topVer, OWNING, null); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p) { + return owners(p, AffinityTopologyVersion.NONE); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> allOwners() { + lock.readLock().lock(); + + try { + int parts = partitions(); + + List<List<ClusterNode>> res = new ArrayList<>(parts); + + for (int i = 0; i < parts; i++) + res.add(new ArrayList<>()); + + List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grp.groupId()); + + for (int i = 0; i < allNodes.size(); i++) { + ClusterNode node = allNodes.get(i); + + GridDhtPartitionMap nodeParts = node2part.get(node.id()); + + if (nodeParts != null) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) { + if (e.getValue() == OWNING) { + int part = e.getKey(); + + List<ClusterNode> owners = res.get(part); + + owners.add(node); + } + } + } + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> moving(int p) { + if (!grp.rebalanceEnabled()) + return ownersAndMoving(p, AffinityTopologyVersion.NONE); + + return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return List of nodes in state OWNING or MOVING. 
+ */ + private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) { + return nodes(p, topVer, OWNING, MOVING_STATES); + } + + /** {@inheritDoc} */ + @Override public long updateSequence() { + return updateSeq.get(); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { + lock.readLock().lock(); + + try { + if (node2part == null || stopping) + return null; + + assert node2part.valid() : "Invalid node2part [node2part=" + node2part + + ", grp=" + grp.cacheOrGroupName() + + ", stopping=" + stopping + + ", locNodeId=" + ctx.localNode().id() + + ", locName=" + ctx.igniteInstanceName() + ']'; + + GridDhtPartitionFullMap m = node2part; + + return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Checks should current partition map overwritten by new partition map + * Method returns true if topology version or update sequence of new map are greater than of current map + * + * @param currentMap Current partition map + * @param newMap New partition map + * @return True if current partition map should be overwritten by new partition map, false in other case + */ + private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return newMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Override public boolean update( + @Nullable AffinityTopologyVersion exchangeVer, + GridDhtPartitionFullMap partMap, + @Nullable CachePartitionFullCountersMap incomeCntrMap, + Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, + @Nullable AffinityTopologyVersion msgTopVer) { + if (log.isDebugEnabled()) { + log.debug("Updating full partition map " + + "[grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + ", fullMap=" + fullMapString() + ']'); + } + + assert partMap != null; + + ctx.database().checkpointReadLock(); + + try { + lock.writeLock().lock(); + + try { + if (log.isTraceEnabled() && exchangeVer != null) { + log.trace("Partition states before full update [grp=" + grp.cacheOrGroupName() + + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']'); + } + + if (stopping || !lastTopChangeVer.initialized() || + // Ignore message not-related to exchange if exchange is in progress. + (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) + return false; + + if (incomeCntrMap != null) { + // update local counters in partitions + for (int i = 0; i < locParts.length(); i++) { + cntrMap.updateCounter(i, incomeCntrMap.updateCounter(i)); + + GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; + + if (part.state() == OWNING || part.state() == MOVING) { + long updCntr = incomeCntrMap.updateCounter(part.id()); + long curCntr = part.updateCounter(); + + if (updCntr != 0 && updCntr > curCntr) { + part.updateCounter(updCntr); + + if (log.isDebugEnabled()) + log.debug("Partition update counter has updated [grp=" + grp.cacheOrGroupName() + ", p=" + part.id() + + ", state=" + part.state() + ", prevCntr=" + curCntr + ", nextCntr=" + updCntr + "]"); + } + } + } + } + + if (exchangeVer != null) { + // Ignore if exchange already finished or new exchange started. 
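+ // The update is stale if a newer exchange has already started (lastTopChangeVer) or finished (readyTopVer). 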
+ if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { + U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + + "grp=" + grp.cacheOrGroupName() + + ", lastTopChange=" + lastTopChangeVer + + ", readTopVer=" + readyTopVer + + ", exchVer=" + exchangeVer + ']'); + + return false; + } + } + + if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { + U.warn(log, "Stale version for full partition map update message (will ignore) [" + + "grp=" + grp.cacheOrGroupName() + + ", lastTopChange=" + lastTopChangeVer + + ", readTopVer=" + readyTopVer + + ", msgVer=" + msgTopVer + ']'); + + return false; + } + + boolean fullMapUpdated = (node2part == null); + + if (node2part != null) { + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); + + if (shouldOverridePartitionMap(part, newPart)) { + fullMapUpdated = true; + + if (log.isDebugEnabled()) { + log.debug("Overriding partition map in full update map [" + + "grp=" + grp.cacheOrGroupName() + + ", exchVer=" + exchangeVer + + ", curPart=" + mapString(part) + + ", newPart=" + mapString(newPart) + ']'); + } + + if (newPart.nodeId().equals(ctx.localNodeId())) + updateSeq.setIfGreater(newPart.updateSequence()); + } + else { + // If for some nodes current partition has a newer map, + // then we keep the newer value. + partMap.put(part.nodeId(), part); + } + } + + // Check that we have new nodes. + for (GridDhtPartitionMap part : partMap.values()) { + if (fullMapUpdated) + break; + + fullMapUpdated = !node2part.containsKey(part.nodeId()); + } + + // Remove entry if node left. + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { + UUID nodeId = it.next(); + + if (!ctx.discovery().alive(nodeId)) { + if (log.isTraceEnabled()) + log.trace("Removing left node from full map update [grp=" + grp.cacheOrGroupName() + + ", nodeId=" + nodeId + ", partMap=" + partMap + ']'); + + if (node2part.containsKey(nodeId)) { + GridDhtPartitionMap map = partMap.get(nodeId); + + if (map != null) + leftNode2Part.put(nodeId, map); + } + + it.remove(); + } + } + } + else { + GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); + + if (locNodeMap != null) + updateSeq.setIfGreater(locNodeMap.updateSequence()); + } + + if (!fullMapUpdated) { + if (log.isTraceEnabled()) { + log.trace("No updates for full partition map (will ignore) [" + + "grp=" + grp.cacheOrGroupName() + + ", lastExch=" + lastTopChangeVer + + ", exchVer=" + exchangeVer + + ", curMap=" + node2part + + ", newMap=" + partMap + ']'); + } + + return false; + } + + if (exchangeVer != null) { + assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0; + + lastTopChangeVer = readyTopVer = exchangeVer; + } + + node2part = partMap; + + if (exchangeVer == null && !grp.isReplicated() && + (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { + AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + int p = e0.getKey(); + + Set<UUID> diffIds = diffFromAffinity.get(p); + + if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && + !affAssignment.getIds(p).contains(e.getKey())) { + + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + + diffIds.add(e.getKey()); + } + else { 
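+ // The node either has no active copy of this partition or is an affinity owner, so it no longer belongs to the diff. 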
+ if (diffIds != null && diffIds.remove(e.getKey())) { + if (diffIds.isEmpty()) + diffFromAffinity.remove(p); + } + } + } + } + + diffFromAffinityVer = readyTopVer; + } + + boolean changed = false; + + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); + + // Only in real exchange occurred. + if (exchangeVer != null && + nodeMap != null && + grp.persistenceEnabled() && + readyTopVer.initialized()) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { + int p = e.getKey(); + GridDhtPartitionState state = e.getValue(); + + if (state == OWNING) { + GridDhtLocalPartition locPart = locParts.get(p); + + assert locPart != null : grp.cacheOrGroupName(); + + if (locPart.state() == MOVING) { + boolean success = locPart.own(); + + assert success : locPart; + + changed |= success; + } + } + else if (state == MOVING) { + boolean haveHistory = !partsToReload.contains(p); + + rebalancePartition(p, haveHistory); + + changed = true; + } + } + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { + AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); + + if (exchangeVer == null) + changed |= checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); + } + + if (partSizes != null) + this.globalPartSizes = partSizes; + + consistencyCheck(); + + if (log.isDebugEnabled()) { + log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() + + ", map=" + fullMapString() + ']'); + } + + if (log.isTraceEnabled() && exchangeVer != null) { + log.trace("Partition states after full update [grp=" + grp.cacheOrGroupName() + + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']'); + } + + if (changed) { + if (log.isDebugEnabled()) + log.debug("Partitions have been scheduled to resend [reason=" + + "Full map update [grp" + grp.cacheOrGroupName() + "]"); + + ctx.exchange().scheduleResendPartitions(); + } + + return changed; + } finally { + lock.writeLock().unlock(); + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { + assert cntrMap != null; + + long now = U.currentTimeMillis(); + + lock.writeLock().lock(); + + try { + long acquired = U.currentTimeMillis(); + + if (acquired - now >= 100) { + if (timeLog.isInfoEnabled()) + timeLog.info("Waited too long to acquire topology write lock " + + "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); + } + + if (stopping) + return; + + for (int i = 0; i < cntrMap.size(); i++) { + int pId = cntrMap.partitionAt(i); + + long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); + long updateCntr = cntrMap.updateCounterAt(i); + + if (this.cntrMap.updateCounter(pId) < updateCntr) { + this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); + this.cntrMap.updateCounter(pId, updateCntr); + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void applyUpdateCounters() { + long now = U.currentTimeMillis(); + + lock.writeLock().lock(); + + try { + long acquired = U.currentTimeMillis(); + + if (acquired - now >= 100) { + if (timeLog.isInfoEnabled()) + timeLog.info("Waited too long to acquire topology write lock " + + "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); + } + + if (stopping) + return; + + for (int i = 0; i < locParts.length(); i++) { + 
GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; + + boolean reserve = part.reserve(); + + try { + GridDhtPartitionState state = part.state(); + + if (!reserve || state == EVICTED || state == RENTING) + continue; + + long updCntr = cntrMap.updateCounter(part.id()); + + if (updCntr > part.updateCounter()) + part.updateCounter(updCntr); + else if (part.updateCounter() > 0) { + cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter()); + cntrMap.updateCounter(part.id(), part.updateCounter()); + } + } + finally { + if (reserve) + part.release(); + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Method checks is new partition map more stale than current partition map + * New partition map is stale if topology version or update sequence are less or equal than of current map + * + * @param currentMap Current partition map + * @param newMap New partition map + * @return True if new partition map is more stale than current partition map, false in other case + */ + private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return currentMap != null && newMap.compareTo(currentMap) <= 0; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Override public boolean update( + @Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts, + boolean force + ) { + if (log.isDebugEnabled()) { + log.debug("Updating single partition map [grp=" + grp.cacheOrGroupName() + ", exchId=" + exchId + + ", parts=" + mapString(parts) + ']'); + } + + if (!ctx.discovery().alive(parts.nodeId())) { + if (log.isTraceEnabled()) { + log.trace("Received partition update for non-existing node (will ignore) [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchId + ", parts=" + parts + ']'); + } + + return false; + } + + ctx.database().checkpointReadLock(); + + try { + lock.writeLock().lock(); + + try { + if (stopping) + return false; + + if (!force) { + if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) { + U.warn(log, "Stale exchange id for single partition map update (will ignore) [" + + "grp=" + grp.cacheOrGroupName() + + ", lastTopChange=" + lastTopChangeVer + + ", readTopVer=" + readyTopVer + + ", exch=" + exchId.topologyVersion() + ']'); + + return false; + } + } + + if (node2part == null) + // Create invalid partition map. + node2part = new GridDhtPartitionFullMap(); + + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); + + if (force) { + if (cur != null && cur.topologyVersion().initialized()) + parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); + } + else if (isStaleUpdate(cur, parts)) { + assert cur != null; + + String msg = "Stale update for single partition map update (will ignore) [" + + "nodeId=" + parts.nodeId() + + ", grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchId + + ", curMap=" + cur + + ", newMap=" + parts + ']'; + + // This is usual situation when partition maps are equal, just print debug message. + if (cur.compareTo(parts) == 0) { + if (log.isTraceEnabled()) + log.trace(msg); + } + else + U.warn(log, msg); + + return false; + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part.newUpdateSequence(updateSeq); + + boolean changed = false; + + if (cur == null || !cur.equals(parts)) + changed = true; + + node2part.put(parts.nodeId(), parts); + + // During exchange diff is calculated after all messages are received and affinity initialized. 
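+ // Outside of exchange (exchId == null) the diff from affinity is maintained incrementally from this single-node update. 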
+ if (exchId == null && !grp.isReplicated()) { + if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { + AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); + + // Add new mappings. + for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { + int p = e.getKey(); + + Set<UUID> diffIds = diffFromAffinity.get(p); + + if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING) + && !affAssignment.getIds(p).contains(parts.nodeId())) { + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + + if (diffIds.add(parts.nodeId())) + changed = true; + } + else { + if (diffIds != null && diffIds.remove(parts.nodeId())) { + changed = true; + + if (diffIds.isEmpty()) + diffFromAffinity.remove(p); + } + } + } + + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = diffFromAffinity.get(p); + + if (ids != null && ids.remove(parts.nodeId())) { + changed = true; + + if (ids.isEmpty()) + diffFromAffinity.remove(p); + } + } + } + + diffFromAffinityVer = readyTopVer; + } + } + + if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { + AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); + + if (exchId == null) + changed |= checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); + } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']'); + + if (changed && exchId == null) { + if (log.isDebugEnabled()) + log.debug("Partitions have been scheduled to resend [reason=" + + "Single map update [grp" + grp.cacheOrGroupName() + "]"); + + ctx.exchange().scheduleResendPartitions(); + } + + return changed; + } + finally { + lock.writeLock().unlock(); + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut, + AffinityAssignment assignment, + boolean updateRebalanceVer) { + lock.writeLock().lock(); + + try { + assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) || + assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion()); + + readyTopVer = lastTopChangeVer = assignment.topologyVersion(); + + if (fut != null) + discoCache = fut.events().discoveryCache(); + + if (!grp.isReplicated()) { + boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() || + fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized(); + + if (rebuildDiff) { + if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) + rebuildDiff(assignment); + } + else + diffFromAffinityVer = readyTopVer; + + if (!updateRebalanceVer) + updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment()); + } + + if (updateRebalanceVer) + updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment()); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param aff Affinity. 
+ */ + private void createMovingPartitions(AffinityAssignment aff) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + GridDhtPartitionMap map = e.getValue(); + + addMoving(map, aff.backupPartitions(e.getKey())); + addMoving(map, aff.primaryPartitions(e.getKey())); + } + } + + /** + * @param map Node partition state map. + * @param parts Partitions assigned to node. + */ + private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) { + if (F.isEmpty(parts)) + return; + + for (Integer p : parts) { + GridDhtPartitionState state = map.get(p); + + if (state == null || state == EVICTED) + map.put(p, MOVING); + } + } + + /** + * Rebuilds {@link #diffFromAffinity} from given assignment. + * + * @param affAssignment New affinity assignment. + */ + private void rebuildDiff(AffinityAssignment affAssignment) { + assert lock.isWriteLockedByCurrentThread(); + + if (node2part == null) + return; + + if (FAST_DIFF_REBUILD) { + Collection<UUID> affNodes = F.nodeIds(ctx.discovery().cacheGroupAffinityNodes(grp.groupId(), + affAssignment.topologyVersion())); + + for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { + int p = e.getKey(); + + Iterator<UUID> iter = e.getValue().iterator(); + + while (iter.hasNext()) { + UUID nodeId = iter.next(); + + if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) + iter.remove(); + } + } + } + else { + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + UUID nodeId = e.getKey(); + + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + Integer p0 = e0.getKey(); + + GridDhtPartitionState state = e0.getValue(); + + Set<UUID> ids = diffFromAffinity.get(p0); + + if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) { + if (ids == null) + diffFromAffinity.put(p0, ids = U.newHashSet(3)); + + ids.add(nodeId); + } + else { + if (ids != null) + ids.remove(nodeId); + } + } + } + } + + diffFromAffinityVer = affAssignment.topologyVersion(); + } + + /** {@inheritDoc} */ + @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) { + ctx.database().checkpointReadLock(); + + try { + lock.writeLock().lock(); + + try { + if (node2part == null) + return false; + + int parts = grp.affinity().partitions(); + + Set<Integer> lost = new HashSet<>(parts); + + for (int p = 0; p < parts; p++) + lost.add(p); + + for (GridDhtPartitionMap partMap : node2part.values()) { + for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { + if (e.getValue() == OWNING) { + lost.remove(e.getKey()); + + if (lost.isEmpty()) + break; + } + } + } + + boolean changed = false; + + if (!F.isEmpty(lost)) { + PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); + + assert plc != null; + + Set<Integer> recentlyLost = new HashSet<>(); + + for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) { + if (entry.getValue() == OWNING) + recentlyLost.add(entry.getKey()); + } + } + + // Update partition state on all nodes. + for (Integer part : lost) { + long updSeq = updateSeq.incrementAndGet(); + + GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true); + + if (locPart != null) { + if (locPart.state() == LOST) + continue; + + boolean marked = plc == PartitionLossPolicy.IGNORE ? 
+    /** {@inheritDoc} */
+    @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) {
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                if (node2part == null)
+                    return false;
+
+                int parts = grp.affinity().partitions();
+
+                Set<Integer> lost = new HashSet<>(parts);
+
+                for (int p = 0; p < parts; p++)
+                    lost.add(p);
+
+                for (GridDhtPartitionMap partMap : node2part.values()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            lost.remove(e.getKey());
+
+                            if (lost.isEmpty())
+                                break;
+                        }
+                    }
+                }
+
+                boolean changed = false;
+
+                if (!F.isEmpty(lost)) {
+                    PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
+
+                    assert plc != null;
+
+                    Set<Integer> recentlyLost = new HashSet<>();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) {
+                            if (entry.getValue() == OWNING)
+                                recentlyLost.add(entry.getKey());
+                        }
+                    }
+
+                    // Update partition state on all nodes.
+                    for (Integer part : lost) {
+                        long updSeq = updateSeq.incrementAndGet();
+
+                        GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
+
+                        if (locPart != null) {
+                            if (locPart.state() == LOST)
+                                continue;
+
+                            boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
+
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+
+                            changed |= marked;
+                        }
+                        // Update map for remote node.
+                        else if (plc != PartitionLossPolicy.IGNORE) {
+                            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                                if (e.getKey().equals(ctx.localNodeId()))
+                                    continue;
+
+                                if (e.getValue().get(part) != EVICTED)
+                                    e.getValue().put(part, LOST);
+                            }
+                        }
+
+                        if (recentlyLost.contains(part) && grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                            grp.addRebalanceEvent(part,
+                                EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                discoEvt.eventNode(),
+                                discoEvt.type(),
+                                discoEvt.timestamp());
+                        }
+                    }
+
+                    if (plc != PartitionLossPolicy.IGNORE)
+                        grp.needsRecovery(true);
+                }
+
+                leftNode2Part.clear();
+
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) {
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                long updSeq = updateSeq.incrementAndGet();
+
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                        if (e0.getValue() != LOST)
+                            continue;
+
+                        e0.setValue(OWNING);
+
+                        GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
+
+                        if (locPart != null && locPart.state() == LOST) {
+                            boolean marked = locPart.own();
+
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+                        }
+                    }
+                }
+
+                checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
+
+                grp.needsRecovery(false);
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> lostPartitions() {
+        if (grp.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE)
+            return Collections.emptySet();
+
+        lock.readLock().lock();
+
+        try {
+            Set<Integer> res = null;
+
+            int parts = grp.affinity().partitions();
+
+            for (GridDhtPartitionMap partMap : node2part.values()) {
+                for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+                    if (e.getValue() == LOST) {
+                        if (res == null)
+                            res = new HashSet<>(parts);
+
+                        res.add(e.getKey());
+                    }
+                }
+            }
+
+            return res == null ? Collections.<Integer>emptySet() : res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
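+    /*
+     * resetOwners() below demotes stale owners detected by update counter comparison: any local or
+     * remote OWNING copy of a partition that is not listed in ownersByUpdCounters is switched back to
+     * MOVING and scheduled for rebalancing: historically (from WAL) when the partition id is present
+     * in haveHistory, by full preloading (with a preliminary clear) otherwise. The returned map groups
+     * the demoted partitions by node id.
+     */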
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+        Map<UUID, Set<Integer>> result = new HashMap<>();
+
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.writeLock().lock();
+
+            try {
+                // First process local partitions.
+                for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
+                    int part = entry.getKey();
+                    Set<UUID> newOwners = entry.getValue();
+
+                    GridDhtLocalPartition locPart = localPartition(part);
+
+                    if (locPart == null || locPart.state() != OWNING)
+                        continue;
+
+                    if (!newOwners.contains(ctx.localNodeId())) {
+                        rebalancePartition(part, haveHistory.contains(part));
+
+                        result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>());
+                        result.get(ctx.localNodeId()).add(part);
+                    }
+                }
+
+                // Then process remote partitions.
+                for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
+                    int part = entry.getKey();
+                    Set<UUID> newOwners = entry.getValue();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
+                        UUID remoteNodeId = remotes.getKey();
+                        GridDhtPartitionMap partMap = remotes.getValue();
+
+                        GridDhtPartitionState state = partMap.get(part);
+
+                        if (state == null || state != OWNING)
+                            continue;
+
+                        if (!newOwners.contains(remoteNodeId)) {
+                            partMap.put(part, MOVING);
+
+                            partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
+
+                            if (partMap.nodeId().equals(ctx.localNodeId()))
+                                updateSeq.setIfGreater(partMap.updateSequence());
+
+                            result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
+                            result.get(remoteNodeId).add(part);
+                        }
+                    }
+                }
+
+                for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+                    UUID nodeId = entry.getKey();
+                    Set<Integer> rebalancedParts = entry.getValue();
+
+                    if (!rebalancedParts.isEmpty()) {
+                        Set<Integer> historical = rebalancedParts.stream()
+                            .filter(haveHistory::contains)
+                            .collect(Collectors.toSet());
+
+                        // Filter out partitions having WAL history.
+                        rebalancedParts.removeAll(historical);
+
+                        U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter " +
+                            "[grp=" + grp.cacheOrGroupName() +
+                            ", nodeId=" + nodeId +
+                            ", partsFull=" + S.compact(rebalancedParts) +
+                            ", partsHistorical=" + S.compact(historical) + "]");
+                    }
+                }
+
+                node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+
+        return result;
+    }
+
+    /**
+     * Prepares given partition {@code p} for rebalance.
+     * Changes partition state to MOVING and starts clearing if needed.
+     * Prevents ongoing renting if required.
+     *
+     * @param p Partition id.
+     * @param haveHistory If {@code true}, there is WAL history to rebalance the partition;
+     *      otherwise the partition will be cleared for full rebalance.
+     * @return Partition prepared for rebalance (in MOVING state).
+     */
+    private GridDhtLocalPartition rebalancePartition(int p, boolean haveHistory) {
+        GridDhtLocalPartition part = getOrCreatePartition(p);
+
+        // Prevent renting.
+        if (part.state() == RENTING) {
+            if (part.reserve()) {
+                part.moving();
+                part.release();
+            }
+            else {
+                assert part.state() == EVICTED : part;
+
+                part = getOrCreatePartition(p);
+            }
+        }
+
+        if (part.state() != MOVING)
+            part.moving();
+
+        if (!haveHistory)
+            part.clearAsync();
+
+        assert part.state() == MOVING : part;
+
+        return part;
+    }
+
+    /**
+     * Finds local partitions which don't belong to affinity and runs eviction process for such partitions.
+     *
+     * @param updateSeq Update sequence.
+     * @param aff Affinity assignments.
+     * @return {@code True} if there are local partitions that need to be evicted.
+     */
+    private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
+        if (!ctx.kernalContext().state().evictionsAllowed())
+            return false;
+
+        boolean hasEvictedPartitions = false;
+
+        UUID locId = ctx.localNodeId();
+
+        List<IgniteInternalFuture<?>> rentingFutures = new ArrayLis
<TRUNCATED>
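For context on how the loss-handling methods above surface to applications, here is a minimal user-facing sketch. It assumes the public Ignite 2.x API (CacheConfiguration#setPartitionLossPolicy, EventType#EVT_CACHE_REBALANCE_PART_DATA_LOST, IgniteCache#lostPartitions, Ignite#resetLostPartitions); the cache name "myCache" is purely illustrative and not taken from this patch.

import java.util.Collections;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;

public class PartitionLossExample {
    public static void main(String[] args) {
        // Partitions of this cache are blocked for reads and writes (not silently re-owned)
        // when all owning nodes leave; compare with the PartitionLossPolicy.IGNORE branch above.
        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<Integer, String>("myCache")
            .setBackups(1)
            .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);

        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Integer, String> cache = ignite.getOrCreateCache(ccfg);

            // Fired per partition when detectLostPartitions() finds an orphaned partition that was
            // owned by a recently departed node (see grp.addRebalanceEvent(...) above).
            IgnitePredicate<Event> lsnr = evt -> {
                CacheRebalancingEvent rebEvt = (CacheRebalancingEvent)evt;

                System.out.println("Lost partition " + rebEvt.partition() + " of cache " + rebEvt.cacheName());

                return true; // Keep listening.
            };

            ignite.events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);

            // Reflects the LOST states maintained by lostPartitions() in the topology above.
            System.out.println("Currently lost: " + cache.lostPartitions());

            // Once data is restored (or its loss accepted), clear the LOST states cluster-wide;
            // this eventually reaches resetLostPartitions() on each affected cache group.
            ignite.resetLostPartitions(Collections.singleton("myCache"));
        }
    }
}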