http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6634e98..e942b5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -549,7 +549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { cntrMap.clear(); // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { + if (oldest != null && (loc.equals(oldest) || grp.localStartVersion().equals(exchId.topologyVersion()))) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -598,6 +598,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { waitForRent(); } + private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { + return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { treatAllPartAsLoc = false; @@ -631,7 +635,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - if (grp.affinity().partitionLocalNode(p, topVer)) { + if (partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@ -724,11 +728,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory)); - if (cctx.shared().pageStore() != null) { + if (ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + // TODO IGNITE-5075. + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -758,7 +763,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state = loc != null ? loc.state() : null; - if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction())) + if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction())) return loc; if (!create) @@ -773,7 +778,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { state = loc != null ? loc.state() : null; - boolean belongs = cctx.affinity().partitionLocalNode(p, topVer); + boolean belongs = partitionLocalNode(p, topVer); if (loc != null && state == EVICTED) { locParts.set(p, loc = null); @@ -783,7 +788,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); } - else if (loc != null && state == RENTING && cctx.allowFastEviction()) + else if (loc != null && state == RENTING && grp.allowFastEviction()) throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted."); if (loc == null) { @@ -792,7 +797,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory)); if (updateSeq) this.updateSeq.incrementAndGet(); @@ -807,9 +812,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().unlock(); } - if (created && cctx.shared().pageStore() != null) { + if (created && ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + // TODO IGNITE-5075. + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -834,8 +840,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); + @Override public GridDhtLocalPartition localPartition(int part) { + return locParts.get(part); } /** {@inheritDoc} */ @@ -891,7 +897,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { map.put(i, part.state()); } - return new GridDhtPartitionMap(cctx.nodeId(), + return new GridDhtPartitionMap(ctx.localNodeId(), updateSeq.get(), topVer, Collections.unmodifiableMap(map), @@ -931,7 +937,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { - AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); + AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); List<ClusterNode> affNodes = affAssignment.get(p); @@ -954,8 +960,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + ", topVer2=" + this.topVer + - ", node=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + + ", node=" + ctx.igniteInstanceName() + + ", grp=" + grp.name() + ", node2part=" + node2part + ']'; List<ClusterNode> nodes = null; @@ -967,7 +973,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { HashSet<UUID> affIds = affAssignment.getIds(p); if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = ctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { @@ -1001,7 +1007,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state, GridDhtPartitionState... states) { Collection<UUID> allIds = topVer.topologyVersion() > 0 ? - F.nodeIds(discoCache.cacheGroupAffinityNodes(cctx.group().groupId())) : + F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) : null; lock.readLock().lock(); @@ -1010,7 +1016,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + ", allIds=" + allIds + ", node2part=" + node2part + - ", cache=" + cctx.name() + ']'; + ", grp=" + grp.name() + ']'; Collection<UUID> nodeIds = part2node.get(p); @@ -1027,7 +1033,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = ctx.discovery().node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -1043,7 +1049,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, topVer); return nodes(p, topVer, OWNING); @@ -1056,7 +1062,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, AffinityTopologyVersion.NONE); return nodes(p, AffinityTopologyVersion.NONE, MOVING); @@ -1082,11 +1088,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + ", grp=" + grp.name() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNode().id() + + ", locName=" + ctx.igniteInstanceName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -1168,7 +1173,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // then we keep the newer value. if (newPart != null && (newPart.updateSequence() < part.updateSequence() || - (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) + (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + @@ -1182,7 +1187,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); - if (!cctx.discovery().alive(nodeId)) { + if (!ctx.discovery().alive(nodeId)) { if (log.isDebugEnabled()) log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + partMap + ']'); @@ -1194,7 +1199,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part = partMap; - Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f); + Map<Integer, Set<UUID>> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f); for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { for (Integer p : e.getValue().keySet()) { @@ -1213,11 +1218,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = false; - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); - GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId()); + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - if (nodeMap != null && cctx.shared().database().persistenceEnabled()) { + if (nodeMap != null && ctx.database().persistenceEnabled()) { for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { int p = e.getKey(); GridDhtPartitionState state = e.getValue(); @@ -1244,7 +1249,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1257,7 +1262,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Partition map after full update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? localPartitionMap() : null; } @@ -1276,7 +1281,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - if (!cctx.discovery().alive(parts.nodeId())) { + if (!ctx.discovery().alive(parts.nodeId())) { if (log.isDebugEnabled()) log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); @@ -1371,10 +1376,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1387,7 +1392,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Partition map after single update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? localPartitionMap() : null; } @@ -1401,7 +1406,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); Collection<Integer> lost = null; @@ -1435,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = false; if (lost != null) { - PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); + PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); assert plc != null; @@ -1467,13 +1472,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) - cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); +// TODO: IGNITE-5075. +// if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) +// cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, +// discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } if (plc != PartitionLossPolicy.IGNORE) - cctx.needsRecovery(true); + grp.needsRecovery(true); } return changed; @@ -1488,7 +1494,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); long updSeq = updateSeq.incrementAndGet(); for (int part = 0; part < parts; part++) { @@ -1527,9 +1533,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - checkEvictions(updSeq, cctx.affinity().assignments(topVer)); + checkEvictions(updSeq, grp.affinity().assignments(topVer)); - cctx.needsRecovery(false); + grp.needsRecovery(false); } finally { lock.writeLock().unlock(); @@ -1543,7 +1549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { Collection<Integer> res = null; - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); for (int part = 0; part < parts; part++) { Set<UUID> nodeIds = part2node.get(part); @@ -1580,7 +1586,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) + if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) locPart.moving(); } @@ -1605,12 +1611,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return {@code True} if state changed. */ private boolean checkEvictions(long updateSeq) { - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); boolean changed = false; if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed = checkEvictions(updateSeq, aff); @@ -1642,12 +1648,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Checks if any of the local partitions need to be evicted. */ private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { - if (!cctx.kernalContext().state().active()) + if (!ctx.kernalContext().state().active()) return false; boolean changed = false; - UUID locId = cctx.nodeId(); + UUID locId = ctx.localNodeId(); for (int p = 0; p < locParts.length(); p++) { GridDhtLocalPartition part = locParts.get(p); @@ -1660,7 +1666,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { List<ClusterNode> affNodes = aff.get(p); - if (!affNodes.contains(cctx.localNode())) { + if (!affNodes.contains(ctx.localNode())) { List<ClusterNode> nodes = nodes(p, topVer, OWNING); Collection<UUID> nodeIds = F.nodeIds(nodes); @@ -1723,10 +1729,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); // If this node became the oldest node. - if (cctx.localNode().equals(oldest)) { + if (ctx.localNode().equals(oldest)) { long seq = node2part.updateSequence(); if (seq != updateSeq) { @@ -1753,7 +1759,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (node2part != null) { - UUID locNodeId = cctx.localNodeId(); + UUID locNodeId = ctx.localNodeId(); GridDhtPartitionMap map = node2part.get(locNodeId); @@ -1790,9 +1796,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode oldest = discoCache.oldestAliveServerNode(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); if (node2part != null) { if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { @@ -1937,11 +1943,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + ", grp=" + grp.name() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNodeId() + + ", locName=" + ctx.igniteInstanceName() + ']'; for (GridDhtPartitionMap map : node2part.values()) { if (map.hasMovingPartitions()) @@ -1957,8 +1962,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + ']'); + X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() + + ", grp=" + grp.name() + ']'); lock.readLock().lock(); @@ -1986,7 +1991,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return {@code True} if given partition belongs to local node. */ private boolean localNode(int part, List<List<ClusterNode>> aff) { - return aff.get(part).contains(cctx.localNode()); + return aff.get(part).contains(ctx.localNode()); } /** @@ -1997,7 +2002,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (node2part == null || !node2part.valid()) return; - for (int i = 0; i < cctx.affinity().partitions(); i++) { + for (int i = 0; i < grp.affinity().partitions(); i++) { List<ClusterNode> affNodes = aff.get(i); // Topology doesn't contain server nodes (just clients). @@ -2013,7 +2018,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = topVer; if (log.isDebugEnabled()) - log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']'); + log.debug("Updated rebalanced version [cache=" + grp.name() + ", ver=" + rebalancedTopVer + ']'); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index c91eb7a..9b61f14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -118,10 +118,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - preldr = new GridDhtPreloader(ctx); - - preldr.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @Override public void apply(UUID nodeId, GridNearGetRequest req) { processNearGetRequest(nodeId, req); @@ -382,7 +378,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : - ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); + ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion()); if (keyFut == null || keyFut.isDone()) { if (keyFut != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 2292cb2..10ed584 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -899,7 +899,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer); // Prevent embedded future creation if possible. if (keyFut == null || keyFut.isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 75cbd00..505df91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -38,13 +38,16 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @SuppressWarnings("NonConstantFieldWithUpperCaseName") public class GridDhtPartitionDemander { /** */ - private final GridCacheContext<?, ?> cctx; + private final GridCacheSharedContext<?, ?> ctx; + + /** */ + private final CacheGroupInfrastructure grp; /** */ private final IgniteLogger log; @@ -116,16 +122,18 @@ public class GridDhtPartitionDemander { private final AtomicBoolean stoppedEvtSent = new AtomicBoolean(); /** - * @param cctx Cctx. + * @param grp Ccahe group. */ - public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) { - assert cctx != null; + public GridDhtPartitionDemander(CacheGroupInfrastructure grp) { + assert grp != null; + + this.grp = grp; - this.cctx = cctx; + ctx = grp.shared(); - log = cctx.logger(getClass()); + log = ctx.logger(getClass()); - boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); + boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode(); rebalanceFut = new RebalanceFuture();//Dummy. @@ -137,7 +145,7 @@ public class GridDhtPartitionDemander { Map<Integer, Object> tops = new HashMap<>(); - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) + for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++) tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); rebalanceTopics = tops; @@ -196,7 +204,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -208,7 +216,7 @@ public class GridDhtPartitionDemander { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut); + IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut); fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> future) { @@ -237,7 +245,7 @@ public class GridDhtPartitionDemander { */ private boolean topologyChanged(RebalanceFuture fut) { return - !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. + !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed. fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. } @@ -249,7 +257,8 @@ public class GridDhtPartitionDemander { private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + // TODO IGNITE-5075. + // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** @@ -279,12 +288,12 @@ public class GridDhtPartitionDemander { assert force == (forcedRebFut != null); - long delay = cctx.config().getRebalanceDelay(); + long delay = grp.config().getRebalanceDelay(); if (delay == 0 || force) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt); + final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent, stoppedEvtSent, cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -330,7 +339,7 @@ public class GridDhtPartitionDemander { fut.onDone(true); - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); fut.sendRebalanceFinishedEvent(); @@ -381,7 +390,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.get(); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -391,7 +400,7 @@ public class GridDhtPartitionDemander { @Override public void onTimeout() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { - cctx.shared().exchange().forceRebalance(exchFut); + ctx.exchange().forceRebalance(exchFut); } }); } @@ -399,7 +408,7 @@ public class GridDhtPartitionDemander { lastTimeoutObj.set(obj); - cctx.time().addTimeoutObject(obj); + ctx.time().addTimeoutObject(obj); } return null; @@ -433,17 +442,19 @@ public class GridDhtPartitionDemander { Collection<Integer> parts= e.getValue().partitions(); - assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]"; + assert parts != null : "Partitions are null [grp=" + grp.name() + ", fromNode=" + nodeId + "]"; fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); } } + final CacheConfiguration cfg = grp.config(); + + int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize(); + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { final ClusterNode node = e.getKey(); - final CacheConfiguration cfg = cctx.config(); - final Collection<Integer> parts = fut.remaining.get(node.id()).get2(); GridDhtPartitionDemandMessage d = e.getValue(); @@ -452,8 +463,6 @@ public class GridDhtPartitionDemander { ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); for (int cnt = 0; cnt < lsnrCnt; cnt++) @@ -473,15 +482,15 @@ public class GridDhtPartitionDemander { initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); - initD.timeout(cctx.config().getRebalanceTimeout()); + initD.timeout(grp.config().getRebalanceTimeout()); synchronized (fut) { if (fut.isDone()) return;// Future can be already cancelled at this moment and all failovers happened. // New requests will not be covered by failovers. - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); + ctx.io().sendOrderedMessage(node, + rebalanceTopics.get(cnt), initD, grp.ioPolicy(), initD.timeout()); } @@ -505,11 +514,11 @@ public class GridDhtPartitionDemander { for (Integer part : parts) { try { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { if (partCntrs == null) partCntrs = new HashMap<>(parts.size(), 1.0f); - GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false); + GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false); partCntrs.put(part, p.initialUpdateCounter()); } @@ -585,7 +594,7 @@ public class GridDhtPartitionDemander { final RebalanceFuture fut = rebalanceFut; - ClusterNode node = cctx.node(id); + ClusterNode node = ctx.node(id); if (node == null) return; @@ -609,14 +618,16 @@ public class GridDhtPartitionDemander { return; } - final GridDhtPartitionTopology top = cctx.dht().topology(); + final GridDhtPartitionTopology top = grp.topology(); try { + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + // Preload. for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (aff.get(p).contains(ctx.localNode())) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -627,7 +638,7 @@ public class GridDhtPartitionDemander { boolean reserved = part.reserve(); assert reserved : "Failed to reserve partition [igniteInstanceName=" + - cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; + ctx.igniteInstanceName() + ", grp=" + grp.name() + ", part=" + part + ']'; part.lock(); @@ -686,7 +697,7 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) { - if (cctx.affinity().partitionLocalNode(miss, topVer)) + if (aff.get(miss).contains(ctx.localNode())) fut.partitionMissed(id, miss); } @@ -696,14 +707,14 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( supply.updateSequence(), supply.topologyVersion(), cctx.cacheId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); d.topic(rebalanceTopics.get(idx)); if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. - cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException e) { @@ -732,12 +743,15 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry, AffinityTopologyVersion topVer ) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { GridCacheEntryEx cached = null; try { + // TODO IGNITE-5075. + GridCacheContext cctx = grp.cacheContext(); + cached = cctx.dht().entryEx(entry.key()); if (log.isDebugEnabled()) @@ -789,10 +803,10 @@ public class GridDhtPartitionDemander { } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } return true; @@ -810,6 +824,12 @@ public class GridDhtPartitionDemander { /** */ private static final long serialVersionUID = 1L; + /** */ + private final GridCacheSharedContext<?, ?> ctx; + + /** */ + private final CacheGroupInfrastructure grp; + /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */ private final AtomicBoolean startedEvtSent; @@ -817,9 +837,6 @@ public class GridDhtPartitionDemander { private final AtomicBoolean stoppedEvtSent; /** */ - private final GridCacheContext<?, ?> cctx; - - /** */ private final IgniteLogger log; /** Remaining. T2: startTime, partitions */ @@ -840,14 +857,15 @@ public class GridDhtPartitionDemander { /** * @param assigns Assigns. - * @param cctx Context. + * @param grp Cache group. * @param log Logger. * @param startedEvtSent Start event sent flag. * @param stoppedEvtSent Stop event sent flag. * @param updateSeq Update sequence. */ - RebalanceFuture(GridDhtPreloaderAssignments assigns, - GridCacheContext<?, ?> cctx, + RebalanceFuture( + CacheGroupInfrastructure grp, + GridDhtPreloaderAssignments assigns, IgniteLogger log, AtomicBoolean startedEvtSent, AtomicBoolean stoppedEvtSent, @@ -856,20 +874,23 @@ public class GridDhtPartitionDemander { this.exchFut = assigns.exchangeFuture(); this.topVer = assigns.topologyVersion(); - this.cctx = cctx; + this.grp = grp; this.log = log; this.startedEvtSent = startedEvtSent; this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; + + ctx= grp.shared(); } /** * Dummy future. Will be done by real one. */ - public RebalanceFuture() { + RebalanceFuture() { this.exchFut = null; this.topVer = null; - this.cctx = null; + this.ctx = null; + this.grp = null; this.log = null; this.startedEvtSent = null; this.stoppedEvtSent = null; @@ -910,7 +931,7 @@ public class GridDhtPartitionDemander { U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']'); - if (!cctx.kernalContext().isStopping()) { + if (!ctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) cleanupRemoteContexts(nodeId); } @@ -931,7 +952,7 @@ public class GridDhtPartitionDemander { if (isDone()) return; - U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() + + U.log(log, ("Cancelled rebalancing [grp=" + grp.name() + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); @@ -966,7 +987,7 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. */ private void cleanupRemoteContexts(UUID nodeId) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); if (node == null) return; @@ -974,14 +995,14 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); try { - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException ignored) { @@ -999,20 +1020,21 @@ public class GridDhtPartitionDemander { if (isDone()) return; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); + // TODO IGNITE-5075. +// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) +// preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, +// exchFut.discoveryEvent()); T2<Long, Collection<Integer>> t = remaining.get(nodeId); - assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + "]"; Collection<Integer> parts = t.get2(); boolean rmvd = parts.remove(p); - assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + ", left=" + parts + "]"; if (parts.isEmpty()) { @@ -1035,7 +1057,8 @@ public class GridDhtPartitionDemander { private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + // TODO IGNITE-5075. + // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** @@ -1063,7 +1086,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); Collection<Integer> m = new HashSet<>(); @@ -1077,13 +1100,13 @@ public class GridDhtPartitionDemander { onDone(false); //Finished but has missed partitions, will force dummy exchange - cctx.shared().exchange().forceDummyExchange(true, exchFut); + ctx.exchange().forceDummyExchange(true, exchFut); return; } - if (!cancelled && !cctx.preloader().syncFuture().isDone()) - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + if (!cancelled && !grp.preloader().syncFuture().isDone()) + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); onDone(!cancelled); } @@ -1093,24 +1116,26 @@ public class GridDhtPartitionDemander { * */ private void sendRebalanceStartedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && - (!cctx.isReplicated() || !startedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); - - startedEvtSent.set(true); - } + // TODO IGNITE-5075. +// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && +// (!cctx.isReplicated() || !startedEvtSent.get())) { +// preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); +// +// startedEvtSent.set(true); +// } } /** * */ private void sendRebalanceFinishedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && - (!cctx.isReplicated() || !stoppedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - - stoppedEvtSent.set(true); - } + // TODO IGNITE-5075. +// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && +// (!cctx.isReplicated() || !stoppedEvtSent.get())) { +// preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); +// +// stoppedEvtSent.set(true); +// } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index f7f0aff..84c3d23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -26,6 +27,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; @@ -47,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ class GridDhtPartitionSupplier { /** */ - private final GridCacheContext<?, ?> cctx; + private final CacheGroupInfrastructure grp; /** */ private final IgniteLogger log; @@ -65,18 +67,18 @@ class GridDhtPartitionSupplier { private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>(); /** - * @param cctx Cache context. + * @param grp Cache group. */ - GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) { - assert cctx != null; + GridDhtPartitionSupplier(CacheGroupInfrastructure grp) { + assert grp != null; - this.cctx = cctx; + this.grp = grp; - log = cctx.logger(getClass()); + log = grp.shared().logger(getClass()); - top = cctx.dht().topology(); + top = grp.topology(); - depEnabled = cctx.gridDeploy().enabled(); + depEnabled = grp.shared().gridDeploy().enabled(); } /** @@ -171,7 +173,7 @@ class GridDhtPartitionSupplier { assert d != null; assert id != null; - AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion cutTop = grp.affinity().lastVersion(); AffinityTopologyVersion demTop = d.topologyVersion(); T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop); @@ -199,7 +201,7 @@ class GridDhtPartitionSupplier { GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage( d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); - ClusterNode node = cctx.discovery().node(id); + ClusterNode node = grp.shared().discovery().node(id); if (node == null) return; // Context will be cleaned at topology change. @@ -225,7 +227,7 @@ class GridDhtPartitionSupplier { boolean newReq = true; - long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount(); + long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount(); if (sctx != null) { phase = sctx.phase; @@ -234,7 +236,7 @@ class GridDhtPartitionSupplier { } else { if (log.isDebugEnabled()) - log.debug("Starting supplying rebalancing [cache=" + cctx.name() + + log.debug("Starting supplying rebalancing [grp=" + grp.name() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -280,7 +282,7 @@ class GridDhtPartitionSupplier { IgniteRebalanceIterator iter; if (sctx == null || sctx.entryIt == null) { - iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); + iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); if (!iter.historical()) s.clean(part); @@ -289,7 +291,9 @@ class GridDhtPartitionSupplier { iter = (IgniteRebalanceIterator)sctx.entryIt; while (iter.hasNext()) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { + List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part); + + if (!nodes.contains(node)) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -313,7 +317,7 @@ class GridDhtPartitionSupplier { break; } - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (s.messageSize() >= grp.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { saveSupplyContext(scId, phase, @@ -400,7 +404,7 @@ class GridDhtPartitionSupplier { reply(node, d, s, scId); if (log.isDebugEnabled()) - log.debug("Finished supplying rebalancing [cache=" + cctx.name() + + log.debug("Finished supplying rebalancing [grp=" + grp.name() + ", fromNode=" + node.id() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -427,16 +431,15 @@ class GridDhtPartitionSupplier { GridDhtPartitionSupplyMessage s, T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException { - try { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout()); // Throttle preloading. - if (cctx.config().getRebalanceThrottle() > 0) - U.sleep(cctx.config().getRebalanceThrottle()); + if (grp.config().getRebalanceThrottle() > 0) + U.sleep(grp.config().getRebalanceThrottle()); return true; } @@ -469,7 +472,7 @@ class GridDhtPartitionSupplier { AffinityTopologyVersion topVer, long updateSeq) { synchronized (scMap) { - if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { + if (grp.affinity().lastVersion().equals(topVer)) { assert scMap.get(t) == null; scMap.put(t, http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index ee461ab..016ec6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -206,32 +206,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G * @param ctx Cache context. * @throws IgniteCheckedException If failed. */ - void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { - assert info != null; - - marshalInfo(info, ctx); - - msgSize += info.marshalledSize(ctx); - - CacheEntryInfoCollection infoCol = infos().get(p); - - if (infoCol == null) { - msgSize += 4; - - infos().put(p, infoCol = new CacheEntryInfoCollection()); - - infoCol.init(); - } - - infoCol.add(info); - } - - /** - * @param p Partition. - * @param info Entry to add. - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { assert info != null; assert (info.key() != null || info.keyBytes() != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 74bbcb0..3f74f75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -87,10 +87,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Parition update counters. */ - public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId); + public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId); /** * @return Last used version among all nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 499537d..1e656b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; @@ -644,7 +645,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId())); + top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId())); } /** @@ -738,14 +739,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd != null) { if (crd.isLocal()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - boolean updateTop = !cacheCtx.isLocal() && - exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + boolean updateTop = !grp.isLocal() && + exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - if (top.cacheId() == cacheCtx.cacheId()) { - cacheCtx.topology().update(exchId, + if (top.groupId() == grp.groupId()) { + grp.topology().update(exchId, top.partitionMap(true), top.updateCounters(false)); @@ -766,8 +767,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (centralizedAff) { // Last server node failed. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + GridAffinityAssignmentCache aff = grp.affinity(); aff.initialize(topologyVersion(), aff.idealAssignment()); } @@ -1009,6 +1010,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT U.dumpThreads(log); } + public boolean cacheGroupStopping(int grpId) { + return exchActions != null && exchActions.cacheGroupStopping(grpId); + } + /** * @param cacheId Cache ID to check. * @return {@code True} if cache is stopping by this exchange. @@ -1456,9 +1461,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) { - assert e.getValue().partitionUpdateCounters(top.cacheId()) != null; + assert e.getValue().partitionUpdateCounters(top.groupId()) != null; - for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) { + for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { int p = e0.getKey(); UUID uuid = e.getKey(); @@ -1755,19 +1760,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.versions().onExchange(msg.lastVersion().order()); for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId); + Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx != null) - cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); + if (grp != null) + grp.topology().update(exchId, entry.getValue(), cntrMap); else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); + cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap); } } } @@ -1781,13 +1786,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT msgs.put(node.id(), msg); for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + Integer grpId = entry.getKey(); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : - cctx.exchange().clientTopology(cacheId, this); + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(grpId, this); - top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); + top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(grpId)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 73a3481..b327ad1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -122,24 +122,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return {@code True} if message contains full map for given cache. */ - public boolean containsCache(int cacheId) { - return parts != null && parts.containsKey(cacheId); + public boolean containsGroup(int grpId) { + return parts != null && parts.containsKey(grpId); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param fullMap Full partitions map. * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { + public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); - if (!parts.containsKey(cacheId)) { - parts.put(cacheId, fullMap); + if (!parts.containsKey(grpId)) { + parts.put(grpId, fullMap); if (dupDataCache != null) { assert compress; @@ -148,30 +148,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (dupPartsData == null) dupPartsData = new HashMap<>(); - dupPartsData.put(cacheId, dupDataCache); + dupPartsData.put(grpId, dupDataCache); } } } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) partCntrs = new HashMap<>(); - if (!partCntrs.containsKey(cacheId)) - partCntrs.put(cacheId, cntrMap); + if (!partCntrs.containsKey(grpId)) + partCntrs.put(grpId, cntrMap); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { + @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId); + Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); } @@ -427,7 +427,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return 46; } - //todo /** {@inheritDoc} */ @Override public byte fieldsCount() { return 11; http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index e197864..f2c9158 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -127,7 +128,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { DiscoveryEvent e = (DiscoveryEvent)evt; try { - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED; @@ -148,12 +149,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { }; /** - * @param cctx Cache context. + * @param grp Cache group. */ - public GridDhtPreloader(GridCacheContext<?, ?> cctx) { - super(cctx); + public GridDhtPreloader(CacheGroupInfrastructure grp) { + super(grp); - top = cctx.dht().topology(); + top = grp.topology(); startFut = new GridFutureAdapter<>(); } @@ -163,26 +164,26 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("Starting DHT rebalancer..."); - cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class, + ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); - cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class, + ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); } }); - supplier = new GridDhtPartitionSupplier(cctx); + supplier = new GridDhtPartitionSupplier(grp); demander = new GridDhtPartitionDemander(cctx); demander.start(); - cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @@ -203,7 +204,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { stopping = true; - cctx.events().removeListener(discoLsnr); + ctx.gridEvents().removeLocalEventListener(discoLsnr); // Acquire write busy lock. busyLock.writeLock().lock(); @@ -253,25 +254,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. - GridDhtPartitionTopology top = cctx.dht().topology(); + GridDhtPartitionTopology top = grp.topology(); - if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active()) + if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active()) return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - int partCnt = cctx.affinity().partitions(); + int partCnt = grp.affinity().partitions(); assert exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", cache=" + cctx.name() + + ", grp=" + grp.name() + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + for (int p = 0; p < partCnt; p++) { - if (cctx.shared().exchange().hasPendingExchange()) { + if (ctx.exchange().hasPendingExchange()) { if (log.isDebugEnabled()) log.debug("Skipping assignments creation, exchange worker has pending assignments: " + exchFut.exchangeId()); @@ -282,7 +285,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } // If partition belongs to local node. - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (aff.get(p).contains(ctx.localNode())) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -300,13 +303,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (picked.isEmpty()) { top.own(part); - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); - } +// TODO IGNITE-5075. +// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { +// DiscoveryEvent discoEvt = exchFut.discoveryEvent(); +// +// cctx.events().addPreloadEvent(p, +// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), +// discoEvt.type(), discoEvt.timestamp()); +// } if (log.isDebugEnabled()) log.debug("Owning partition as there are no other owners: " + part); @@ -342,7 +346,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Picked owners. */ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { - Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer); + Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p); int affCnt = affNodes.size(); @@ -368,7 +372,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Nodes owning this partition. */ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { - return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); + return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId())); } /** {@inheritDoc} */ @@ -423,12 +427,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); + return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { - return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); + return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); } /** @@ -459,7 +463,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @param msg Force keys message. */ private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { - IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); + IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); if (fut.isDone()) processForceKeysRequest0(node, msg); @@ -480,6 +484,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { return; try { + GridCacheContext cctx = ctx.cacheContext(msg.cacheId()); + ClusterNode loc = cctx.localNode(); GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( @@ -591,11 +597,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { try { top.onEvicted(part, updateSeq); - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) - cctx.events().addUnloadEvent(part.id()); +// TODO IGNITE-5075. +// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) +// cctx.events().addUnloadEvent(part.id()); if (updateSeq) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); } finally { leaveBusy(); @@ -604,7 +611,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public boolean needForceKeys() { - if (cctx.rebalanceEnabled()) { + if (grp.rebalanceEnabled()) { IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture(); if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result())) @@ -615,12 +622,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req, + @Override public IgniteInternalFuture<Object> request(GridCacheContext cctx, + GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) { if (!needForceKeys()) return null; - return request0(req.keys(), topVer); + return request0(cctx, req.keys(), topVer); } /** @@ -628,11 +636,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Future for request. */ @SuppressWarnings({"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + @Override public GridDhtFuture<Object> request(GridCacheContext cctx, + Collection<KeyCacheObject> keys, + AffinityTopologyVersion topVer) { if (!needForceKeys()) return null; - return request0(keys, topVer); + return request0(cctx, keys, topVer); } /** @@ -641,7 +651,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Future for request. */ @SuppressWarnings({"unchecked", "RedundantCast"}) - private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -652,7 +662,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (topReadyFut == null) startFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> syncFut) { - cctx.kernalContext().closure().runLocalSafe( + ctx.kernalContext().closure().runLocalSafe( new GridPlainRunnable() { @Override public void run() { fut.init(); @@ -689,7 +699,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { demandLock.writeLock().lock(); try { - cctx.deploy().unwind(cctx); + // TODO IGNITE-5075. + // cctx.deploy().unwind(cctx); } finally { demandLock.writeLock().unlock(); @@ -728,7 +739,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { partsToEvict.putIfAbsent(part.id(), part); if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) { - cctx.closures().callLocalSafe(new GPC<Boolean>() { + ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { @Override public Boolean call() { boolean locked = true; @@ -749,7 +760,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { partsToEvict.put(part.id(), part); } catch (Throwable ex) { - if (cctx.kernalContext().isStopping()) { + if (ctx.kernalContext().isStopping()) { LT.warn(log, ex, "Partition eviction failed (current node is stopping).", false, true); @@ -785,7 +796,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void dumpDebugInfo() { if (!forceKeyFuts.isEmpty()) { - U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:"); + U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:"); for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) U.warn(log, ">>> " + fut); @@ -803,7 +814,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void apply(UUID nodeId, M msg) { - ClusterNode node = cctx.node(nodeId); + ClusterNode node = ctx.node(nodeId); if (node == null) { if (log.isDebugEnabled())