http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java deleted file mode 100644 index 1256a8b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ /dev/null @@ -1,2962 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.stream.Collectors; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.PartitionLossPolicy; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.processors.affinity.AffinityAssignment; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; -import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.GridAtomicLong; -import org.apache.ignite.internal.util.GridPartitionStateMap; -import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; -import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; - -/** - * Partition topology. - */ -@GridToStringExclude -public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { - /** */ - private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; - - /** Flag to control amount of output for full map. */ - private static final boolean FULL_MAP_DEBUG = false; - - /** */ - private static final boolean FAST_DIFF_REBUILD = false; - - /** */ - private final GridCacheSharedContext ctx; - - /** */ - private final CacheGroupContext grp; - - /** Logger. */ - private final IgniteLogger log; - - /** Time logger. */ - private final IgniteLogger timeLog; - - /** */ - private final AtomicReferenceArray<GridDhtLocalPartition> locParts; - - /** Node to partition map. */ - private GridDhtPartitionFullMap node2part; - - /** Partitions map for left nodes. */ - private GridDhtPartitionFullMap leftNode2Part = new GridDhtPartitionFullMap(); - - /** */ - private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>(); - - /** */ - private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; - - /** Last started exchange version (always >= readyTopVer). */ - private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE; - - /** Last finished exchange version. */ - private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; - - /** Discovery cache. */ - private volatile DiscoCache discoCache; - - /** */ - private volatile boolean stopping; - - /** A future that will be completed when topology with version topVer will be ready to use. */ - private volatile GridDhtTopologyFuture topReadyFut; - - /** */ - private final GridAtomicLong updateSeq = new GridAtomicLong(1); - - /** Lock. */ - private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); - - /** Partition update counter. */ - private final CachePartitionFullCountersMap cntrMap; - - /** */ - private volatile Map<Integer, Long> globalPartSizes; - - /** */ - private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; - - /** */ - private volatile MvccCoordinator mvccCrd; - - /** - * @param ctx Cache shared context. - * @param grp Cache group. - */ - public GridDhtPartitionTopologyImpl( - GridCacheSharedContext ctx, - CacheGroupContext grp - ) { - assert ctx != null; - assert grp != null; - - this.ctx = ctx; - this.grp = grp; - - log = ctx.logger(getClass()); - - timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); - - locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); - - cntrMap = new CachePartitionFullCountersMap(locParts.length()); - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return grp.affinityFunction().partitions(); - } - - /** {@inheritDoc} */ - @Override public int groupId() { - return grp.groupId(); - } - - /** - * - */ - public void onReconnected() { - lock.writeLock().lock(); - - try { - node2part = null; - - diffFromAffinity.clear(); - - updateSeq.set(1); - - topReadyFut = null; - - diffFromAffinityVer = AffinityTopologyVersion.NONE; - - rebalancedTopVer = AffinityTopologyVersion.NONE; - - readyTopVer = AffinityTopologyVersion.NONE; - - lastTopChangeVer = AffinityTopologyVersion.NONE; - - discoCache = ctx.discovery().discoCache(); - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * @return Full map string representation. - */ - @SuppressWarnings({"ConstantConditions"}) - private String fullMapString() { - return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); - } - - /** - * @param map Map to get string for. - * @return Full map string representation. - */ - @SuppressWarnings({"ConstantConditions"}) - private String mapString(GridDhtPartitionMap map) { - return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) - @Override public void readLock() { - lock.readLock().lock(); - } - - /** {@inheritDoc} */ - @Override public void readUnlock() { - lock.readLock().unlock(); - } - - /** {@inheritDoc} */ - @Override public MvccCoordinator mvccCoordinator() { - return mvccCrd; - } - - /** {@inheritDoc} */ - @Override public boolean holdsLock() { - return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0; - } - - /** {@inheritDoc} */ - @Override public void updateTopologyVersion( - GridDhtTopologyFuture exchFut, - @NotNull DiscoCache discoCache, - MvccCoordinator mvccCrd, - long updSeq, - boolean stopping - ) throws IgniteInterruptedCheckedException { - U.writeLock(lock); - - try { - AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); - - assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() + - ", topVer=" + readyTopVer + - ", exchTopVer=" + exchTopVer + - ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") + - ", exchDiscoCacheVer=" + discoCache.version() + - ", fut=" + exchFut + ']'; - - this.stopping = stopping; - - updateSeq.setIfGreater(updSeq); - - topReadyFut = exchFut; - - rebalancedTopVer = AffinityTopologyVersion.NONE; - - lastTopChangeVer = exchTopVer; - - this.discoCache = discoCache; - this.mvccCrd = mvccCrd; - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion readyTopologyVersion() { - AffinityTopologyVersion topVer = this.readyTopVer; - - assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", group=" + grp.cacheOrGroupName() + ']'; - - return topVer; - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion lastTopologyChangeVersion() { - AffinityTopologyVersion topVer = this.lastTopChangeVer; - - assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", group=" + grp.cacheOrGroupName() + ']'; - - return topVer; - } - - /** {@inheritDoc} */ - @Override public GridDhtTopologyFuture topologyVersionFuture() { - assert topReadyFut != null; - - return topReadyFut; - } - - /** {@inheritDoc} */ - @Override public boolean stopping() { - return stopping; - } - - /** {@inheritDoc} */ - @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, - GridDhtPartitionsExchangeFuture exchFut) - throws IgniteInterruptedCheckedException - { - boolean needRefresh; - - ctx.database().checkpointReadLock(); - - try { - U.writeLock(lock); - - try { - if (stopping) - return false; - - long updateSeq = this.updateSeq.incrementAndGet(); - - needRefresh = initPartitions(affVer, grp.affinity().readyAssignments(affVer), exchFut, updateSeq); - - consistencyCheck(); - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - - return needRefresh; - } - - /** - * Creates and initializes partitions using given {@code affVer} and {@code affAssignment}. - * - * @param affVer Affinity version to use. - * @param affAssignment Affinity assignment to use. - * @param exchFut Exchange future. - * @param updateSeq Update sequence. - * @return {@code True} if partitions must be refreshed. - */ - private boolean initPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> affAssignment, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - boolean needRefresh = false; - - if (grp.affinityNode()) { - ClusterNode loc = ctx.localNode(); - - ClusterNode oldest = discoCache.oldestAliveServerNode(); - - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - - int num = grp.affinity().partitions(); - - if (grp.rebalanceEnabled()) { - boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - - boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || exchFut.activateCluster(); - - if (first) { - assert exchId.isJoined() || added || exchFut.activateCluster(); - - for (int p = 0; p < num; p++) { - if (localNode(p, affAssignment)) { - // Partition is created first time, so it's safe to own it. - boolean shouldOwn = locParts.get(p) == null; - - GridDhtLocalPartition locPart = getOrCreatePartition(p); - - if (shouldOwn) { - locPart.own(); - - if (log.isDebugEnabled()) - log.debug("Owned partition for oldest node [grp=" + grp.cacheOrGroupName() + - ", part=" + locPart + ']'); - } - - needRefresh = true; - - updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); - } - } - } - else - createPartitions(affVer, affAssignment, updateSeq); - } - else { - // If preloader is disabled, then we simply clear out - // the partitions this node is not responsible for. - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true); - - boolean belongs = localNode(p, affAssignment); - - if (locPart != null) { - if (!belongs) { - GridDhtPartitionState state = locPart.state(); - - if (state.active()) { - locPart.rent(false); - - updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); - - if (log.isDebugEnabled()) { - log.debug("Evicting partition with rebalancing disabled (it does not belong to " + - "affinity) [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); - } - } - } - else - locPart.own(); - } - else if (belongs) { - locPart = getOrCreatePartition(p); - - locPart.own(); - - updateLocal(p, locPart.state(), updateSeq, affVer); - } - } - } - } - - updateRebalanceVersion(affVer, affAssignment); - - return needRefresh; - } - - /** - * Creates non-existing partitions belong to given affinity {@code aff}. - * - * @param affVer Affinity version. - * @param aff Affinity assignments. - * @param updateSeq Update sequence. - */ - private void createPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff, long updateSeq) { - if (!grp.affinityNode()) - return; - - int num = grp.affinity().partitions(); - - for (int p = 0; p < num; p++) { - if (node2part != null && node2part.valid()) { - if (localNode(p, aff)) { - // This will make sure that all non-existing partitions - // will be created in MOVING state. - GridDhtLocalPartition locPart = getOrCreatePartition(p); - - updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); - } - } - // If this node's map is empty, we pre-create local partitions, - // so local map will be sent correctly during exchange. - else if (localNode(p, aff)) - getOrCreatePartition(p); - } - } - - /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, - boolean affReady, - boolean updateMoving) - throws IgniteCheckedException { - ctx.database().checkpointReadLock(); - - try { - synchronized (ctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); - - U.writeLock(lock); - - try { - if (stopping) - return; - - assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer + - ", exchId=" + exchFut.exchangeId() + ']'; - - ExchangeDiscoveryEvents evts = exchFut.context().events(); - - if (affReady) { - assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" + - "grp=" + grp.cacheOrGroupName() + - ", affVer=" + grp.affinity().lastVersion() + - ", evtsVer=" + evts.topologyVersion() + ']'; - - lastTopChangeVer = readyTopVer = evts.topologyVersion(); - - discoCache = evts.discoveryCache(); - } - - if (log.isDebugEnabled()) { - log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() + - ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); - } - - long updateSeq = this.updateSeq.incrementAndGet(); - - cntrMap.clear(); - - initializeFullMap(updateSeq); - - boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - - if (evts.hasServerLeft()) { - List<DiscoveryEvent> evts0 = evts.events(); - - for (int i = 0; i < evts0.size(); i++) { - DiscoveryEvent evt = evts0.get(i); - - if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) - removeNode(evt.eventNode().id()); - } - } - - if (grp.affinityNode()) { - if (grpStarted || - exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || - exchFut.serverNodeDiscoveryEvent()) { - - AffinityTopologyVersion affVer; - List<List<ClusterNode>> affAssignment; - - if (affReady) { - affVer = evts.topologyVersion(); - - assert grp.affinity().lastVersion().equals(affVer) : - "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", affVer=" + affVer + - ", fut=" + exchFut + ']'; - - affAssignment = grp.affinity().readyAssignments(affVer); - } - else { - assert !exchFut.context().mergeExchanges(); - - affVer = exchFut.initialVersion(); - affAssignment = grp.affinity().idealAssignment(); - } - - initPartitions(affVer, affAssignment, exchFut, updateSeq); - } - } - - consistencyCheck(); - - if (updateMoving) { - assert grp.affinity().lastVersion().equals(evts.topologyVersion()); - - createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion())); - } - - if (log.isDebugEnabled()) { - log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " + - "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); - } - - if (log.isTraceEnabled()) { - log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName() - + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); - } - } - finally { - lock.writeLock().unlock(); - } - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** - * Initializes full map if current full map is empty or invalid in case of coordinator or cache groups start. - * - * @param updateSeq Update sequence to initialize full map. - */ - private void initializeFullMap(long updateSeq) { - if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture)) - return; - - GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture) topReadyFut; - - boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - - ClusterNode oldest = discoCache.oldestAliveServerNode(); - - // If this is the oldest node. - if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - - if (log.isDebugEnabled()) { - log.debug("Created brand new full topology map on oldest node [" + - "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + - ", fullMap=" + fullMapString() + ']'); - } - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), - oldest.order(), - updateSeq, - node2part, - false); - - if (log.isDebugEnabled()) { - log.debug("Created new full topology map on oldest node [" + - "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + - ", fullMap=" + node2part + ']'); - } - } - else if (!node2part.nodeId().equals(ctx.localNode().id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), - oldest.order(), - updateSeq, - node2part, - false); - - if (log.isDebugEnabled()) { - log.debug("Copied old map into new map on oldest node (previous oldest node left) [" + - "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + - ", fullMap=" + fullMapString() + ']'); - } - } - } - } - - /** - * @param p Partition number. - * @param topVer Topology version. - * @return {@code True} if given partition belongs to local node. - */ - private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { - return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); - } - - /** {@inheritDoc} */ - @Override public void afterStateRestored(AffinityTopologyVersion topVer) { - lock.writeLock().lock(); - - try { - long updateSeq = this.updateSeq.incrementAndGet(); - - initializeFullMap(updateSeq); - - for (int p = 0; p < grp.affinity().partitions(); p++) { - GridDhtLocalPartition locPart = locParts.get(p); - - if (locPart == null) - updateLocal(p, EVICTED, updateSeq, topVer); - else { - GridDhtPartitionState state = locPart.state(); - - updateLocal(p, state, updateSeq, topVer); - - // Restart cleaning. - if (state == RENTING) - locPart.clearAsync(); - } - } - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) { - boolean changed = false; - - int num = grp.affinity().partitions(); - - AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion(); - - assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + - "[grp=" + grp.cacheOrGroupName() + - ", topVer=" + topVer + - ", affVer=" + grp.affinity().lastVersion() + - ", fut=" + exchFut + ']'; - - ctx.database().checkpointReadLock(); - - try { - - lock.writeLock().lock(); - - try { - if (stopping) - return false; - - assert readyTopVer.initialized() : readyTopVer; - assert lastTopChangeVer.equals(readyTopVer); - - if (log.isDebugEnabled()) { - log.debug("Partition map before afterExchange [grp=" + grp.cacheOrGroupName() + - ", exchId=" + exchFut.exchangeId() + - ", fullMap=" + fullMapString() + ']'); - } - - if (log.isTraceEnabled()) { - log.trace("Partition states before afterExchange [grp=" + grp.cacheOrGroupName() - + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); - } - - long updateSeq = this.updateSeq.incrementAndGet(); - - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true); - - if (partitionLocalNode(p, topVer)) { - // Prepare partition to rebalance if it's not happened on full map update phase. - if (locPart == null || locPart.state() == RENTING || locPart.state() == EVICTED) - locPart = rebalancePartition(p, false); - - GridDhtPartitionState state = locPart.state(); - - if (state == MOVING) { - if (grp.rebalanceEnabled()) { - Collection<ClusterNode> owners = owners(p); - - // If an owner node left during exchange, then new exchange should be started with detecting lost partitions. - - if (!F.isEmpty(owners)) { - if (log.isDebugEnabled()) - log.debug("Will not own partition (there are owners to rebalance from) [grp=" + grp.cacheOrGroupName() + - ", locPart=" + locPart + ", owners = " + owners + ']'); - } - } - else - updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); - } - } - else { - if (locPart != null) { - GridDhtPartitionState state = locPart.state(); - - if (state == MOVING) { - locPart.rent(false); - - updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); - - changed = true; - - if (log.isDebugEnabled()) { - log.debug("Evicting " + state + " partition (it does not belong to affinity) [" + - "grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); - } - } - } - } - } - - AffinityAssignment aff = grp.affinity().readyAffinity(topVer); - - if (node2part != null && node2part.valid()) - changed |= checkEvictions(updateSeq, aff); - - updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); - - consistencyCheck(); - - if (log.isTraceEnabled()) { - log.trace("Partition states after afterExchange [grp=" + grp.cacheOrGroupName() - + ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); - } - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - - return changed; - } - - /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, - boolean create) - throws GridDhtInvalidPartitionException { - return localPartition0(p, topVer, create, false); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, - boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { - return localPartition0(p, topVer, create, showRenting); - } - - /** - * Creates partition with id {@code p} if it doesn't exist or evicted. - * In other case returns existing partition. - * - * @param p Partition number. - * @return Partition. - */ - private GridDhtLocalPartition getOrCreatePartition(int p) { - assert lock.isWriteLockedByCurrentThread(); - - assert ctx.database().checkpointLockIsHeldByThread(); - - GridDhtLocalPartition loc = locParts.get(p); - - if (loc == null || loc.state() == EVICTED) { - // Make sure that after eviction partition is destroyed. - if (loc != null) - loc.awaitDestroy(); - - locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - - long updCntr = cntrMap.updateCounter(p); - - if (updCntr != 0) - loc.updateCounter(updCntr); - - if (ctx.pageStore() != null) { - try { - ctx.pageStore().onPartitionCreated(grp.groupId(), p); - } - catch (IgniteCheckedException e) { - // TODO ignite-db - throw new IgniteException(e); - } - } - } - - return loc; - } - - /** {@inheritDoc} */ - @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { - lock.writeLock().lock(); - - try { - GridDhtLocalPartition part = locParts.get(p); - - if (part != null && part.state() != EVICTED) - return part; - - part = new GridDhtLocalPartition(ctx, grp, p); - - locParts.set(p, part); - - ctx.pageStore().onPartitionCreated(grp.groupId(), p); - - return part; - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * @param p Partition number. - * @param topVer Topology version. - * @param create If {@code true} create partition if it doesn't exists or evicted. - * @param showRenting If {@code true} return partition in RENTING state if exists. - * @return Local partition. - */ - @SuppressWarnings("TooBroadScope") - private GridDhtLocalPartition localPartition0(int p, - AffinityTopologyVersion topVer, - boolean create, - boolean showRenting) { - GridDhtLocalPartition loc; - - loc = locParts.get(p); - - GridDhtPartitionState state = loc != null ? loc.state() : null; - - if (loc != null && state != EVICTED && (state != RENTING || showRenting)) - return loc; - - if (!create) - return null; - - boolean created = false; - - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - loc = locParts.get(p); - - state = loc != null ? loc.state() : null; - - boolean belongs = partitionLocalNode(p, topVer); - - if (loc != null && state == EVICTED) { - // Make sure that after eviction partition is destroyed. - loc.awaitDestroy(); - - locParts.set(p, loc = null); - - if (!belongs) { - throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + - "(often may be caused by inconsistent 'key.hashCode()' implementation) " + - "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + - ", this.topVer=" + this.readyTopVer + ']'); - } - } - else if (loc != null && state == RENTING && !showRenting) { - throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " + - "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving=" - + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]"); - } - - if (loc == null) { - if (!belongs) - throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + - "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + - "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + - ", this.topVer=" + this.readyTopVer + ']'); - - locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - - this.updateSeq.incrementAndGet(); - - created = true; - - if (log.isDebugEnabled()) - log.debug("Created local partition [grp=" + grp.cacheOrGroupName() + ", part=" + loc + ']'); - } - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - - if (created && ctx.pageStore() != null) { - try { - ctx.pageStore().onPartitionCreated(grp.groupId(), p); - } - catch (IgniteCheckedException e) { - // TODO ignite-db - throw new IgniteException(e); - } - } - - return loc; - } - - /** {@inheritDoc} */ - @Override public void releasePartitions(int... parts) { - assert parts != null; - assert parts.length > 0; - - for (int i = 0; i < parts.length; i++) { - GridDhtLocalPartition part = locParts.get(parts[i]); - - if (part != null) - part.release(); - } - } - - /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(int part) { - return locParts.get(part); - } - - /** {@inheritDoc} */ - @Override public List<GridDhtLocalPartition> localPartitions() { - List<GridDhtLocalPartition> list = new ArrayList<>(locParts.length()); - - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part != null && part.state().active()) - list.add(part); - } - - return list; - } - - /** {@inheritDoc} */ - @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() { - return new Iterable<GridDhtLocalPartition>() { - @Override public Iterator<GridDhtLocalPartition> iterator() { - return new CurrentPartitionsIterator(); - } - }; - } - - /** {@inheritDoc} */ - @Override public void onRemoved(GridDhtCacheEntry e) { - /* - * Make sure not to acquire any locks here as this method - * may be called from sensitive synchronization blocks. - * =================================================== - */ - - GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false); - - if (loc != null) - loc.onRemoved(e); - } - - /** {@inheritDoc} */ - @Override public GridDhtPartitionMap localPartitionMap() { - GridPartitionStateMap map = new GridPartitionStateMap(locParts.length()); - - lock.readLock().lock(); - - try { - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; - - map.put(i, part.state()); - } - - GridDhtPartitionMap locPartMap = node2part != null ? node2part.get(ctx.localNodeId()) : null; - - return new GridDhtPartitionMap(ctx.localNodeId(), - updateSeq.get(), - locPartMap != null ? locPartMap.topologyVersion() : readyTopVer, - map, - true); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { - lock.readLock().lock(); - - try { - GridDhtPartitionMap partMap = node2part.get(nodeId); - - if (partMap != null) { - GridDhtPartitionState state = partMap.get(part); - - return state == null ? EVICTED : state; - } - - return EVICTED; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public List<ClusterNode> nodes(int p, - AffinityAssignment affAssignment, - List<ClusterNode> affNodes) { - return nodes0(p, affAssignment, affNodes); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { - AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); - - List<ClusterNode> affNodes = affAssignment.get(p); - - List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes); - - return nodes != null ? nodes : affNodes; - } - - /** - * @param p Partition. - * @param affAssignment Assignments. - * @param affNodes Node assigned for given partition by affinity. - * @return Nodes responsible for given partition (primary is first). - */ - @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) { - if (grp.isReplicated()) - return affNodes; - - AffinityTopologyVersion topVer = affAssignment.topologyVersion(); - - lock.readLock().lock(); - - try { - assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + - ", topVer2=" + this.readyTopVer + - ", node=" + ctx.igniteInstanceName() + - ", grp=" + grp.cacheOrGroupName() + - ", node2part=" + node2part + ']'; - - List<ClusterNode> nodes = null; - - if (!topVer.equals(diffFromAffinityVer)) { - LT.warn(log, "Requested topology version does not match calculated diff, will require full iteration to" + - "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer + - ", diffVer=" + diffFromAffinityVer + "]"); - - nodes = new ArrayList<>(); - - nodes.addAll(affNodes); - - for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) { - GridDhtPartitionState state = entry.getValue().get(p); - - ClusterNode n = ctx.discovery().node(entry.getKey()); - - if (n != null && state != null && (state == MOVING || state == OWNING || state == RENTING) - && !nodes.contains(n) && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { - nodes.add(n); - } - - } - - return nodes; - } - - Collection<UUID> diffIds = diffFromAffinity.get(p); - - if (!F.isEmpty(diffIds)) { - HashSet<UUID> affIds = affAssignment.getIds(p); - - for (UUID nodeId : diffIds) { - if (affIds.contains(nodeId)) { - U.warn(log, "Node from diff is affinity node, skipping it [grp=" + grp.cacheOrGroupName() + - ", node=" + nodeId + ']'); - - continue; - } - - if (hasState(p, nodeId, OWNING, MOVING, RENTING)) { - ClusterNode n = ctx.discovery().node(nodeId); - - if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { - if (nodes == null) { - nodes = new ArrayList<>(affNodes.size() + diffIds.size()); - - nodes.addAll(affNodes); - } - - nodes.add(n); - } - } - } - } - - return nodes; - } - finally { - lock.readLock().unlock(); - } - } - - /** - * @param p Partition. - * @param topVer Topology version ({@code -1} for all nodes). - * @param state Partition state. - * @param states Additional partition states. - * @return List of nodes for the partition. - */ - private List<ClusterNode> nodes( - int p, - AffinityTopologyVersion topVer, - GridDhtPartitionState state, - GridDhtPartitionState... states - ) { - Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())); - - lock.readLock().lock(); - - try { - assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", grp=" + grp.cacheOrGroupName() + - ", allIds=" + allIds + - ", node2part=" + node2part + ']'; - - // Node IDs can be null if both, primary and backup, nodes disappear. - List<ClusterNode> nodes = new ArrayList<>(); - - for (UUID id : allIds) { - if (hasState(p, id, state, states)) { - ClusterNode n = ctx.discovery().node(id); - - if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) - nodes.add(n); - } - } - - return nodes; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { - if (!grp.rebalanceEnabled()) - return ownersAndMoving(p, topVer); - - return nodes(p, topVer, OWNING, null); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> owners(int p) { - return owners(p, AffinityTopologyVersion.NONE); - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> allOwners() { - lock.readLock().lock(); - - try { - int parts = partitions(); - - List<List<ClusterNode>> res = new ArrayList<>(parts); - - for (int i = 0; i < parts; i++) - res.add(new ArrayList<>()); - - List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grp.groupId()); - - for (int i = 0; i < allNodes.size(); i++) { - ClusterNode node = allNodes.get(i); - - GridDhtPartitionMap nodeParts = node2part.get(node.id()); - - if (nodeParts != null) { - for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) { - if (e.getValue() == OWNING) { - int part = e.getKey(); - - List<ClusterNode> owners = res.get(part); - - owners.add(node); - } - } - } - } - - return res; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> moving(int p) { - if (!grp.rebalanceEnabled()) - return ownersAndMoving(p, AffinityTopologyVersion.NONE); - - return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); - } - - /** - * @param p Partition. - * @param topVer Topology version. - * @return List of nodes in state OWNING or MOVING. - */ - private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) { - return nodes(p, topVer, OWNING, MOVING_STATES); - } - - /** {@inheritDoc} */ - @Override public long updateSequence() { - return updateSeq.get(); - } - - /** {@inheritDoc} */ - @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { - lock.readLock().lock(); - - try { - if (node2part == null || stopping) - return null; - - assert node2part.valid() : "Invalid node2part [node2part=" + node2part + - ", grp=" + grp.cacheOrGroupName() + - ", stopping=" + stopping + - ", locNodeId=" + ctx.localNode().id() + - ", locName=" + ctx.igniteInstanceName() + ']'; - - GridDhtPartitionFullMap m = node2part; - - return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); - } - finally { - lock.readLock().unlock(); - } - } - - /** - * Checks should current partition map overwritten by new partition map - * Method returns true if topology version or update sequence of new map are greater than of current map - * - * @param currentMap Current partition map - * @param newMap New partition map - * @return True if current partition map should be overwritten by new partition map, false in other case - */ - private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { - return newMap != null && - (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || - newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Override public boolean update( - @Nullable AffinityTopologyVersion exchangeVer, - GridDhtPartitionFullMap partMap, - @Nullable CachePartitionFullCountersMap incomeCntrMap, - Set<Integer> partsToReload, - @Nullable Map<Integer, Long> partSizes, - @Nullable AffinityTopologyVersion msgTopVer) { - if (log.isDebugEnabled()) { - log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + - ", fullMap=" + fullMapString() + ']'); - } - - assert partMap != null; - - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - if (log.isTraceEnabled() && exchangeVer != null) { - log.trace("Partition states before full update [grp=" + grp.cacheOrGroupName() - + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']'); - } - - if (stopping || !lastTopChangeVer.initialized() || - // Ignore message not-related to exchange if exchange is in progress. - (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) - return false; - - if (incomeCntrMap != null) { - // update local counters in partitions - for (int i = 0; i < locParts.length(); i++) { - cntrMap.updateCounter(i, incomeCntrMap.updateCounter(i)); - - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; - - if (part.state() == OWNING || part.state() == MOVING) { - long updCntr = incomeCntrMap.updateCounter(part.id()); - - if (updCntr != 0 && updCntr > part.updateCounter()) - part.updateCounter(updCntr); - } - } - } - - if (exchangeVer != null) { - // Ignore if exchange already finished or new exchange started. - if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { - U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + - "grp=" + grp.cacheOrGroupName() + - ", lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", exchVer=" + exchangeVer + ']'); - - return false; - } - } - - if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { - U.warn(log, "Stale version for full partition map update message (will ignore) [" + - "grp=" + grp.cacheOrGroupName() + - ", lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", msgVer=" + msgTopVer + ']'); - - return false; - } - - boolean fullMapUpdated = (node2part == null); - - if (node2part != null) { - for (GridDhtPartitionMap part : node2part.values()) { - GridDhtPartitionMap newPart = partMap.get(part.nodeId()); - - if (shouldOverridePartitionMap(part, newPart)) { - fullMapUpdated = true; - - if (log.isDebugEnabled()) { - log.debug("Overriding partition map in full update map [" + - "grp=" + grp.cacheOrGroupName() + - ", exchVer=" + exchangeVer + - ", curPart=" + mapString(part) + - ", newPart=" + mapString(newPart) + ']'); - } - - if (newPart.nodeId().equals(ctx.localNodeId())) - updateSeq.setIfGreater(newPart.updateSequence()); - } - else { - // If for some nodes current partition has a newer map, - // then we keep the newer value. - partMap.put(part.nodeId(), part); - } - } - - // Check that we have new nodes. - for (GridDhtPartitionMap part : partMap.values()) { - if (fullMapUpdated) - break; - - fullMapUpdated = !node2part.containsKey(part.nodeId()); - } - - // Remove entry if node left. - for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { - UUID nodeId = it.next(); - - if (!ctx.discovery().alive(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Removing left node from full map update [grp=" + grp.cacheOrGroupName() + - ", nodeId=" + nodeId + ", partMap=" + partMap + ']'); - - if (node2part.containsKey(nodeId)) { - GridDhtPartitionMap map = partMap.get(nodeId); - - if (map != null) - leftNode2Part.put(nodeId, map); - } - - it.remove(); - } - } - } - else { - GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); - - if (locNodeMap != null) - updateSeq.setIfGreater(locNodeMap.updateSequence()); - } - - if (!fullMapUpdated) { - if (log.isDebugEnabled()) { - log.debug("No updates for full partition map (will ignore) [" + - "grp=" + grp.cacheOrGroupName() + - ", lastExch=" + lastTopChangeVer + - ", exchVer=" + exchangeVer + - ", curMap=" + node2part + - ", newMap=" + partMap + ']'); - } - - return false; - } - - if (exchangeVer != null) { - assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0; - - lastTopChangeVer = readyTopVer = exchangeVer; - } - - node2part = partMap; - - if (exchangeVer == null && !grp.isReplicated() && - (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { - AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); - - for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { - for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { - int p = e0.getKey(); - - Set<UUID> diffIds = diffFromAffinity.get(p); - - if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && - !affAssignment.getIds(p).contains(e.getKey())) { - - if (diffIds == null) - diffFromAffinity.put(p, diffIds = U.newHashSet(3)); - - diffIds.add(e.getKey()); - } - else { - if (diffIds != null && diffIds.remove(e.getKey())) { - if (diffIds.isEmpty()) - diffFromAffinity.remove(p); - } - } - } - } - - diffFromAffinityVer = readyTopVer; - } - - boolean changed = false; - - GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - - // Only in real exchange occurred. - if (exchangeVer != null && - nodeMap != null && - grp.persistenceEnabled() && - readyTopVer.initialized()) { - for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { - int p = e.getKey(); - GridDhtPartitionState state = e.getValue(); - - if (state == OWNING) { - GridDhtLocalPartition locPart = locParts.get(p); - - assert locPart != null : grp.cacheOrGroupName(); - - if (locPart.state() == MOVING) { - boolean success = locPart.own(); - - assert success : locPart; - - changed |= success; - } - } - else if (state == MOVING) { - boolean haveHistory = !partsToReload.contains(p); - - rebalancePartition(p, haveHistory); - - changed = true; - } - } - } - - long updateSeq = this.updateSeq.incrementAndGet(); - - if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { - AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); - - if (exchangeVer == null) - changed |= checkEvictions(updateSeq, aff); - - updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); - } - - if (partSizes != null) - this.globalPartSizes = partSizes; - - consistencyCheck(); - - if (log.isDebugEnabled()) { - log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() + - ", map=" + fullMapString() + ']'); - } - - if (log.isTraceEnabled() && exchangeVer != null) { - log.trace("Partition states after full update [grp=" + grp.cacheOrGroupName() - + ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']'); - } - - if (changed) - ctx.exchange().scheduleResendPartitions(); - - return changed; - } finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { - assert cntrMap != null; - - long now = U.currentTimeMillis(); - - lock.writeLock().lock(); - - try { - long acquired = U.currentTimeMillis(); - - if (acquired - now >= 100) { - if (timeLog.isInfoEnabled()) - timeLog.info("Waited too long to acquire topology write lock " + - "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); - } - - if (stopping) - return; - - for (int i = 0; i < cntrMap.size(); i++) { - int pId = cntrMap.partitionAt(i); - - long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); - long updateCntr = cntrMap.updateCounterAt(i); - - if (this.cntrMap.updateCounter(pId) < updateCntr) { - this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); - this.cntrMap.updateCounter(pId, updateCntr); - } - } - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void applyUpdateCounters() { - long now = U.currentTimeMillis(); - - lock.writeLock().lock(); - - try { - long acquired = U.currentTimeMillis(); - - if (acquired - now >= 100) { - if (timeLog.isInfoEnabled()) - timeLog.info("Waited too long to acquire topology write lock " + - "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); - } - - if (stopping) - return; - - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; - - boolean reserve = part.reserve(); - - try { - GridDhtPartitionState state = part.state(); - - if (!reserve || state == EVICTED || state == RENTING) - continue; - - long updCntr = cntrMap.updateCounter(part.id()); - - if (updCntr > part.updateCounter()) - part.updateCounter(updCntr); - else if (part.updateCounter() > 0) { - cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter()); - cntrMap.updateCounter(part.id(), part.updateCounter()); - } - } - finally { - if (reserve) - part.release(); - } - } - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * Method checks is new partition map more stale than current partition map - * New partition map is stale if topology version or update sequence are less or equal than of current map - * - * @param currentMap Current partition map - * @param newMap New partition map - * @return True if new partition map is more stale than current partition map, false in other case - */ - private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { - return currentMap != null && newMap.compareTo(currentMap) <= 0; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Override public boolean update( - @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - boolean force - ) { - if (log.isDebugEnabled()) { - log.debug("Updating single partition map [grp=" + grp.cacheOrGroupName() + ", exchId=" + exchId + - ", parts=" + mapString(parts) + ']'); - } - - if (!ctx.discovery().alive(parts.nodeId())) { - if (log.isDebugEnabled()) { - log.debug("Received partition update for non-existing node (will ignore) [grp=" + grp.cacheOrGroupName() + - ", exchId=" + exchId + ", parts=" + parts + ']'); - } - - return false; - } - - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - if (stopping) - return false; - - if (!force) { - if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) { - U.warn(log, "Stale exchange id for single partition map update (will ignore) [" + - "grp=" + grp.cacheOrGroupName() + - ", lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", exch=" + exchId.topologyVersion() + ']'); - - return false; - } - } - - if (node2part == null) - // Create invalid partition map. - node2part = new GridDhtPartitionFullMap(); - - GridDhtPartitionMap cur = node2part.get(parts.nodeId()); - - if (force) { - if (cur != null && cur.topologyVersion().initialized()) - parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); - } - else if (isStaleUpdate(cur, parts)) { - assert cur != null; - - String msg = "Stale update for single partition map update (will ignore) [" + - "nodeId=" + parts.nodeId() + - ", grp=" + grp.cacheOrGroupName() + - ", exchId=" + exchId + - ", curMap=" + cur + - ", newMap=" + parts + ']'; - - // This is usual situation when partition maps are equal, just print debug message. - if (cur.compareTo(parts) == 0) { - if (log.isDebugEnabled()) - log.debug(msg); - } - else - U.warn(log, msg); - - return false; - } - - long updateSeq = this.updateSeq.incrementAndGet(); - - node2part.newUpdateSequence(updateSeq); - - boolean changed = false; - - if (cur == null || !cur.equals(parts)) - changed = true; - - node2part.put(parts.nodeId(), parts); - - // During exchange diff is calculated after all messages are received and affinity initialized. - if (exchId == null && !grp.isReplicated()) { - if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { - AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); - - // Add new mappings. - for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { - int p = e.getKey(); - - Set<UUID> diffIds = diffFromAffinity.get(p); - - if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING) - && !affAssignment.getIds(p).contains(parts.nodeId())) { - if (diffIds == null) - diffFromAffinity.put(p, diffIds = U.newHashSet(3)); - - if (diffIds.add(parts.nodeId())) - changed = true; - } - else { - if (diffIds != null && diffIds.remove(parts.nodeId())) { - changed = true; - - if (diffIds.isEmpty()) - diffFromAffinity.remove(p); - } - } - } - - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = diffFromAffinity.get(p); - - if (ids != null && ids.remove(parts.nodeId())) { - changed = true; - - if (ids.isEmpty()) - diffFromAffinity.remove(p); - } - } - } - - diffFromAffinityVer = readyTopVer; - } - } - - if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { - AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); - - if (exchId == null) - changed |= checkEvictions(updateSeq, aff); - - updateRebalanceVersion(aff.topologyVersion(), aff.assignment()); - } - - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']'); - - if (changed && exchId == null) - ctx.exchange().scheduleResendPartitions(); - - return changed; - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut, - AffinityAssignment assignment, - boolean updateRebalanceVer) { - lock.writeLock().lock(); - - try { - assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) || - assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion()); - - readyTopVer = lastTopChangeVer = assignment.topologyVersion(); - - if (fut != null) - discoCache = fut.events().discoveryCache(); - - if (!grp.isReplicated()) { - boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() || - fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized(); - - if (rebuildDiff) { - if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) - rebuildDiff(assignment); - } - else - diffFromAffinityVer = readyTopVer; - - if (!updateRebalanceVer) - updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment()); - } - - if (updateRebalanceVer) - updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment()); - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * @param aff Affinity. - */ - private void createMovingPartitions(AffinityAssignment aff) { - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - GridDhtPartitionMap map = e.getValue(); - - addMoving(map, aff.backupPartitions(e.getKey())); - addMoving(map, aff.primaryPartitions(e.getKey())); - } - } - - /** - * @param map Node partition state map. - * @param parts Partitions assigned to node. - */ - private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) { - if (F.isEmpty(parts)) - return; - - for (Integer p : parts) { - GridDhtPartitionState state = map.get(p); - - if (state == null || state == EVICTED) - map.put(p, MOVING); - } - } - - /** - * Rebuilds {@link #diffFromAffinity} from given assignment. - * - * @param affAssignment New affinity assignment. - */ - private void rebuildDiff(AffinityAssignment affAssignment) { - assert lock.isWriteLockedByCurrentThread(); - - if (node2part == null) - return; - - if (FAST_DIFF_REBUILD) { - Collection<UUID> affNodes = F.nodeIds(ctx.discovery().cacheGroupAffinityNodes(grp.groupId(), - affAssignment.topologyVersion())); - - for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { - int p = e.getKey(); - - Iterator<UUID> iter = e.getValue().iterator(); - - while (iter.hasNext()) { - UUID nodeId = iter.next(); - - if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) - iter.remove(); - } - } - } - else { - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - UUID nodeId = e.getKey(); - - for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { - Integer p0 = e0.getKey(); - - GridDhtPartitionState state = e0.getValue(); - - Set<UUID> ids = diffFromAffinity.get(p0); - - if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) { - if (ids == null) - diffFromAffinity.put(p0, ids = U.newHashSet(3)); - - ids.add(nodeId); - } - else { - if (ids != null) - ids.remove(nodeId); - } - } - } - } - - diffFromAffinityVer = affAssignment.topologyVersion(); - } - - /** {@inheritDoc} */ - @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) { - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - if (node2part == null) - return false; - - int parts = grp.affinity().partitions(); - - Set<Integer> lost = new HashSet<>(parts); - - for (int p = 0; p < parts; p++) - lost.add(p); - - for (GridDhtPartitionMap partMap : node2part.values()) { - for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { - if (e.getValue() == OWNING) { - lost.remove(e.getKey()); - - if (lost.isEmpty()) - break; - } - } - } - - boolean changed = false; - - if (!F.isEmpty(lost)) { - PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); - - assert plc != null; - - Set<Integer> recentlyLost = new HashSet<>(); - - for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) { - for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) { - if (entry.getValue() == OWNING) - recentlyLost.add(entry.getKey()); - } - } - - // Update partition state on all nodes. - for (Integer part : lost) { - long updSeq = updateSeq.incrementAndGet(); - - GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true); - - if (locPart != null) { - if (locPart.state() == LOST) - continue; - - boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost(); - - if (marked) - updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer); - - changed |= marked; - } - // Update map for remote node. - else if (plc != PartitionLossPolicy.IGNORE) { - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - if (e.getKey().equals(ctx.localNodeId())) - continue; - - if (e.getValue().get(part) != EVICTED) - e.getValue().put(part, LOST); - } - } - - if (recentlyLost.contains(part) && grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - grp.addRebalanceEvent(part, - EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), - discoEvt.type(), - discoEvt.timestamp()); - } - } - - if (plc != PartitionLossPolicy.IGNORE) - grp.needsRecovery(true); - } - - leftNode2Part.clear(); - - return changed; - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) { - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - long updSeq = updateSeq.incrementAndGet(); - - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { - if (e0.getValue() != LOST) - continue; - - e0.setValue(OWNING); - - GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false); - - if (locPart != null && locPart.state() == LOST) { - boolean marked = locPart.own(); - - if (marked) - updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer); - } - } - } - - checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer)); - - grp.needsRecovery(false); - } - finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public Collection<Integer> lostPartitions() { - if (grp.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE) - return Collections.emptySet(); - - lock.readLock().lock(); - - try { - Set<Integer> res = null; - - int parts = grp.affinity().partitions(); - - for (GridDhtPartitionMap partMap : node2part.values()) { - for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { - if (e.getValue() == LOST) { - if (res == null) - res = new HashSet<>(parts); - - res.add(e.getKey()); - } - } - } - - return res == null ? Collections.<Integer>emptySet() : res; - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) { - Map<UUID, Set<Integer>> result = new HashMap<>(); - - ctx.database().checkpointReadLock(); - - try { - lock.writeLock().lock(); - - try { - // First process local partitions. - for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) { - int part = entry.getKey(); - Set<UUID> newOwners = entry.getValue(); - - GridDhtLocalPartition locPart = localPartition(part); - - if (locPart == null || locPart.state() != OWNING) - continue; - - if (!newOwners.contains(ctx.localNodeId())) { - rebalancePartition(part, haveHistory.contains(part)); - - result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>()); - result.get(ctx.localNodeId()).add(part); - } - } - - // Then process remote partitions. - for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) { - int part = entry.getKey(); - Set<UUID> newOwners = entry.getValue(); - - for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) { - UUID remoteNodeId = remotes.getKey(); - GridDhtPartitionMap partMap = remotes.getValue(); - - GridDhtPartitionState state = partMap.get(part); - - if (state == null || state != OWNING) - continue; - - if (!newOwners.contains(remoteNodeId)) { - partMap.put(part, MOVING); - - partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); - - if (partMap.nodeId().equals(ctx.localNodeId())) - updateSeq.setIfGreater(partMap.updateSequence()); - - result.computeIfAbsent(remoteNodeId, n -> new HashSet<>()); - result.get(remoteNodeId).add(part); - } - } - } - - for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) { - UUID nodeId = entry.getKey(); - Set<Integer> rebalancedParts = entry.getValue(); - - if (!rebalancedParts.isEmpty()) { - Set<Integer> historical = rebalancedParts.stream() - .filter(haveHistory::contains) - .collect(Collectors.toSet()); - - // Filter out partitions having WAL history. - rebalancedParts.removeAll(historical); - - U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter " - + "[grp=" + grp.cacheOrGroupName() - + ", nodeId=" + nodeId - + ", partsFull=" + S.compact(rebalancedParts) - + ", partsHistorical=" + S.compact(historical) + "]"); - } - } - - node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet()); - } finally { - lock.writeLock().unlock(); - } - } - finally { - ctx.database().checkpointReadUnlock(); - } - - return result; - } - - /** - * Prepares given partition {@code p} for rebalance. - * Changes partition state to MOVING and starts clearing if needed. - * Prevents ongoing renting if required. - * - * @param p Partition id. - * @param haveHistory If {@code true} there is WAL history to rebalance partition, - * in other case partition will be cleared for full rebalance. - */ - private GridDhtLocalPartition rebalancePartition(int p, boolean haveHistory) { - GridDhtLocalPartition part = getOrCreatePartition(p); - - // Prevent renting. - if (part.state() == RENTING) { - if (part.reserve()) { - part.moving(); - part.release(); - } - else { - assert part.state() == EVICTED : part; - - part = getOrCreatePartition(p); - } - } - - if (part.state() != MOVING) - part.moving(); - - if (!haveHistory) - part.clearAsync(); - - assert part.state() == MOVING : part; - - return part; - } - - /** - * Finds local partitions which don't belong to affinity and runs eviction process for such partitions. - * - * @param updateSeq Update sequence. - * @param aff Affinity assignments. - * @return {@code True} if there are local partitions need to be evicted. - */ - private boolean checkEvictions(long updateSeq, AffinityAssignment aff) { - if (!ctx.kernalContext().state().evictionsAllowed()) - return false; - - boolean changed = false; - - UUID locId = ctx.localNodeId(); - - List<IgniteInternalFuture<?>> rentingFutures = new ArrayList<>(); - - for (int p = 0; p < locParts.length(); p++) { - GridDhtLocalPartition part = locParts.get(p); - - if (part == null) - continue; - - GridDhtPartitionState state = part.state(); - - if (state.active()) { - List<ClusterNode> affNodes = aff.get(p); - - if (!affNodes.contains(ctx.localNode())) { - List<ClusterNode> nodes = nodes(p, aff.topologyVersion(), OWNING, null); - Collection<UUID> nodeIds = F.nodeIds(nodes); - - // If all affinity nodes are owners, then evict partition from local node. - if (nodeIds.containsAll(F.nodeIds(affNodes))) { - GridDhtPartitionState state0 = part.state(); - - IgniteInternalFuture<?> rentFut = part.rent(false); - - rentingFutures.add(rentFut); - - updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); - - changed = state0 != part.state(); - - if (log.isDebugEnabled()) { - log.debug("Evicted local partition (all affinity nodes are owners) [grp=" + grp.cacheOrGroupName() + - ", part=" + part + ']'); - } - } - else { - int ownerCnt = nodeIds.size(); - int affCnt = affNodes.size(); - - if (ownerCnt > affCnt) { //TODO !!! we could loss all owners in such case. Should be fixed by GG-13223 - // Sort by node orders in ascending order. -
<TRUNCATED>