http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java new file mode 100644 index 0000000..9140322 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -0,0 +1,1369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.util.F0; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridPartitionStateMap; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; + +/** + * Partition topology for node which does not have any local partitions. + */ +@GridToStringExclude +public class GridClientPartitionTopology implements GridDhtPartitionTopology { + /** */ + private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; + + /** If true, then check consistency. */ + private static final boolean CONSISTENCY_CHECK = false; + + /** Flag to control amount of output for full map. */ + private static final boolean FULL_MAP_DEBUG = false; + + /** Cache shared context. */ + private GridCacheSharedContext cctx; + + /** Cache ID. */ + private int grpId; + + /** Logger. */ + private final IgniteLogger log; + + /** Node to partition map. */ + private GridDhtPartitionFullMap node2part; + + /** Partition to node map. 
     */
    private final Map<Integer, Set<UUID>> part2node = new HashMap<>();

    /** Version of the last exchange whose partition map update was applied (guarded by {@link #lock}). */
    private AffinityTopologyVersion lastExchangeVer;

    /** Current topology version; advanced in {@code updateTopologyVersion()}. */
    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;

    /** Stop flag; once set, map updates are ignored. Set from {@code updateTopologyVersion()}. */
    private volatile boolean stopping;

    /** A future that will be completed when topology with version topVer will be ready to use. */
    private volatile GridDhtTopologyFuture topReadyFut;

    /** Monotonically increasing update sequence for the partition map, starts at 1. */
    private final GridAtomicLong updateSeq = new GridAtomicLong(1);

    /** Lock. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Partition update counters. */
    private CachePartitionFullCountersMap cntrMap;

    /** Key to find caches with similar affinity; see {@link #similarAffinityKey()}. */
    private final Object similarAffKey;

    /** Discovery data cache for the current topology version. */
    private volatile DiscoCache discoCache;

    /** Total number of partitions in the group. */
    private final int parts;

    /** Global partition sizes; {@code null} until received via a full map update. */
    private volatile Map<Integer, Long> globalPartSizes;

    /**
     * @param cctx Context.
     * @param discoCache Discovery data cache.
     * @param grpId Group ID.
     * @param parts Number of partitions in the group.
     * @param similarAffKey Key to find caches with similar affinity.
     */
    public GridClientPartitionTopology(
        GridCacheSharedContext<?, ?> cctx,
        DiscoCache discoCache,
        int grpId,
        int parts,
        Object similarAffKey
    ) {
        this.cctx = cctx;
        this.discoCache = discoCache;
        this.grpId = grpId;
        this.similarAffKey = similarAffKey;
        this.parts = parts;

        topVer = AffinityTopologyVersion.NONE;

        log = cctx.logger(getClass());

        // Client topology starts with an empty full map owned by the local node.
        node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
            cctx.localNode().order(),
            updateSeq.get());

        cntrMap = new CachePartitionFullCountersMap(parts);
    }

    /** {@inheritDoc} */
    @Override public int partitions() {
        return parts;
    }

    /**
     * @return Key to find caches with similar affinity.
     */
    @Nullable public Object similarAffinityKey() {
        return similarAffKey;
    }

    /**
     * @return Full map string representation.
+ */ + @SuppressWarnings({"ConstantConditions"}) + private String fullMapString() { + return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); + } + + /** + * @param map Map to get string for. + * @return Full map string representation. + */ + @SuppressWarnings({"ConstantConditions"}) + private String mapString(GridDhtPartitionMap map) { + return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); + } + + /** {@inheritDoc} */ + @Override public int groupId() { + return grpId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) + @Override public void readLock() { + lock.readLock().lock(); + } + + /** {@inheritDoc} */ + @Override public void readUnlock() { + lock.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean holdsLock() { + return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0; + } + + /** {@inheritDoc} */ + @Override public void updateTopologyVersion( + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, + MvccCoordinator mvccCrd, + long updSeq, + boolean stopping + ) throws IgniteInterruptedCheckedException { + U.writeLock(lock); + + try { + AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); + + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId + + ", topVer=" + topVer + + ", exchVer=" + exchTopVer + + ", discoCacheVer=" + (this.discoCache != null ? 
this.discoCache.version() : "None") + + ", exchDiscoCacheVer=" + discoCache.version() + ']'; + + this.stopping = stopping; + + topVer = exchTopVer; + this.discoCache = discoCache; + + updateSeq.setIfGreater(updSeq); + + topReadyFut = exchFut; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion readyTopologyVersion() { + lock.readLock().lock(); + + try { + assert topVer.topologyVersion() > 0; + + return topVer; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion lastTopologyChangeVersion() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyVersionFuture() { + assert topReadyFut != null; + + return topReadyFut; + } + + /** {@inheritDoc} */ + @Override public boolean stopping() { + return stopping; + } + + /** {@inheritDoc} */ + @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, + GridDhtPartitionsExchangeFuture exchFut) { + return false; + } + + /** {@inheritDoc} */ + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, + boolean initParts, + boolean updateMoving) + throws IgniteCheckedException + { + ClusterNode loc = cctx.localNode(); + + U.writeLock(lock); + + try { + if (stopping) + return; + + discoCache = exchFut.events().discoveryCache(); + + beforeExchange0(loc, exchFut); + + if (updateMoving) { + ExchangeDiscoveryEvents evts = exchFut.context().events(); + + GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId); + + assert aff.lastVersion().equals(evts.topologyVersion()); + + createMovingPartitions(aff.readyAffinity(evts.topologyVersion())); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param aff Affinity. 
     */
    private void createMovingPartitions(AffinityAssignment aff) {
        // For every node's partition map, mark its assigned (primary + backup)
        // partitions as MOVING if they are not present yet or were evicted.
        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
            GridDhtPartitionMap map = e.getValue();

            addMoving(map, aff.backupPartitions(e.getKey()));
            addMoving(map, aff.primaryPartitions(e.getKey()));
        }
    }

    /**
     * Marks the given partitions as MOVING in the node map when they have no state or were EVICTED.
     *
     * @param map Node partition state map.
     * @param parts Partitions assigned to node.
     */
    private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
        if (F.isEmpty(parts))
            return;

        for (Integer p : parts) {
            GridDhtPartitionState state = map.get(p);

            if (state == null || state == EVICTED)
                map.put(p, MOVING);
        }
    }

    /**
     * Removes maps of left nodes and (re)creates the full map on the oldest node before an exchange.
     *
     * @param loc Local node.
     * @param exchFut Exchange future.
     */
    private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
        GridDhtPartitionExchangeId exchId = exchFut.exchangeId();

        // Drop partition maps of all server nodes that left during this exchange.
        if (exchFut.context().events().hasServerLeft()) {
            List<DiscoveryEvent> evts0 = exchFut.context().events().events();

            for (int i = 0; i < evts0.size(); i++) {
                DiscoveryEvent evt = evts0.get(i);

                if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
                    removeNode(evt.eventNode().id());
            }
        }

        // In case if node joins, get topology at the time of joining node.
        ClusterNode oldest = discoCache.oldestAliveServerNode();

        assert oldest != null;

        if (log.isDebugEnabled())
            log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');

        long updateSeq = this.updateSeq.incrementAndGet();

        // If this is the oldest node.
+ if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + + /** {@inheritDoc} */ + @Override public void afterStateRestored(AffinityTopologyVersion topVer) { + // no-op + } + + /** {@inheritDoc} */ + @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { + AffinityTopologyVersion topVer = exchFut.topologyVersion(); + + lock.writeLock().lock(); + + try { + assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchFut.exchangeId() + ']'; + + if (log.isDebugEnabled()) + log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + + fullMapString() + ']'); + + updateSeq.incrementAndGet(); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition 
localPartition( + int p, + AffinityTopologyVersion topVer, + boolean create + ) + throws GridDhtInvalidPartitionException { + if (!create) + return null; + + throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition (often may be caused by " + + "inconsistent 'key.hashCode()' implementation) " + + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { + return localPartition(p, topVer, create); + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition localPartition(int p) { + return localPartition(p, AffinityTopologyVersion.NONE, false); + } + + /** {@inheritDoc} */ + @Override public void releasePartitions(int... parts) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public List<GridDhtLocalPartition> localPartitions() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(GridDhtCacheEntry e) { + assert false : "Entry should not be removed from client topology: " + e; + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap localPartitionMap() { + lock.readLock().lock(); + + try { + return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), topVer, + GridPartitionStateMap.EMPTY, true); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? 
EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public List<ClusterNode> nodes(int p, + AffinityAssignment affAssignment, + List<ClusterNode> affNodes) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", node2part=" + node2part + ']'; + + List<ClusterNode> nodes = null; + + Collection<UUID> nodeIds = part2node.get(p); + + if (!F.isEmpty(nodeIds)) { + for (UUID nodeId : nodeIds) { + ClusterNode n = discoCache.node(nodeId); + + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { + if (nodes == null) + nodes = new ArrayList<>(nodeIds.size()); + + nodes.add(n); + } + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param p Partition. + * @param topVer Topology version ({@code -1} for all nodes). + * @param state Partition state. + * @param states Additional partition states. + * @return List of nodes for the partition. + */ + private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grpId)); + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", allIds=" + allIds + ", node2part=" + node2part + ']'; + + Collection<UUID> nodeIds = part2node.get(p); + + // Node IDs can be null if both, primary and backup, nodes disappear. + int size = nodeIds == null ? 
0 : nodeIds.size(); + + if (size == 0) + return Collections.emptyList(); + + List<ClusterNode> nodes = new ArrayList<>(size); + + for (UUID id : nodeIds) { + if (topVer.topologyVersion() > 0 && !F.contains(allIds, id)) + continue; + + if (hasState(p, id, state, states)) { + ClusterNode n = discoCache.node(id); + + if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) + nodes.add(n); + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { + return nodes(p, topVer, OWNING, null); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p) { + return owners(p, AffinityTopologyVersion.NONE); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> allOwners() { + lock.readLock().lock(); + + try { + int parts = partitions(); + + List<List<ClusterNode>> res = new ArrayList<>(parts); + + for (int i = 0; i < parts; i++) + res.add(new ArrayList<>()); + + List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grpId); + + for (int i = 0; i < allNodes.size(); i++) { + ClusterNode node = allNodes.get(i); + + GridDhtPartitionMap nodeParts = node2part.get(node.id()); + + if (nodeParts != null) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) { + if (e.getValue() == OWNING) { + int part = e.getKey(); + + List<ClusterNode> owners = res.get(part); + + owners.add(node); + } + } + } + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> moving(int p) { + return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return List of nodes in state OWNING or MOVING. 
     */
    private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
        return nodes(p, topVer, OWNING, MOVING_STATES);
    }

    /** {@inheritDoc} */
    @Override public long updateSequence() {
        return updateSeq.get();
    }

    /**
     * Increments the update sequence under the write lock and returns the new value.
     * NOTE(review): despite the name, this method mutates the sequence — it does not
     * merely read the last value.
     *
     * @return New (incremented) update sequence.
     */
    public long lastUpdateSequence() {
        lock.writeLock().lock();

        try {
            return updateSeq.incrementAndGet();
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
        lock.readLock().lock();

        try {
            // No map is exposed once the topology is stopping or before the first update.
            if (stopping || node2part == null)
                return null;

            assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
                ", locNodeId=" + cctx.localNodeId() +
                ", igniteInstanceName=" + cctx.igniteInstanceName() + ']';

            GridDhtPartitionFullMap m = node2part;

            // Return a defensive copy so callers cannot observe concurrent updates.
            return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Checks whether the current partition map should be overwritten by the new partition map.
     * Returns {@code true} if the topology version or update sequence of the new map is greater than the current map's.
     *
     * @param currentMap Current partition map.
     * @param newMap New partition map.
     * @return True if current partition map should be overwritten by new partition map, false in other case.
+ */ + private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return newMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Override public boolean update( + @Nullable AffinityTopologyVersion exchangeVer, + GridDhtPartitionFullMap partMap, + @Nullable CachePartitionFullCountersMap cntrMap, + Set<Integer> partsToReload, + @Nullable Map<Integer, Long> partSizes, + @Nullable AffinityTopologyVersion msgTopVer) { + if (log.isDebugEnabled()) + log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); + + lock.writeLock().lock(); + + try { + if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) { + if (log.isDebugEnabled()) + log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + + lastExchangeVer + ", exchVer=" + exchangeVer + ']'); + + return false; + } + + if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale topology version for full partition map update message (will ignore) " + + "[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer + ']'); + + return false; + } + + boolean fullMapUpdated = (node2part == null); + + if (node2part != null) { + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); + + if (shouldOverridePartitionMap(part, newPart)) { + fullMapUpdated = true; + + if (log.isDebugEnabled()) + log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" + + mapString(part) + ", newPart=" + mapString(newPart) + ']'); + } + else { + // If 
for some nodes current partition has a newer map, + // then we keep the newer value. + partMap.put(part.nodeId(), part); + } + } + + // Check that we have new nodes. + for (GridDhtPartitionMap part : partMap.values()) { + if (fullMapUpdated) + break; + + fullMapUpdated = !node2part.containsKey(part.nodeId()); + } + + // Remove entry if node left. + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { + UUID nodeId = it.next(); + + if (!cctx.discovery().alive(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + + partMap + ']'); + + it.remove(); + } + } + } + + if (!fullMapUpdated) { + if (log.isDebugEnabled()) + log.debug("No updates for full partition map (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + + return false; + } + + if (exchangeVer != null) + lastExchangeVer = exchangeVer; + + node2part = partMap; + + updateSeq.incrementAndGet(); + + part2node.clear(); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + if (e0.getValue() != MOVING && e0.getValue() != OWNING) + continue; + + int p = e0.getKey(); + + Set<UUID> ids = part2node.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partitions. 
+ part2node.put(p, ids = U.newHashSet(3)); + + ids.add(e.getKey()); + } + } + + if (cntrMap != null) + this.cntrMap = new CachePartitionFullCountersMap(cntrMap); + + if (partSizes != null) + this.globalPartSizes = partSizes; + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after full update: " + fullMapString()); + + return false; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { + assert cntrMap != null; + + lock.writeLock().lock(); + + try { + for (int i = 0; i < cntrMap.size(); i++) { + int pId = cntrMap.partitionAt(i); + + long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); + long updateCntr = cntrMap.updateCounterAt(i); + + if (this.cntrMap.updateCounter(pId) < updateCntr) { + this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); + this.cntrMap.updateCounter(pId, updateCntr); + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void applyUpdateCounters() { + // No-op on client topology. 
    }

    /**
     * Checks whether the new partition map is more stale than the current partition map.
     * The new partition map is stale if its topology version or update sequence is less than
     * or equal to that of the current map.
     *
     * @param currentMap Current partition map.
     * @param newMap New partition map.
     * @return True if new partition map is more stale than current partition map, false in other case.
     */
    private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
        return currentMap != null && newMap.compareTo(currentMap) <= 0;
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
    @Override public boolean update(
        @Nullable GridDhtPartitionExchangeId exchId,
        GridDhtPartitionMap parts,
        boolean force
    ) {
        if (log.isDebugEnabled())
            log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');

        // Updates from nodes that already left the cluster are ignored.
        if (!cctx.discovery().alive(parts.nodeId())) {
            if (log.isDebugEnabled())
                log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                    ", parts=" + parts + ']');

            return false;
        }

        lock.writeLock().lock();

        try {
            if (stopping)
                return false;

            if (!force) {
                // Reject updates for an exchange older than the last applied one.
                if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                    if (log.isDebugEnabled())
                        log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
                            lastExchangeVer + ", exchId=" + exchId + ']');

                    return false;
                }
            }

            if (exchId != null)
                lastExchangeVer = exchId.topologyVersion();

            if (node2part == null)
                // Create invalid partition map.
+ node2part = new GridDhtPartitionFullMap(); + + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); + + if (force) { + if (cur != null && cur.topologyVersion().initialized()) + parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); + } + else if (isStaleUpdate(cur, parts)) { + if (log.isDebugEnabled()) + log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId + + ", curMap=" + cur + ", newMap=" + parts + ']'); + + return false; + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part = new GridDhtPartitionFullMap(node2part, updateSeq); + + boolean changed = false; + + if (cur == null || !cur.equals(parts)) + changed = true; + + node2part.put(parts.nodeId(), parts); + + // Add new mappings. + for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) { + int p = e.getKey(); + + Set<UUID> ids = part2node.get(p); + + if (e.getValue() == MOVING || e.getValue() == OWNING) { + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); + + changed |= ids.add(parts.nodeId()); + } + else { + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } + } + + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = part2node.get(p); + + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } + } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after single update: " + fullMapString()); + + return changed; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, + boolean updateRebalanceVer) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) { + assert false : "detectLostPartitions should never be called on client topology"; + + return false; + } + + /** {@inheritDoc} */ + @Override public void resetLostPartitions(AffinityTopologyVersion affVer) { + assert false : "resetLostPartitions should never be called on client topology"; + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> lostPartitions() { + assert false : "lostPartitions should never be called on client topology"; + + return Collections.emptyList(); + } + + /** + * Updates value for single partition. + * + * @param p Partition. + * @param nodeId Node ID. + * @param state State. + * @param updateSeq Update sequence. + */ + private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { + assert lock.isWriteLockedByCurrentThread(); + assert nodeId.equals(cctx.localNodeId()); + + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = discoCache.oldestAliveServerNode(); + + // If this node became the oldest node. + if (cctx.localNode().equals(oldest)) { + long seq = node2part.updateSequence(); + + if (seq != updateSeq) { + if (seq > updateSeq) { + if (this.updateSeq.get() < seq) { + // Update global counter if necessary. 
                        // Under the write lock no other thread mutates updateSeq, so this CAS must succeed.
                        boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);

                        assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
                            ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';

                        updateSeq = seq + 1;
                    }
                    else
                        updateSeq = seq;
                }

                // Propagate the (possibly bumped) sequence number to the full partition map.
                node2part.updateSequence(updateSeq);
            }
        }

        // Per-node partition map; created lazily on the first update for this node.
        GridDhtPartitionMap map = node2part.get(nodeId);

        if (map == null)
            node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, topVer,
                GridPartitionStateMap.EMPTY, false));

        map.updateSequence(updateSeq, topVer);

        map.put(p, state);

        // Maintain the reverse index: partition -> IDs of nodes that hold it.
        Set<UUID> ids = part2node.get(p);

        if (ids == null)
            part2node.put(p, ids = U.newHashSet(3));

        ids.add(nodeId);
    }

    /**
     * Removes a node from the full partition map and from the partition-to-node
     * reverse index, then re-runs the consistency check. Must be invoked under
     * the write lock (asserted below).
     *
     * @param nodeId Node to remove.
     */
    private void removeNode(UUID nodeId) {
        assert nodeId != null;
        assert lock.writeLock().isHeldByCurrentThread();

        ClusterNode loc = cctx.localNode();

        if (node2part != null) {
            // If the full map was created by another node, re-own it under the local node ID
            // with a fresh update sequence; otherwise just copy it before mutating.
            if (!node2part.nodeId().equals(loc.id())) {
                updateSeq.setIfGreater(node2part.updateSequence());

                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
                    node2part, false);
            }
            else
                node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());

            GridDhtPartitionMap parts = node2part.remove(nodeId);

            if (parts != null) {
                // Drop the removed node from every partition's node set; remove empty sets.
                for (Integer p : parts.keySet()) {
                    Set<UUID> nodeIds = part2node.get(p);

                    if (nodeIds != null) {
                        nodeIds.remove(nodeId);

                        if (nodeIds.isEmpty())
                            part2node.remove(p);
                    }
                }
            }

            consistencyCheck();
        }
    }

    /** {@inheritDoc} */
    @Override public boolean own(GridDhtLocalPartition part) {
        // Client topology only mirrors remote partition state; it never hosts partitions.
        assert false : "Client topology should never own a partition: " + part;

        return false;
    }

    /** {@inheritDoc} */
    @Override public void ownMoving(AffinityTopologyVersion topVer) {
        // No-op
    }

    /** {@inheritDoc} */
    @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) {
        // Caller must either request a sequence bump or already hold the write lock.
        assert updateSeq || lock.isWriteLockedByCurrentThread();

        lock.writeLock().lock();

        try {
            if (stopping)
                return;

            assert part.state() == EVICTED;

            // Bump the global sequence only when requested; otherwise reuse the current one.
            long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();

            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);

            consistencyCheck();
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
        lock.readLock().lock();

        try {
            return node2part.get(nodeId);
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
        // node ID -> partitions that were demoted and must be rebalanced on that node.
        Map<UUID, Set<Integer>> result = new HashMap<>();

        lock.writeLock().lock();

        try {
            // Process remote partitions.
            for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
                int part = entry.getKey();
                Set<UUID> newOwners = entry.getValue();

                for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
                    UUID remoteNodeId = remotes.getKey();
                    GridDhtPartitionMap partMap = remotes.getValue();

                    GridDhtPartitionState state = partMap.get(part);

                    if (state == null || state != OWNING)
                        continue;

                    // Demote stale owners (absent from the new owner set) to MOVING.
                    if (!newOwners.contains(remoteNodeId)) {
                        partMap.put(part, MOVING);

                        partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());

                        result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
                        result.get(remoteNodeId).add(part);
                    }
                }
            }

            // Log what will be rebalanced, split into historical (WAL-based) and full parts.
            for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
                UUID nodeId = entry.getKey();
                Set<Integer> partsToRebalance = entry.getValue();

                if (!partsToRebalance.isEmpty()) {
                    Set<Integer> historical = partsToRebalance.stream()
                        .filter(haveHistory::contains)
                        .collect(Collectors.toSet());

                    // Filter out partitions having WAL history.
                    partsToRebalance.removeAll(historical);

                    U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter " +
                        "[grpId=" + grpId +
                        ", nodeId=" + nodeId +
                        ", partsFull=" + S.compact(partsToRebalance) +
                        ", partsHistorical=" + S.compact(historical) + "]");
                }
            }

            // Replace the reverse index entries with the authoritative owner sets.
            for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet())
                part2node.put(entry.getKey(), entry.getValue());

            updateSeq.incrementAndGet();
        } finally {
            lock.writeLock().unlock();
        }

        return result;
    }

    /** {@inheritDoc} */
    @Override public CachePartitionFullCountersMap fullUpdateCounters() {
        lock.readLock().lock();

        try {
            // Defensive copy so callers never observe concurrent counter updates.
            return new CachePartitionFullCountersMap(cntrMap);
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
        // Client topology has no local partitions, hence no local counters.
        return CachePartitionPartialCountersMap.EMPTY;
    }

    /** {@inheritDoc} */
    @Override public Map<Integer, Long> partitionSizes() {
        return Collections.emptyMap();
    }

    /** {@inheritDoc} */
    @Override @Nullable public Map<Integer, Long> globalPartSizes() {
        lock.readLock().lock();

        try {
            if (globalPartSizes == null)
                return Collections.emptyMap();

            return Collections.unmodifiableMap(globalPartSizes);
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) {
        lock.writeLock().lock();

        try {
            this.globalPartSizes = partSizes;
        }
        finally {
            lock.writeLock().unlock();
        }
    }


    /** {@inheritDoc} */
    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
        assert false : "Should not be called on non-affinity node";

        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean hasMovingPartitions() {
        lock.readLock().lock();

        try {
            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
                ", locNodeId=" + cctx.localNodeId() +
                ", igniteInstanceName=" + cctx.igniteInstanceName() + ']';

            // True if any node reports at least one partition in MOVING state.
            for (GridDhtPartitionMap map : node2part.values()) {
                if (map.hasMovingPartitions())
                    return true;
            }

            return false;
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats(int threshold) {
        X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
            ", grpId=" + grpId + ']');
    }

    /**
     * Checks whether the given node reports partition {@code p} in one of the
     * given states. Note: reads {@code node2part} without taking the read lock;
     * presumably callers hold a lock already — confirm at call sites.
     *
     * @param p Partition.
     * @param nodeId Node ID.
     * @param match State to match.
     * @param matches Additional states.
     * @return Filter for owners of this partition.
     */
    private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match,
        final GridDhtPartitionState... matches) {
        if (nodeId == null)
            return false;

        GridDhtPartitionMap parts = node2part.get(nodeId);

        // Set can be null if node has been removed.
        if (parts != null) {
            GridDhtPartitionState state = parts.get(p);

            if (state == match)
                return true;

            if (matches != null && matches.length > 0)
                for (GridDhtPartitionState s : matches)
                    if (state == s)
                        return true;
        }

        return false;
    }

    /**
     * Checks consistency after all operations: {@code node2part} and
     * {@code part2node} must be exact inverses of each other. Only active when
     * {@code CONSISTENCY_CHECK} is enabled; requires the write lock.
     */
    private void consistencyCheck() {
        if (CONSISTENCY_CHECK) {
            assert lock.writeLock().isHeldByCurrentThread();

            if (node2part == null)
                return;

            // Forward check: every (node, partition) pair is indexed in part2node.
            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
                for (Integer p : e.getValue().keySet()) {
                    Set<UUID> nodeIds = part2node.get(p);

                    assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']';
                    assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" +
                        e.getKey() + ", nodeIds=" + nodeIds + ']';
                }
            }

            // Reverse check: every (partition, node) pair is present in node2part.
            for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
                for (UUID nodeId : e.getValue()) {
                    GridDhtPartitionMap map = node2part.get(nodeId);

                    assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
                    assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
                        ", nodeId=" + nodeId + ']';
                }
            }
        }
    }
}
/**
 * Thrown when an entry is created for a partition that is not valid on this
 * node (e.g. the partition does not belong here for the current topology).
 *
 * <p>Unchecked on purpose: callers that map keys to partitions catch it at
 * well-known boundaries rather than declaring it everywhere.
 */
public class GridDhtInvalidPartitionException extends RuntimeException {
    /** */
    private static final long serialVersionUID = 0L;

    /** ID of the invalid partition this exception refers to. */
    private final int part;

    /**
     * Creates an exception for the given partition.
     *
     * @param part Partition.
     * @param msg Message.
     */
    public GridDhtInvalidPartitionException(int part, String msg) {
        super(msg);

        this.part = part;
    }

    /**
     * @return Partition.
     */
    public int partition() {
        return part;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        // Same rendering as "getClass() + ..." string concatenation.
        return new StringBuilder(getClass().toString())
            .append(" [part=").append(part)
            .append(", msg=").append(getMessage())
            .append(']')
            .toString();
    }
}