IGNITE-5684 - Fixed update sequence handling in GridDhtPartitionFullMap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f3b69c7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f3b69c7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f3b69c7 Branch: refs/heads/ignite-2.1 Commit: 4f3b69c7018913618e7bbc724ab83ac6c274bc1f Parents: 2a2c803 Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Tue Jul 11 11:27:57 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Jul 11 11:27:57 2017 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 71 +++++++++++++------- .../dht/GridDhtPartitionTopologyImpl.java | 27 +++++--- .../dht/preloader/GridDhtPartitionFullMap.java | 24 +------ .../CacheLateAffinityAssignmentTest.java | 2 +- 4 files changed, 67 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index e751961..f4ed517 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -572,56 +572,66 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + /** + * Checks should current partition map overwritten by new partition map + * Method returns true if topology version or update sequence of new map are greater than of current map. + * + * @param currentMap Current partition map. + * @param newMap New partition map. + * @return True if current partition map should be overwritten by new partition map, false in other case. + */ + private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return newMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Override public boolean update( - @Nullable AffinityTopologyVersion exchVer, + @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload ) { if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']'); + log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); lock.writeLock().lock(); try { - if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) { + if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeVer + ", exchVer=" + exchVer + ']'); - - return false; - } - - if (node2part != null && node2part.compareTo(partMap) >= 0) { - if (log.isDebugEnabled()) - log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + lastExchangeVer + ", exchVer=" + exchangeVer + ']'); return false; } - updateSeq.incrementAndGet(); - - if (exchVer != null) - lastExchangeVer = exchVer; + boolean fullMapUpdated = (node2part == null); if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { GridDhtPartitionMap newPart = partMap.get(part.nodeId()); - // If for some nodes current partition has a newer map, - // then we keep the newer value. - if (newPart != null && newPart.updateSequence() < part.updateSequence()) { - if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" + - mapString(part) + ", newPart=" + mapString(newPart) + ']'); + if (shouldOverridePartitionMap(part, newPart)) { + fullMapUpdated = true; + if (log.isDebugEnabled()) + log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" + + mapString(part) + ", newPart=" + mapString(newPart) + ']'); + } + else { + // If for some nodes current partition has a newer map, + // then we keep the newer value. partMap.put(part.nodeId(), part); } } + for (GridDhtPartitionMap part : partMap.values()) + fullMapUpdated |= !node2part.containsKey(part); + + // Remove entry if node left. for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); @@ -635,9 +645,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + if (!fullMapUpdated) { + if (log.isDebugEnabled()) + log.debug("No updates for full partition map (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + + return false; + } + + if (exchangeVer != null) + lastExchangeVer = exchangeVer; + + node2part = partMap; + + updateSeq.incrementAndGet(); + part2node.clear(); - for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { if (e0.getValue() != MOVING && e0.getValue() != OWNING) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index fe0a0c6..601da1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1134,22 +1134,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; } - if (node2part != null && node2part.compareTo(partMap) > 0) { - if (log.isDebugEnabled()) - log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" + - lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - - return false; - } - - if (exchangeVer != null) - lastExchangeVer = exchangeVer; + boolean fullMapUpdated = (node2part == null); if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { GridDhtPartitionMap newPart = partMap.get(part.nodeId()); if (shouldOverridePartitionMap(part, newPart)) { + fullMapUpdated = true; + if (log.isDebugEnabled()) log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); @@ -1161,6 +1154,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + for (GridDhtPartitionMap part : partMap.values()) + fullMapUpdated |= !node2part.containsKey(part); + // Remove entry if node left. for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); @@ -1175,6 +1171,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + if (!fullMapUpdated) { + if (log.isDebugEnabled()) + log.debug("No updates for full partition map (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + + return false; + } + + if (exchangeVer != null) + lastExchangeVer = exchangeVer; + node2part = partMap; AffinityTopologyVersion affVer = grp.affinity().lastVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 27e6777..73b7714 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -31,8 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** * Full partition map. */ -public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> - implements Comparable<GridDhtPartitionFullMap>, Externalizable { +public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> implements Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -178,27 +177,6 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> } /** {@inheritDoc} */ - @Override public int compareTo(GridDhtPartitionFullMap o) { - assert nodeId == null || (nodeOrder != o.nodeOrder && !nodeId.equals(o.nodeId)) || - (nodeOrder == o.nodeOrder && nodeId.equals(o.nodeId)): "Inconsistent node order and ID [id1=" + nodeId + - ", order1=" + nodeOrder + ", id2=" + o.nodeId + ", order2=" + o.nodeOrder + ']'; - - if (nodeId == null && o.nodeId != null) - return -1; - else if (nodeId != null && o.nodeId == null) - return 1; - else if (nodeId == null && o.nodeId == null) - return 0; - - int res = Long.compare(nodeOrder, o.nodeOrder); - - if (res == 0) - res = Long.compare(updateSeq, o.updateSeq); - - return res; - } - - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 6174209..23043d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -181,7 +181,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { MemoryConfiguration cfg1 = new MemoryConfiguration(); - cfg1.setDefaultMemoryPolicySize(50 * 1024 * 1024L); + cfg1.setDefaultMemoryPolicySize(100 * 1024 * 1024L); cfg.setMemoryConfiguration(cfg1);