5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/95a41e88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/95a41e88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/95a41e88 Branch: refs/heads/ignite-5578 Commit: 95a41e885a097de7f7d6e19b546ec051518d6856 Parents: 6e280ad Author: sboikov <[email protected]> Authored: Thu Aug 3 22:17:11 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Aug 3 22:17:11 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheGroupContext.java | 16 +++++-- .../dht/GridDhtPartitionTopologyImpl.java | 44 ++++++++++++-------- .../colocated/GridDhtColocatedLockFuture.java | 2 +- .../distributed/near/GridNearLockFuture.java | 2 +- 4 files changed, 41 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/95a41e88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 209fe0a..5e5e02e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -37,15 +37,15 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; -import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; -import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; -import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; +import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.CI1; @@ -59,6 +59,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; @@ -581,6 +582,13 @@ public class CacheGroupContext { } /** + * @return {@code True} if cache is local. + */ + public boolean isReplicated() { + return ccfg.getCacheMode() == REPLICATED; + } + + /** * @return Cache configuration. */ public CacheConfiguration config() { http://git-wip-us.apache.org/repos/asf/ignite/blob/95a41e88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1770497..0f804d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -954,6 +954,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Nodes responsible for given partition (primary is first). */ @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) { + if (grp.isReplicated()) + return affNodes; + AffinityTopologyVersion topVer = affAssignment.topologyVersion(); lock.readLock().lock(); @@ -1277,7 +1280,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part = partMap; - if (exchangeVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { + if (exchangeVer == null && !grp.isReplicated() && + (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { @@ -1538,7 +1542,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part.put(parts.nodeId(), parts); // During exchange calculate diff after all messages are received and affinity initialized. - if (exchId == null) { + if (exchId == null && !grp.isReplicated()) { if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); @@ -1618,8 +1622,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { readyTopVer = lastTopChangeVer = assignment.topologyVersion(); - if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) - rebuildDiff(assignment); + if (!grp.isReplicated()) { + if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) + rebuildDiff(assignment); + } if (updateRebalanceVer) updateRebalanceVersion(assignment.assignment()); @@ -2041,7 +2047,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { map.put(p, state); - if (state == MOVING || state == OWNING || state == RENTING) { + if (!grp.isReplicated() && (state == MOVING || state == OWNING || state == RENTING)) { AffinityAssignment assignment = grp.affinity().cachedAffinity(diffFromAffinityVer); if (!assignment.getIds(p).contains(ctx.localNodeId())) { @@ -2082,12 +2088,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionMap parts = node2part.remove(nodeId); - if (parts != null) { - for (Integer p : parts.keySet()) { - Set<UUID> diffIds = diffFromAffinity.get(p); + if (!grp.isReplicated()) { + if (parts != null) { + for (Integer p : parts.keySet()) { + Set<UUID> diffIds = diffFromAffinity.get(p); - if (diffIds != null) - diffIds.remove(nodeId); + if (diffIds != null) + diffIds.remove(nodeId); + } } } @@ -2307,15 +2315,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { owners.add(node); } - Set<UUID> diff = diffFromAffinity.get(i); + if (!grp.isReplicated()) { + Set<UUID> diff = diffFromAffinity.get(i); - if (diff != null) { - for (UUID nodeId : diff) { - if (hasState(i, nodeId, OWNING)) { - ClusterNode node = ctx.discovery().node(nodeId); + if (diff != null) { + for (UUID nodeId : diff) { + if (hasState(i, nodeId, OWNING)) { + ClusterNode node = ctx.discovery().node(nodeId); - if (node != null) - owners.add(node); + if (node != null) + owners.add(node); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/95a41e88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index af4af5c..7500549 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -690,7 +690,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { - if (fut.isDone() && fut.topologyVersion().equals(topVer)) { + if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) { Throwable err = fut.validateCache(cctx, recovery, read, null, keys); if (err != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/95a41e88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c6d7450..bb71337 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -787,7 +787,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { - if (fut.isDone() && fut.topologyVersion().equals(topVer)){ + if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)){ Throwable err = fut.validateCache(cctx, recovery, read, null, keys); if (err != null) {
