ignite-5068 : WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cc528344 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cc528344 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cc528344 Branch: refs/heads/ignite-5267-merge-ea Commit: cc528344169ab89453ad376a3ee8ff2b56a7513f Parents: c67dc6f Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Fri Apr 28 15:48:30 2017 +0300 Committer: Ilya Lantukh <ilant...@gridgain.com> Committed: Fri Apr 28 15:48:30 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 68 ++++++++++---------- .../GridDhtPartitionsExchangeFuture.java | 13 ++-- 2 files changed, 38 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cc528344/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 898cb55..6d776d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -987,7 +987,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh HashSet<UUID> affIds = affAssignment.getIds(p); for (UUID nodeId : diffIds) { -// assert !affIds.contains(nodeId); + assert !affIds.contains(nodeId); if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { ClusterNode n = cctx.discovery().node(nodeId); @@ -1018,44 +1018,46 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (node2part == null) return; -// for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { -// UUID nodeId = e.getKey(); -// -// for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { -// int p0 = e0.getKey(); -// -// GridDhtPartitionState state = e0.getValue(); -// -// Set<UUID> ids = diffFromAffinity.get(p0); -// -// if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) { -// if (ids == null) -// diffFromAffinity.put(p0, ids = U.newHashSet(3)); -// -// ids.add(nodeId); -// } -// else { -// if (ids != null) -// ids.remove(nodeId); -// } -// } -// } + diffFromAffinity.clear(); - Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.cacheId(), affAssignment.topologyVersion())); + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + UUID nodeId = e.getKey(); - for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { - int p = e.getKey(); + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + int p0 = e0.getKey(); - Iterator<UUID> iter = e.getValue().iterator(); + GridDhtPartitionState state = e0.getValue(); - while (iter.hasNext()) { - UUID nodeId = iter.next(); + Set<UUID> ids = diffFromAffinity.get(p0); - if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) - iter.remove(); + if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) { + if (ids == null) + diffFromAffinity.put(p0, ids = U.newHashSet(3)); + + ids.add(nodeId); + } + else { + if (ids != null) + ids.remove(nodeId); + } } } +// Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.cacheId(), affAssignment.topologyVersion())); +// +// for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { +// int p = e.getKey(); +// +// Iterator<UUID> iter = e.getValue().iterator(); +// +// while (iter.hasNext()) { +// UUID nodeId = iter.next(); +// +// if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) +// iter.remove(); +// } +// } + diffFromAffinityVer = affAssignment.topologyVersion(); } @@ -1433,7 +1435,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); - if (diffFromAffinityVer.compareTo(affVer) <= 0 && false) { + if (exchId == null) { AffinityAssignment affAssignment = cctx.affinity().assignment(affVer); int diffFromAffinitySize = 0; @@ -1484,8 +1486,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - diffFromAffinityVer = affVer; - if (diffFromAffinitySize > 0) U.error(log, "??? S diffFromAffinitySize=" + diffFromAffinitySize + " [exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc528344/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 328f730..9eb7e50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1216,18 +1216,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.kernalContext().state().onExchangeDone(); if (err == null && realExchange) { -// if (discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED || (discoEvt instanceof DiscoveryCustomEvent && ((DiscoveryCustomEvent)discoEvt).customMessage() instanceof CacheAffinityChangeMessage)) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) - continue; +// if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED || (discoEvt instanceof DiscoveryCustomEvent && ((DiscoveryCustomEvent)discoEvt).customMessage() instanceof CacheAffinityChangeMessage)) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; - try { cacheCtx.topology().onAffinityInitialized(cacheCtx.affinity().assignment(topologyVersion())); } - catch (Exception e) { - System.out.println("???"); - } - } // } }