ignite-4296 Optimize GridDhtPartitionsSingleMessage processing - optimized processing of GridDhtPartitionsSingleMessage - minor optimizations for RendezvousAffinityFunction - fixed heartbeats sending in tcp discovery
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acf20b32 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acf20b32 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acf20b32 Branch: refs/heads/ignite-2.0 Commit: acf20b32d8fb68e42b904b091fb2b943f4558cef Parents: b4aedfd Author: sboikov <[email protected]> Authored: Mon Dec 5 14:01:28 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 5 14:01:28 2016 +0300 ---------------------------------------------------------------------- .../rendezvous/RendezvousAffinityFunction.java | 80 ++++-- .../discovery/GridDiscoveryManager.java | 118 +------- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCacheUtils.java | 17 -- .../binary/CacheObjectBinaryProcessorImpl.java | 3 +- .../dht/GridClientPartitionTopology.java | 120 ++++---- .../distributed/dht/GridDhtLocalPartition.java | 1 - .../dht/GridDhtPartitionTopology.java | 28 +- .../dht/GridDhtPartitionTopologyImpl.java | 284 +++++++++++-------- .../dht/preloader/GridDhtPartitionFullMap.java | 18 +- .../GridDhtPartitionsExchangeFuture.java | 56 +++- .../cache/transactions/IgniteTxHandler.java | 4 +- .../service/GridServiceProcessor.java | 4 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 40 ++- .../tcp/internal/TcpDiscoveryStatistics.java | 4 + .../AbstractAffinityFunctionSelfTest.java | 2 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +- .../GridCachePartitionedAffinitySpreadTest.java | 7 +- .../distributed/dht/GridCacheDhtTestUtils.java | 232 --------------- .../h2/twostep/GridReduceQueryExecutor.java | 14 +- 20 files changed, 437 insertions(+), 603 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index ec12973..75e7c92 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -17,7 +17,6 @@ package org.apache.ignite.cache.affinity.rendezvous; -import java.io.ByteArrayOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Returns collection of nodes (primary first) for specified partition. + * + * @param d Message digest. + * @param part Partition. + * @param nodes Nodes. + * @param nodesHash Serialized nodes hashes. + * @param backups Number of backups. + * @param neighborhoodCache Neighborhood. + * @return Assignment. */ - public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, + public List<ClusterNode> assignPartition(MessageDigest d, + int part, + List<ClusterNode> nodes, + Map<ClusterNode, byte[]> nodesHash, + int backups, @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { if (nodes.size() <= 1) return nodes; - List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); + if (d == null) + d = digest.get(); - MessageDigest d = digest.get(); + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size()); - for (ClusterNode node : nodes) { - Object nodeHash = resolveNodeHash(node); + try { + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] nodeHashBytes = nodesHash.get(node); - byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + if (nodeHashBytes == null) { + Object nodeHash = resolveNodeHash(node); - out.write(U.intToBytes(part), 0, 4); // Avoid IOException. - out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException. + byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + + // Add 4 bytes for partition bytes. + nodeHashBytes = new byte[nodeHashBytes0.length + 4]; + + System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); + + nodesHash.put(node, nodeHashBytes); + } + + U.intToBytes(part, nodeHashBytes, 0); d.reset(); - byte[] bytes = d.digest(out.toByteArray()); + byte[] bytes = d.digest(nodeHashBytes); long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); lst.add(F.t(hash, node)); } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } Collections.sort(lst, COMPARATOR); @@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; + MessageDigest d = digest.get(); + + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size()); + for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), + List<ClusterNode> partAssignment = assignPartition(d, + i, + nodes, + nodesHash, + affCtx.backups(), neighborhoodCache); assignments.add(partAssignment); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index d24f900..ddd4ee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -42,7 +42,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Gets cache remote nodes for cache with given name. * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion()); - } - - /** - * Gets cache remote nodes for cache with given name. - * * @param topVer Topology version. * @return Collection of cache nodes. */ @@ -1648,7 +1637,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param topVer Topology version. * @return Collection of cache nodes. */ - public Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { + Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion()); } @@ -1659,38 +1648,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param topVer Topology version. * @return Collection of cache nodes. */ - public Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { + Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion()); } /** - * Gets alive remote server nodes with at least one cache configured. - * * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. + * @return Oldest alive server nodes with at least one cache configured. */ - public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion()); - } + @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { + DiscoCache cache = resolveDiscoCache(null, topVer); - /** - * Gets alive server nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. - */ - public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion()); - } + Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry(); - /** - * Gets alive nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. - */ - public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion()); + return e != null ? e.getKey() : null; } /** @@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; /** - * Cached alive remote nodes with caches. - */ - private final Collection<ClusterNode> aliveNodesWithCaches; - - /** * Cached alive server remote nodes with caches. */ - private final Collection<ClusterNode> aliveSrvNodesWithCaches; - - /** - * Cached alive remote server nodes with caches. - */ - private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; + private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches; /** * @param loc Local node. @@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; @@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - if (hasCaches) { - if (alive(node.id())) { - aliveNodesWithCaches.add(node); - - if (!CU.clientNode(node)) { - aliveSrvNodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - aliveRmtSrvNodesWithCaches.add(node); - } - } - } + if (hasCaches && alive(node.id()) && !CU.clientNode(node)) + aliveSrvNodesWithCaches.put(node, Boolean.TRUE); IgniteProductVersion nodeVer = U.productVersion(node); @@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all remote nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, rmtCacheNodes.get(cacheName)); - } - - /** * Gets all remote nodes that have at least one cache configured. * * @param topVer Topology version. @@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all alive remote server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtSrvNodesWithCaches); - } - - /** - * Gets all alive server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveSrvNodesWithCaches); - } - - /** - * Gets all alive remote nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveNodesWithCaches(final long topVer) { - return filter(topVer, aliveNodesWithCaches); - } - - /** * Checks if cache with given name has at least one node with near cache enabled. * * @param cacheName Cache name. @@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { filterNodeMap(aliveRmtCacheNodes, leftNode); - aliveNodesWithCaches.remove(leftNode); aliveSrvNodesWithCaches.remove(leftNode); - aliveRmtSrvNodesWithCaches.remove(leftNode); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 503b334..7a24aa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Partition refresh callback. */ private void refreshPartitions() { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { if (log.isDebugEnabled()) @@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null); } if (!cctx.kernalContext().clientNode() && updated) @@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) { - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null, true); cctx.affinity().checkRebalanceState(top, cacheId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 90e428c..d32f4c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -490,23 +490,6 @@ public class GridCacheUtils { } /** - * Gets oldest alive server node with at least one cache configured for specified topology version. - * - * @param ctx Context. - * @param topVer Maximum allowed topology version. - * @return Oldest alive cache server node. - */ - @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx, - AffinityTopologyVersion topVer) { - Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer); - - if (nodes.isEmpty()) - return null; - - return oldest(nodes); - } - - /** * @param nodes Nodes. * @return Oldest node for the given topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 568a4da..1d60c42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm assert !metaDataCache.context().affinityNode(); while (true) { - ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE); + ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldestSrvNode == null) break; http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5efb317..816132d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); assert oldest != null; @@ -536,7 +536,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']'; + ", locNodeId=" + cctx.localNodeId() + + ", gridName=" + cctx.gridName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -549,7 +550,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -563,7 +564,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (node2part != null && node2part.compareTo(partMap) >= 0) { @@ -571,7 +572,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return null; + return false; } updateSeq.incrementAndGet(); @@ -634,7 +635,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); - return null; + return false; } finally { lock.writeLock().unlock(); @@ -642,10 +643,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - Map<Integer, Long> cntrMap) { + Map<Integer, Long> cntrMap, + boolean checkEvictions) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -654,29 +655,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); - return null; + return false; } lock.writeLock().lock(); try { if (stopping) - return null; + return false; if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (exchId != null) lastExchangeId = exchId; if (node2part == null) { - U.dumpStack(log, "Created invalid: " + node2part); - // Create invalid partition map. node2part = new GridDhtPartitionFullMap(); } @@ -688,43 +687,45 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); - - boolean changed = false; + node2part.updateSequence(updateSeq); - if (cur == null || !cur.equals(parts)) - changed = true; + boolean changed = cur == null || !cur.equals(parts); - node2part.put(parts.nodeId(), parts); + if (changed) { + node2part.put(parts.nodeId(), parts); - part2node = new HashMap<>(part2node); + // Add new mappings. + for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); - // Add new mappings. - for (Integer p : parts.keySet()) { - Set<UUID> ids = part2node.get(p); + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + ids.add(parts.nodeId()); + } - changed |= ids.add(parts.nodeId()); - } + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : cur.keySet()) { + if (parts.containsKey(p)) + continue; - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = part2node.get(p); + Set<UUID> ids = part2node.get(p); - if (ids != null) - changed |= ids.remove(parts.nodeId()); + if (ids != null) + ids.remove(parts.nodeId()); + } } } + else + cur.updateSequence(parts.updateSequence(), parts.topologyVersion()); if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { @@ -740,13 +741,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); } } + /** {@inheritDoc} */ + @Override public void checkEvictions() { + // No-op. + } + /** * Updates value for single partition. * @@ -755,13 +761,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @param state State. * @param updateSeq Update sequence. */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { assert lock.isWriteLockedByCurrentThread(); assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -811,7 +816,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); ClusterNode loc = cctx.localNode(); @@ -877,18 +882,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) { - lock.readLock().lock(); - - try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); @@ -919,6 +912,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean hasMovingPartitions() { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + ", locNodeId=" + cctx.localNodeId() + + ", gridName=" + cctx.gridName() + ']'; + + for (GridDhtPartitionMap2 map : node2part.values()) { + if (map.hasMovingPartitions()) + return true; + } + + return false; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 39a3e08..668a1cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -479,7 +479,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) { shouldBeRenting = false; - if (log.isDebugEnabled()) log.debug("Moved partition to RENTING state: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 0f75a5d..14ce1f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -195,6 +195,11 @@ public interface GridDhtPartitionTopology { public GridDhtPartitionFullMap partitionMap(boolean onlyActive); /** + * @return {@code True} If one of cache nodes has partitions in {@link GridDhtPartitionState#MOVING} state. + */ + public boolean hasMovingPartitions(); + + /** * @param e Entry removed from cache. */ public void onRemoved(GridDhtCacheEntry e); @@ -203,9 +208,9 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @return {@code True} if topology state changed. */ - public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, Long> cntrMap); @@ -213,11 +218,18 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param parts Partitions. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @param checkEvictions Check evictions flag. + * @return {@code True} if topology state changed. */ - @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - @Nullable Map<Integer, Long> cntrMap); + @Nullable Map<Integer, Long> cntrMap, + boolean checkEvictions); + + /** + * + */ + public void checkEvictions(); /** * @param skipZeros If {@code true} then filters out zero counters. @@ -238,12 +250,6 @@ public interface GridDhtPartitionTopology { public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); /** - * @param nodeId Node to get partitions for. - * @return Partitions for node. - */ - @Nullable public GridDhtPartitionMap2 partitions(UUID nodeId); - - /** * Prints memory stats. * * @param threshold Threshold for number of entries. http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ab573bd..1b4dcc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -330,8 +329,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void initPartitions( - GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { + @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) + throws IgniteInterruptedCheckedException + { U.writeLock(lock); try { @@ -356,9 +356,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - - assert oldest != null || cctx.kernalContext().clientNode(); + ClusterNode oldest = currentCoordinator(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -397,7 +395,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Owned partition for oldest node: " + locPart); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); } } } @@ -419,7 +417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); if (log.isDebugEnabled()) log.debug("Evicting partition with rebalancing disabled " + @@ -443,8 +441,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { - ClusterNode loc = cctx.localNode(); - int num = cctx.affinity().partitions(); for (int p = 0; p < num; p++) { @@ -454,7 +450,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // will be created in MOVING state. GridDhtLocalPartition locPart = createPartition(p); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); } } // If this node's map is empty, we pre-create local partitions, @@ -485,10 +481,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (exchId.isLeft()) removeNode(exchId.nodeId()); - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - - assert oldest != null || cctx.kernalContext().clientNode(); + ClusterNode oldest = currentCoordinator(); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -548,8 +541,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { boolean changed = waitForRent(); - ClusterNode loc = cctx.localNode(); - int num = cctx.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); @@ -600,7 +591,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + locPart + ']'; - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; @@ -620,7 +611,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart + ", owners = " + owners + ']'); } else - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); } } else { @@ -630,7 +621,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state == MOVING) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; @@ -803,8 +794,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { map.put(i, part.state()); } - return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), topVer, - Collections.unmodifiableMap(map), true); + return new GridDhtPartitionMap2(cctx.nodeId(), + updateSeq.get(), + topVer, + Collections.unmodifiableMap(map), + true); } finally { lock.readLock().unlock(); @@ -985,7 +979,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -997,7 +991,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { if (stopping) - return null; + return false; if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { @@ -1025,7 +1019,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (node2part != null && node2part.compareTo(partMap) >= 0) { @@ -1033,7 +1027,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); @@ -1076,7 +1070,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part = partMap; - Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f); + Map<Integer, Set<UUID>> p2n = U.newHashMap(cctx.affinity().partitions()); for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { for (Integer p : e.getValue().keySet()) { @@ -1110,7 +1104,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); @@ -1118,10 +1112,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - @Nullable Map<Integer, Long> cntrMap) { + @Nullable Map<Integer, Long> cntrMap, + boolean checkEvictions) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -1130,33 +1124,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); - return null; + return false; } lock.writeLock().lock(); try { if (stopping) - return null; + return false; if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); + Integer p = e.getKey(); - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); + Long cntr = this.cntrMap.get(p); - if (part == null) - continue; + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(p, e.getValue()); - Long cntr = cntrMap.get(part.id()); + GridDhtLocalPartition part = locParts.get(p); - if (cntr != null) - part.updateCounter(cntr); + if (part != null) + part.updateCounter(e.getValue()); } } @@ -1165,7 +1154,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (exchId != null) @@ -1182,60 +1171,91 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); - - boolean changed = false; + node2part.newUpdateSequence(updateSeq); - if (cur == null || !cur.equals(parts)) - changed = true; + boolean changed = cur == null || !cur.equals(parts); - node2part.put(parts.nodeId(), parts); + if (changed) { + node2part.put(parts.nodeId(), parts); - part2node = new HashMap<>(part2node); + // Add new mappings. + for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); - // Add new mappings. - for (Integer p : parts.keySet()) { - Set<UUID> ids = part2node.get(p); + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + ids.add(parts.nodeId()); + } - changed |= ids.add(parts.nodeId()); - } + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : cur.keySet()) { + if (parts.containsKey(p)) + continue; - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = part2node.get(p); + Set<UUID> ids = part2node.get(p); - if (ids != null) - changed |= ids.remove(parts.nodeId()); + if (ids != null) + ids.remove(parts.nodeId()); + } } } + else + cur.updateSequence(parts.updateSequence(), parts.topologyVersion()); - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); - - if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - - changed |= checkEvictions(updateSeq, aff); - - updateRebalanceVersion(aff); - } + if (checkEvictions) + changed |= checkEvictions(updateSeq); consistencyCheck(); if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param updateSeq Update sequence. + * @return {@code True} if state changed. + */ + private boolean checkEvictions(long updateSeq) { + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + boolean changed = false; + + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + + changed = checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } + + return changed; + } + + /** {@inheritDoc} */ + @Override public void checkEvictions() { + lock.writeLock().lock(); + + try { + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part.newUpdateSequence(updateSeq); + + checkEvictions(updateSeq); } finally { lock.writeLock().unlock(); @@ -1270,7 +1290,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (nodeIds.containsAll(F.nodeIds(affNodes))) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateSeq = updateLocal(part.id(), part.state(), updateSeq); changed = true; @@ -1295,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locId.equals(n.id())) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateSeq = updateLocal(part.id(), part.state(), updateSeq); changed = true; @@ -1316,19 +1336,27 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * @return Current coordinator node. + */ + @Nullable private ClusterNode currentCoordinator() { + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + assert oldest != null || cctx.kernalContext().clientNode(); + + return oldest; + } + + /** * Updates value for single partition. * * @param p Partition. - * @param nodeId Node ID. * @param state State. * @param updateSeq Update sequence. + * @return Update sequence. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { - assert nodeId.equals(cctx.nodeId()); - - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { + ClusterNode oldest = currentCoordinator(); assert oldest != null || cctx.kernalContext().clientNode(); @@ -1338,12 +1366,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (seq != updateSeq) { if (seq > updateSeq) { - if (this.updateSeq.get() < seq) { + long seq0 = this.updateSeq.get(); + + if (seq0 < seq) { // Update global counter if necessary. - boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1); + boolean b = this.updateSeq.compareAndSet(seq0, seq + 1); - assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq + - ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']'; + assert b : "Invalid update sequence [updateSeq=" + updateSeq + + ", seq=" + seq + + ", curUpdateSeq=" + this.updateSeq.get() + + ", node2part=" + node2part.toFullString() + ']'; updateSeq = seq + 1; } @@ -1355,11 +1387,19 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - GridDhtPartitionMap2 map = node2part.get(nodeId); + UUID locNodeId = cctx.localNodeId(); + + GridDhtPartitionMap2 map = node2part.get(locNodeId); - if (map == null) - node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer, - Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); + if (map == null) { + map = new GridDhtPartitionMap2(locNodeId, + updateSeq, + topVer, + Collections.<Integer, GridDhtPartitionState>emptyMap(), + false); + + node2part.put(locNodeId, map); + } map.updateSequence(updateSeq, topVer); @@ -1370,7 +1410,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (ids == null) part2node.put(p, ids = U.newHashSet(3)); - ids.add(nodeId); + ids.add(locNodeId); + + return updateSeq; } /** @@ -1395,8 +1437,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { else node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); - part2node = new HashMap<>(part2node); - GridDhtPartitionMap2 parts = node2part.remove(nodeId); if (parts != null) { @@ -1418,13 +1458,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean own(GridDhtLocalPartition part) { - ClusterNode loc = cctx.localNode(); - lock.writeLock().lock(); try { if (part.own()) { - updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + updateLocal(part.id(), part.state(), updateSeq.incrementAndGet()); consistencyCheck(); @@ -1452,7 +1490,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); - updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); + updateLocal(part.id(), part.state(), seq); consistencyCheck(); } @@ -1462,18 +1500,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) { - lock.readLock().lock(); - - try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); @@ -1526,6 +1552,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean hasMovingPartitions() { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + ", cache=" + cctx.name() + + ", started=" + cctx.started() + + ", stopping=" + stopping + + ", locNodeId=" + cctx.localNode().id() + + ", locName=" + cctx.gridName() + ']'; + + for (GridDhtPartitionMap2 map : node2part.values()) { + if (map.hasMovingPartitions()) + return true; + } + + return false; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); @@ -1607,10 +1657,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state == match) return true; - if (matches != null && matches.length > 0) - for (GridDhtPartitionState s : matches) + if (matches != null && matches.length > 0) { + for (GridDhtPartitionState s : matches) { if (state == s) return true; + } + } } return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 8f5ad17..e8860f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -103,10 +103,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) { GridDhtPartitionMap2 part = e.getValue(); - if (onlyActive) - put(e.getKey(), new GridDhtPartitionMap2(part.nodeId(), part.updateSequence(), part.topologyVersion(), part.map(), true)); - else - put(e.getKey(), part); + GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(), + part.updateSequence(), + part.topologyVersion(), + part.map(), + onlyActive); + + put(e.getKey(), cpy); } } @@ -177,6 +180,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> /** * @param updateSeq New update sequence value. + */ + public void newUpdateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + + /** + * @param updateSeq New update sequence value. * @return Old update sequence value. */ public long updateSequence(long updateSeq) { http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f391265..e945de9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ @GridToStringExclude + private int pendingSingleUpdates; + + /** */ + @GridToStringExclude private List<ClusterNode> srvNodes; /** */ @@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { boolean allReceived = false; + boolean updateSingleMap = false; synchronized (mux) { assert crd != null; if (crd.isLocal()) { if (remaining.remove(node.id())) { - updatePartitionSingleMap(msg); + updateSingleMap = true; + + pendingSingleUpdates++; allReceived = remaining.isEmpty(); } @@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT singleMsgs.put(node, msg); } - if (allReceived) + if (updateSingleMap) { + try { + updatePartitionSingleMap(msg); + } + finally { + synchronized (mux) { + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + mux.notifyAll(); + } + } + } + + if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(false); + } + } + + /** + * + */ + private void awaitSingleMapUpdates() { + synchronized (mux) { + try { + while (pendingSingleUpdates > 0) + U.wait(mux); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + } + } } /** @@ -1218,6 +1259,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) + cacheCtx.topology().checkEvictions(); + } + updateLastVersion(cctx.versions().last()); cctx.versions().onExchange(lastVer.get().order()); @@ -1374,7 +1420,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); else { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); @@ -1395,7 +1441,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, this); - top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); + top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId), false); } } @@ -1557,6 +1603,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd0.isLocal()) { if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(true); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index fbd8ce5..cf69264 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -679,14 +679,14 @@ public class IgniteTxHandler { * @param req Request. * @return Future. */ - @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId, + @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) { if (txFinishMsgLog.isDebugEnabled()) txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId + ']'); IgniteInternalFuture<IgniteInternalTx> fut = finish(nodeId, null, req); - assert req.txState() != null || fut.error() != null || + assert req.txState() != null || (fut != null && fut.error() != null) || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null); return fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6c26363..b9b92b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { try { if (!cache.context().affinityNode()) { ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldestSrvNode == null) return new GridEmptyIterator<>(); @@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { depExe.submit(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); + ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8814745..a660ec8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -174,8 +175,7 @@ class ServerImpl extends TcpDiscoveryImpl { IgniteProductVersion.fromString("1.5.0"); /** */ - private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); + private IgniteThreadPoolExecutor utilityPool; /** Nodes ring. */ @GridToStringExclude @@ -297,6 +297,13 @@ class ServerImpl extends TcpDiscoveryImpl { spiState = DISCONNECTED; } + utilityPool = new IgniteThreadPoolExecutor("disco-pool", + spi.ignite().name(), + 0, + 1, + 2000, + new LinkedBlockingQueue<Runnable>()); + if (debugMode) { if (!log.isInfoEnabled()) throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + @@ -2403,9 +2410,12 @@ class ServerImpl extends TcpDiscoveryImpl { /** Connection check threshold. */ private long connCheckThreshold; + /** */ + private long lastRingMsgTime; + /** */ - protected RingMessageWorker() { + RingMessageWorker() { super("tcp-disco-msg-worker", 10); initConnectionCheckFrequency(); @@ -2500,6 +2510,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + sendHeartbeatMessage(); + DebugLogger log = messageLogger(msg); if (log.isDebugEnabled()) @@ -2508,6 +2520,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + boolean ensured = spi.ensured(msg); + + if (!locNode.id().equals(msg.senderNodeId()) && ensured) + lastRingMsgTime = U.currentTimeMillis(); + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -2564,7 +2581,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (spi.ensured(msg) && redirectToClients(msg)) + if (ensured && redirectToClients(msg)) msgHist.add(msg); if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { @@ -5336,12 +5353,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Sends heartbeat message if needed. */ private void sendHeartbeatMessage() { - if (!isLocalNodeCoordinator()) - return; - long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); - if (elapsed > 0) + if (elapsed > 0 || !isLocalNodeCoordinator()) return; TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); @@ -5361,7 +5375,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) lastTimeStatusMsgSent = locNode.lastUpdateTime(); - long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime); + + long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis(); if (elapsed > 0) return; @@ -6062,11 +6078,11 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); - long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); @@ -6103,7 +6119,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; - spi.writeToSocket(msg, sock, res, socketTimeout); + spi.writeToSocket(msg, sock, res, sockTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java index 9e73632..c790644 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -28,6 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; @@ -317,7 +319,9 @@ public class TcpDiscoveryStatistics { assert time >= 0 : time; if (crdSinceTs.get() > 0 && + (msg instanceof TcpDiscoveryCustomEventMessage) || (msg instanceof TcpDiscoveryNodeAddedMessage) || + (msg instanceof TcpDiscoveryNodeAddFinishedMessage) || (msg instanceof TcpDiscoveryNodeLeftMessage) || (msg instanceof TcpDiscoveryNodeFailedMessage)) { ringMsgsSndTs.put(msg.id(), U.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java index 878d7d1..43017db 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java @@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac } /** + * @param backups Number of backups. * @throws Exception If failed. */ protected void checkNodeRemoved(int backups) throws Exception { @@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac } } - /** * @param assignment Assignment to verify. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 390c83e..31b4bc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe GridCacheSharedContext<?, ?> ctx = k.context().cache().context(); ClusterNode oldest = - GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer)); + ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer)); assertNotNull(oldest); http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java index a59ca8b..2d46cf4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null); + Collection<ClusterNode> affNodes = aff.assignPartition(null, + part, + new ArrayList<>(nodes), + new HashMap<ClusterNode, byte[]>(), + 0, + null); assertEquals(1, affNodes.size());
