ignite-4557 Fixed wrong affinity manager call.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d5adcb7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d5adcb7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d5adcb7 Branch: refs/heads/ignite-comm-balance-master Commit: 6d5adcb7b3cec01bce18bba10fbd22c426563e93 Parents: e613c00 Author: Evgeny Stanilovskiy <[email protected]> Authored: Wed Feb 1 14:21:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 1 17:12:26 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 10 ++-- .../cache/GridCacheAffinityManager.java | 60 ++++++++------------ .../processors/cache/GridCacheContext.java | 17 ------ .../cache/GridCacheEvictionManager.java | 6 +- .../processors/cache/GridCacheUtils.java | 20 ------- .../cache/affinity/GridCacheAffinityImpl.java | 16 +++--- .../CacheDataStructuresManager.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../dht/GridDhtPartitionTopologyImpl.java | 4 +- .../cache/distributed/dht/GridDhtTxRemote.java | 2 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 13 +++-- .../GridNearAtomicSingleUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 6 +- .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../dht/preloader/GridDhtPartitionDemander.java | 8 +-- .../dht/preloader/GridDhtPartitionSupplier.java | 12 ++-- .../dht/preloader/GridDhtPreloader.java | 4 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 6 +- .../distributed/near/GridNearGetFuture.java | 4 +- .../distributed/near/GridNearLockFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 2 +- .../GridNearPessimisticTxPrepareFuture.java | 2 +- .../near/GridNearTransactionalCache.java | 6 +- .../near/GridNearTxFinishFuture.java | 2 +- .../cache/query/GridCacheQueryManager.java | 9 +-- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../cache/transactions/TxDeadlockDetection.java | 2 +- .../datastreamer/DataStreamerImpl.java | 2 +- .../datastructures/GridCacheSetImpl.java | 2 +- .../datastructures/GridSetQueryPredicate.java | 2 +- .../processors/job/GridJobProcessor.java | 2 +- .../cache/CacheAffinityCallSelfTest.java | 4 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 6 +- .../IgniteCacheConfigVariationsFullApiTest.java | 6 +- .../cache/IgniteCachePeekModesAbstractTest.java | 8 +-- ...actQueueFailoverDataConsistencySelfTest.java | 2 +- ...niteCacheClientNodeChangingTopologyTest.java | 8 +-- .../TxOptimisticDeadlockDetectionTest.java | 2 +- .../TxPessimisticDeadlockDetectionTest.java | 2 +- .../processors/query/h2/IgniteH2Indexing.java | 2 +- .../query/h2/opt/GridH2IndexBase.java | 2 +- 51 files changed, 127 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index ecf9ea9..8d0a962 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -778,7 +778,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean nearKey; if (!(modes.near && modes.primary && modes.backup)) { - boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer); + boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer); if (keyPrimary) { if (!modes.primary) @@ -787,7 +787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V nearKey = false; } else { - boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer); + boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); if (keyBackup) { if (!modes.backup) @@ -808,7 +808,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } else { - nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer); + nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); if (nearKey) { // Swap and offheap are disabled for near cache. @@ -3763,8 +3763,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public boolean apply(ClusterNode clusterNode) { return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 && - ((modes.primary && aff.primary(clusterNode, part, topVer)) || - (modes.backup && aff.backup(clusterNode, part, topVer))); + ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) || + (modes.backup && aff.backupByPartition(clusterNode, part, topVer))); } }).nodes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 8b7be1b..d85e76e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -238,8 +238,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Affinity nodes. */ - public List<ClusterNode> nodes(Object key, AffinityTopologyVersion topVer) { - return nodes(partition(key), topVer); + public List<ClusterNode> nodesByKey(Object key, AffinityTopologyVersion topVer) { + return nodesByPartition(partition(key), topVer); } /** @@ -247,7 +247,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Affinity nodes. */ - public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) { + public List<ClusterNode> nodesByPartition(int part, AffinityTopologyVersion topVer) { if (cctx.isLocal()) topVer = LOC_CACHE_TOP_VER; @@ -282,8 +282,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Primary node for given key. */ - @Nullable public ClusterNode primary(Object key, AffinityTopologyVersion topVer) { - return primary(partition(key), topVer); + @Nullable public ClusterNode primaryByKey(Object key, AffinityTopologyVersion topVer) { + return primaryByPartition(partition(key), topVer); } /** @@ -291,8 +291,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Primary node for given key. */ - @Nullable public ClusterNode primary(int part, AffinityTopologyVersion topVer) { - List<ClusterNode> nodes = nodes(part, topVer); + @Nullable public ClusterNode primaryByPartition(int part, AffinityTopologyVersion topVer) { + List<ClusterNode> nodes = nodesByPartition(part, topVer); if (nodes.isEmpty()) return null; @@ -306,8 +306,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return {@code True} if checked node is primary for given key. */ - public boolean primary(ClusterNode n, Object key, AffinityTopologyVersion topVer) { - return F.eq(primary(key, topVer), n); + public boolean primaryByKey(ClusterNode n, Object key, AffinityTopologyVersion topVer) { + return F.eq(primaryByKey(key, topVer), n); } /** @@ -316,8 +316,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return {@code True} if checked node is primary for given partition. */ - public boolean primary(ClusterNode n, int part, AffinityTopologyVersion topVer) { - return F.eq(primary(part, topVer), n); + public boolean primaryByPartition(ClusterNode n, int part, AffinityTopologyVersion topVer) { + return F.eq(primaryByPartition(part, topVer), n); } /** @@ -325,8 +325,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Backup nodes. */ - public Collection<ClusterNode> backups(Object key, AffinityTopologyVersion topVer) { - return backups(partition(key), topVer); + public Collection<ClusterNode> backupsByKey(Object key, AffinityTopologyVersion topVer) { + return backupsByPartition(partition(key), topVer); } /** @@ -334,8 +334,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Backup nodes. */ - public Collection<ClusterNode> backups(int part, AffinityTopologyVersion topVer) { - List<ClusterNode> nodes = nodes(part, topVer); + private Collection<ClusterNode> backupsByPartition(int part, AffinityTopologyVersion topVer) { + List<ClusterNode> nodes = nodesByPartition(part, topVer); assert !F.isEmpty(nodes); @@ -351,8 +351,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return {@code True} if checked node is a backup node for given partition. */ - public boolean backup(ClusterNode n, int part, AffinityTopologyVersion topVer) { - List<ClusterNode> nodes = nodes(part, topVer); + public boolean backupByPartition(ClusterNode n, int part, AffinityTopologyVersion topVer) { + List<ClusterNode> nodes = nodesByPartition(part, topVer); assert !F.isEmpty(nodes); @@ -360,26 +360,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** - * @param keys keys. - * @param topVer Topology version. - * @return Nodes for the keys. - */ - public Collection<ClusterNode> remoteNodes(Iterable keys, AffinityTopologyVersion topVer) { - Collection<Collection<ClusterNode>> colcol = new GridLeanSet<>(); - - for (Object key : keys) - colcol.add(nodes(key, topVer)); - - return F.view(F.flatCollections(colcol), F.remoteNodes(cctx.localNodeId())); - } - - /** * @param key Key to check. * @param topVer Topology version. * @return {@code true} if given key belongs to local node. */ - public boolean localNode(Object key, AffinityTopologyVersion topVer) { - return localNode(partition(key), topVer); + public boolean keyLocalNode(Object key, AffinityTopologyVersion topVer) { + return partitionLocalNode(partition(key), topVer); } /** @@ -387,10 +373,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return {@code true} if given partition belongs to local node. */ - public boolean localNode(int part, AffinityTopologyVersion topVer) { + public boolean partitionLocalNode(int part, AffinityTopologyVersion topVer) { assert part >= 0 : "Invalid partition: " + part; - return nodes(part, topVer).contains(cctx.localNode()); + return nodesByPartition(part, topVer).contains(cctx.localNode()); } /** @@ -399,11 +385,11 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return {@code true} if given partition belongs to specified node. */ - public boolean belongs(ClusterNode node, int part, AffinityTopologyVersion topVer) { + public boolean partitionBelongs(ClusterNode node, int part, AffinityTopologyVersion topVer) { assert node != null; assert part >= 0 : "Invalid partition: " + part; - return nodes(part, topVer).contains(node); + return nodesByPartition(part, topVer).contains(node); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 66b71b4..fdcef3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1591,23 +1591,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * Checks if at least one of the given keys belongs to one of the given partitions. - * - * @param keys Collection of keys to check. - * @param movingParts Collection of partitions to check against. - * @return {@code True} if there exist a key in collection {@code keys} that belongs - * to one of partitions in {@code movingParts} - */ - public boolean hasKey(Iterable<? extends K> keys, Collection<Integer> movingParts) { - for (K key : keys) { - if (movingParts.contains(affinity().partition(key))) - return true; - } - - return false; - } - - /** * Check whether conflict resolution is required. * * @return {@code True} in case DR is required. http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 134e743..f8722d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -808,7 +808,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { return; // Don't track non-primary entries if evicts are synchronized. - if (!cctx.isNear() && evictSync && !cctx.affinity().primary(cctx.localNode(), e.partition(), topVer)) + if (!cctx.isNear() && evictSync && !cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), topVer)) return; if (!busyLock.enterBusy()) @@ -910,7 +910,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (evictSyncAgr) { assert !cctx.isNear(); // Make sure cache is not NEAR. - if (cctx.affinity().backups( + if (cctx.affinity().backupsByKey( entry.key(), cctx.topology().topologyVersion()).contains(cctx.localNode()) && evictSync) @@ -1498,7 +1498,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (!evts.isEmpty()) break; - if (!cctx.affinity().primary(loc, it.next(), topVer)) + if (!cctx.affinity().primaryByPartition(loc, it.next(), topVer)) it.remove(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 969c41a..b927096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -953,26 +953,6 @@ public class GridCacheUtils { } /** - * Gets primary node on which given key is cached. - * - * @param ctx Cache. - * @param key Key to find primary node for. - * @return Primary node for the key. - */ - @SuppressWarnings( {"unchecked"}) - @Nullable public static ClusterNode primaryNode(GridCacheContext ctx, Object key) { - assert ctx != null; - assert key != null; - - CacheConfiguration cfg = ctx.cache().configuration(); - - if (cfg.getCacheMode() != PARTITIONED) - return ctx.localNode(); - - return ctx.affinity().primary(key, ctx.affinity().affinityTopologyVersion()); - } - - /** * @param asc {@code True} for ascending. * @return Descending order comparator. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 9e85bad..11361a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -82,21 +82,21 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public boolean isPrimary(ClusterNode n, K key) { A.notNull(n, "n", key, "key"); - return cctx.affinity().primary(n, key, topologyVersion()); + return cctx.affinity().primaryByKey(n, key, topologyVersion()); } /** {@inheritDoc} */ @Override public boolean isBackup(ClusterNode n, K key) { A.notNull(n, "n", key, "key"); - return cctx.affinity().backups(key, topologyVersion()).contains(n); + return cctx.affinity().backupsByKey(key, topologyVersion()).contains(n); } /** {@inheritDoc} */ @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { A.notNull(n, "n", key, "key"); - return cctx.affinity().belongs(n, cctx.affinity().partition(key), topologyVersion()); + return cctx.affinity().partitionBelongs(n, cctx.affinity().partition(key), topologyVersion()); } /** {@inheritDoc} */ @@ -126,7 +126,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { AffinityTopologyVersion topVer = topologyVersion(); for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { - for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { + for (ClusterNode affNode : cctx.affinity().nodesByPartition(part, topVer)) { if (n.id().equals(affNode.id())) { parts.add(part); @@ -142,7 +142,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public ClusterNode mapPartitionToNode(int part) { A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); - return F.first(cctx.affinity().nodes(part, topologyVersion())); + return F.first(cctx.affinity().nodesByPartition(part, topologyVersion())); } /** {@inheritDoc} */ @@ -204,7 +204,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f); for (K key : keys) { - ClusterNode primary = cctx.affinity().primary(key, topVer); + ClusterNode primary = cctx.affinity().primaryByKey(key, topVer); if (primary == null) throw new IgniteException("Failed to get primary node [topVer=" + topVer + ", key=" + key + ']'); @@ -227,14 +227,14 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { A.notNull(key, "key"); - return cctx.affinity().nodes(partition(key), topologyVersion()); + return cctx.affinity().nodesByPartition(partition(key), topologyVersion()); } /** {@inheritDoc} */ @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); - return cctx.affinity().nodes(part, topologyVersion()); + return cctx.affinity().nodesByPartition(part, topologyVersion()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index c1983df..d864d3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -455,7 +455,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { Collection<SetItemKey> keys = new ArrayList<>(BATCH_SIZE); for (SetItemKey key : set) { - if (!loc && !aff.primary(cctx.localNode(), key, topVer)) + if (!loc && !aff.primaryByKey(cctx.localNode(), key, topVer)) continue; keys.add(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index c9f7c5c..2d69fd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -931,7 +931,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) { - List<ClusterNode> nodes = ctx.affinity().nodes(e.getKey(), topVer); + List<ClusterNode> nodes = ctx.affinity().nodesByKey(e.getKey(), topVer); for (int i = 0; i < nodes.size(); i++) { ClusterNode node = nodes.get(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index cf4085b..39571ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -402,7 +402,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } // If remote node is (primary?) or back up, don't add it as a reader. - if (cctx.affinity().belongs(node, partition(), topVer)) { + if (cctx.affinity().partitionBelongs(node, partition(), topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring near reader because remote node is affinity node [locNodeId=" + cctx.localNodeId() + ", rmtNodeId=" + nodeId + ", key=" + key + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 3ce1dd8..142982b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -616,7 +616,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return {@code True} if local node is primary for this partition. */ public boolean primary(AffinityTopologyVersion topVer) { - return cctx.affinity().primary(cctx.localNode(), id, topVer); + return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer); } /** @@ -624,7 +624,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return {@code True} if local node is backup for this partition. */ public boolean backup(AffinityTopologyVersion topVer) { - return cctx.affinity().backup(cctx.localNode(), id, topVer); + return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1b4dcc9..97d1053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -568,7 +568,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - if (cctx.affinity().localNode(p, topVer)) { + if (cctx.affinity().partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@ -691,7 +691,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { loc = locParts.get(p); - boolean belongs = cctx.affinity().localNode(p, topVer); + boolean belongs = cctx.affinity().partitionLocalNode(p, topVer); if (loc != null && loc.state() == EVICTED) { locParts.set(p, loc = null); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 6ad20c7..f3c4963 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -274,7 +274,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { return true; // Check if we are on the backup node. - return !cacheCtx.affinity().backups(key, topVer).contains(cctx.localNode()); + return !cacheCtx.affinity().backupsByKey(key, topVer).contains(cctx.localNode()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 7efe841..12df74f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -379,7 +379,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda ) { int part = cctx.affinity().partition(key); - List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); + List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer); if (affNodes.isEmpty()) { onDone(serverNotFoundError(topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index a0b7940..b3c09e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -325,7 +325,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) { int part = cctx.affinity().partition(key); - List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); + List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer); if (affNodes.isEmpty()) { onDone(serverNotFoundError(topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index acfe141..c116639 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2463,7 +2463,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.partition(), + boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(), req.topologyVersion()); Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); @@ -2553,7 +2553,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary && updRes.sendToDht()) { - if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { + if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { // If put the same value as in request then do not need to send it back. if (op == TRANSFORM || writeVal != updRes.newValue()) { res.addNearValue(i, @@ -2684,7 +2684,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? F.view(putMap, new P1<CacheObject>() { @Override public boolean apply(CacheObject key) { - return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); + return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion()); } }) : putMap; @@ -2707,7 +2707,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<KeyCacheObject> storeKeys = req.fastMap() ? F.view(rmvKeys, new P1<Object>() { @Override public boolean apply(Object key) { - return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); + return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion()); } }) : rmvKeys; @@ -2746,7 +2746,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), + boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), + entry.partition(), req.topologyVersion()); Collection<UUID> readers = null; @@ -2842,7 +2843,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary) { - if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { + if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { int idx = firstEntryIdx + i; if (req.operation() == TRANSFORM) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 7376aff..891a20c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -543,7 +543,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer); if (primary == null) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 950e5bd..2315a18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -936,7 +936,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - ClusterNode primary = cctx.affinity().primary(cacheKey.partition(), topVer); + ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer); if (primary == null) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + @@ -987,7 +987,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu // If we can send updates in parallel - do it. return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); + Collections.singletonList(affMgr.primaryByKey(key, topVer)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 2d18a47..20b8791 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -173,7 +173,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte AffinityTopologyVersion topVer, boolean allowDetached ) { - return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ? + return allowDetached && !ctx.affinity().primaryByKey(ctx.localNode(), key, topVer) ? createEntry(key) : entryExx(key, topVer); } @@ -663,7 +663,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; // Send request to remove from remote nodes. - ClusterNode primary = ctx.affinity().primary(key, topVer); + ClusterNode primary = ctx.affinity().primaryByKey(key, topVer); if (primary == null) { if (log.isDebugEnabled()) @@ -783,7 +783,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte map = U.newHashMap(affNodes.size()); } - ClusterNode primary = ctx.affinity().primary(key, topVer); + ClusterNode primary = ctx.affinity().primaryByKey(key, topVer); if (primary == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 69b66f9..79ca108 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1203,7 +1203,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture boolean explicit = false; for (KeyCacheObject key : keys) { - if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) { + if (!cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) { // Remove explicit locks added so far. for (KeyCacheObject k : keys) cctx.mvcc().removeExplicitLock(threadId, cctx.txKey(k), lockVer); @@ -1287,7 +1287,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture ) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; - ClusterNode primary = cctx.affinity().primary(key, topVer); + ClusterNode primary = cctx.affinity().primaryByKey(key, topVer); if (primary == null) throw new ClusterTopologyServerNotFoundException("Failed to lock keys " + http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 02c31da..274205b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -623,7 +623,7 @@ public class GridDhtPartitionDemander { for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); - if (cctx.affinity().localNode(p, topVer)) { + if (cctx.affinity().partitionLocalNode(p, topVer)) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -693,7 +693,7 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) { - if (cctx.affinity().localNode(miss, topVer)) + if (cctx.affinity().partitionLocalNode(miss, topVer)) fut.partitionMissed(id, miss); } @@ -1384,7 +1384,7 @@ public class GridDhtPartitionDemander { for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); - if (cctx.affinity().localNode(p, topVer)) { + if (cctx.affinity().partitionLocalNode(p, topVer)) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -1461,7 +1461,7 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : s.supply().missed()) { - if (cctx.affinity().localNode(miss, topVer)) + if (cctx.affinity().partitionLocalNode(miss, topVer)) fut.partitionMissed(node.id(), miss); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index b082c47..9942423 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -302,7 +302,7 @@ class GridDhtPartitionSupplier { (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator(); while (entIt.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, so we send '-1' partition and move on. s.missed(part); @@ -387,7 +387,7 @@ class GridDhtPartitionSupplier { boolean prepared = false; while (iter.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -510,7 +510,7 @@ class GridDhtPartitionSupplier { (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator(); while (lsnrIt.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -808,7 +808,7 @@ class GridDhtPartitionSupplier { boolean partMissing = false; for (GridCacheEntryEx e : loc.allEntries()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, so we send '-1' partition and move on. s.missed(part); @@ -859,7 +859,7 @@ class GridDhtPartitionSupplier { boolean prepared = false; for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -947,7 +947,7 @@ class GridDhtPartitionSupplier { swapLsnr = null; for (GridCacheEntryInfo info : entries) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 41bc2fc..a4a0c35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -289,7 +289,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } // If partition belongs to local node. - if (cctx.affinity().localNode(p, topVer)) { + if (cctx.affinity().partitionLocalNode(p, topVer)) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -349,7 +349,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Picked owners. */ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { - Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer); int affCnt = affNodes.size(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index b843e4e..41632ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -161,7 +161,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { if (F.contains(failed, key)) continue; - if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. + if (ctx.affinity().partitionBelongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. GridCacheEntryEx entry = peekEx(key); if (entry != null && entry.markObsolete(ver)) http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 30fc213..d022805 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -112,7 +112,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { return false; } - if (cctx.affinity().backup(cctx.localNode(), part, topVer)) { + if (cctx.affinity().backupByPartition(cctx.localNode(), part, topVer)) { this.topVer = AffinityTopologyVersion.NONE; return false; @@ -162,7 +162,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } } - ClusterNode primaryNode = cctx.affinity().primary(key, topVer); + ClusterNode primaryNode = cctx.affinity().primaryByKey(key, topVer); if (primaryNode == null) this.topVer = AffinityTopologyVersion.NONE; @@ -686,7 +686,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { ClusterNode primary = null; try { - primary = cctx.affinity().primary(part, topVer); + primary = cctx.affinity().primaryByPartition(part, topVer); } catch (IllegalStateException ignore) { // Do not have affinity history. http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 6ac55f8..47ba043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -413,7 +413,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap ) { int part = cctx.affinity().partition(key); - List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); + List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer); if (affNodes.isEmpty()) { onDone(serverNotFoundError(topVer)); @@ -724,7 +724,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap info.unmarshalValue(cctx, cctx.deploy().globalLoader()); // Entries available locally in DHT should not be loaded into near cache for reading. - if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) { + if (!cctx.affinity().keyLocalNode(info.key(), cctx.affinity().affinityTopologyVersion())) { GridNearCacheEntry entry = savedEntries.get(info.key()); if (entry == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index d7a0fb5..d3e3a15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1373,7 +1373,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean ) throws IgniteCheckedException { assert mapping == null || mapping.node() != null; - ClusterNode primary = cctx.affinity().primary(key, topVer); + ClusterNode primary = cctx.affinity().primaryByKey(key, topVer); if (primary == null) throw new ClusterTopologyServerNotFoundException("Failed to lock keys " + http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index c464b36..a8448dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -529,7 +529,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim GridCacheContext cacheCtx = entry.context(); List<ClusterNode> nodes = cacheCtx.isLocal() ? - cacheCtx.affinity().nodes(entry.key(), topVer) : + cacheCtx.affinity().nodesByKey(entry.key(), topVer) : cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index b314b81..606d70f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -605,7 +605,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa nodes = cacheCtx.topology().nodes(cached0.partition(), topVer); else nodes = cacheCtx.isLocal() ? - cacheCtx.affinity().nodes(entry.key(), topVer) : + cacheCtx.affinity().nodesByKey(entry.key(), topVer) : cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index f9a2f90..a4132f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -196,7 +196,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA GridCacheContext cacheCtx = txEntry.context(); List<ClusterNode> nodes = cacheCtx.isLocal() ? - cacheCtx.affinity().nodes(txEntry.key(), topVer) : + cacheCtx.affinity().nodesByKey(txEntry.key(), topVer) : cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer); ClusterNode primary = F.first(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index b3eb755..940dd80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -476,7 +476,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @return {@code True} if entry is locally mapped as a primary or back up node. */ protected boolean isNearLocallyMapped(GridCacheEntryEx e, AffinityTopologyVersion topVer) { - return ctx.affinity().belongs(ctx.localNode(), e.partition(), topVer); + return ctx.affinity().partitionBelongs(ctx.localNode(), e.partition(), topVer); } /** @@ -548,7 +548,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> topVer = cand.topologyVersion(); // Send request to remove from remote nodes. - ClusterNode primary = ctx.affinity().primary(key, topVer); + ClusterNode primary = ctx.affinity().primaryByKey(key, topVer); if (primary == null) { if (log.isDebugEnabled()) @@ -668,7 +668,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> map = U.newHashMap(affNodes.size()); } - ClusterNode primary = ctx.affinity().primary(key, cand.topologyVersion()); + ClusterNode primary = ctx.affinity().primaryByKey(key, cand.topologyVersion()); if (primary == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 9acab56..aed1ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -350,7 +350,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu GridCacheContext cacheCtx = e.context(); try { - if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) { + if (e.op() != NOOP && !cacheCtx.affinity().keyLocalNode(e.key(), topVer)) { GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key()); if (entry != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 85c01d9..4fa7e36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1525,11 +1525,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // Other types are filtered in indexing manager. if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null && cctx.config().getCacheMode() != LOCAL && !incBackups && - !cctx.affinity().primary(cctx.localNode(), key, topVer)) { + !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring backup element [row=" + row + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups + - ", primary=" + cctx.affinity().primary(cctx.localNode(), key, topVer) + ']'); + ", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']'); continue; } @@ -1537,7 +1537,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte V val = row.getValue(); if (log.isDebugEnabled()) { - ClusterNode primaryNode = CU.primaryNode(cctx, key); + ClusterNode primaryNode = cctx.affinity().primaryByKey(key, + cctx.affinity().affinityTopologyVersion()); log.debug(S.toString("Record", "key", key, true, @@ -2300,7 +2301,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE); + return cache.context().affinity().primaryByKey(ctx.discovery().localNode(), k, NONE); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 10784fc..dbbcbff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -861,7 +861,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler GridCacheAffinityManager aff = cctx.affinity(); if (initUpdCntrsPerNode != null) { - for (ClusterNode node : aff.nodes(partId, initTopVer)) { + for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) { Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id()); if (map != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 91c1991..6887a50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -367,7 +367,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (F.isEmpty(lsnrCol)) return; - boolean primary = cctx.affinity().primary(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE); + boolean primary = cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE); if (cctx.isReplicated() || primary) { boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 18c3011..b07a117 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1288,7 +1288,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (!skip && skipNonPrimary) { skip = e.cached().isNear() || e.cached().detached() || - !e.context().affinity().primary(e.cached().partition(), topologyVersion()).isLocal(); + !e.context().affinity().primaryByPartition(e.cached().partition(), topologyVersion()).isLocal(); } if (!skip && !local() && // Update local store at backups only if needed. @@ -1707,7 +1707,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement int part = cached != null ? cached.partition() : cacheCtx.affinity().partition(e.key()); - List<ClusterNode> affNodes = cacheCtx.affinity().nodes(part, topologyVersion()); + List<ClusterNode> affNodes = cacheCtx.affinity().nodesByPartition(part, topologyVersion()); e.locallyMapped(F.contains(affNodes, cctx.localNode())); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 3043ecc..9e2f321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1101,7 +1101,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @return {@code True} if local node is current primary for given entry. */ private boolean primaryLocal(GridCacheEntryEx entry) { - return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE); + return entry.context().affinity().primaryByPartition(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE); } /** @@ -1389,7 +1389,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig finally { if (entry != null && readCommitted()) { if (cacheCtx.isNear()) { - if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) { + if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) { if (entry.markObsolete(xidVer)) cacheCtx.cache().removeEntry(entry); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java index 70d938e..67d00ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java @@ -401,7 +401,7 @@ public class TxDeadlockDetection { private UUID primary(IgniteTxKey txKey) { GridCacheContext ctx = cctx.cacheContext(txKey.cacheId()); - ClusterNode node = ctx.affinity().primary(txKey.key(), topVer); + ClusterNode node = ctx.affinity().primaryByKey(txKey.key(), topVer); assert node != null : topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index bb9ffdd..56ab235 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1945,7 +1945,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed expiryTime = CU.toExpireTime(ttl); } - boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer); + boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer); entry.initialValue(e.getValue(), ver, http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 70232af..d5f277e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -496,7 +496,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite Collection<ClusterNode> nodes; if (collocated) { - List<ClusterNode> nodes0 = ctx.affinity().nodes(hdrPart, topVer); + List<ClusterNode> nodes0 = ctx.affinity().nodesByPartition(hdrPart, topVer); nodes = !nodes0.isEmpty() ? Collections.singleton(nodes0.contains(ctx.localNode()) ? ctx.localNode() : F.first(nodes0)) : nodes0; http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java index e8b2cc7..bc6c182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridSetQueryPredicate.java @@ -91,7 +91,7 @@ public class GridSetQueryPredicate<K, V> implements IgniteBiPredicate<K, V>, Ext /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public boolean apply(K k, V v) { - return !filter || ctx.affinity().primary(ctx.localNode(), k, ctx.affinity().affinityTopologyVersion()); + return !filter || ctx.affinity().primaryByKey(ctx.localNode(), k, ctx.affinity().affinityTopologyVersion()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index ea9cbd7..2b6699d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1568,7 +1568,7 @@ public class GridJobProcessor extends GridProcessorAdapter { } } finally { - if (checkPartMapping && !cctx.affinity().primary(partId, topVer).id().equals(ctx.localNodeId())) + if (checkPartMapping && !cctx.affinity().primaryByPartition(partId, topVer).id().equals(ctx.localNodeId())) throw new IgniteException("Failed partition reservation. " + "Partition is not primary on the node. [partition=" + partId + ", cacheName=" + cctx.name() + ", nodeId=" + ctx.localNodeId() + ", topology=" + topVer + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index 92e2b9b..b0337d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -214,12 +214,12 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { ClusterNode loc = ignite.cluster().localNode(); - if (loc.equals(aff.primary(key, topVer))) + if (loc.equals(aff.primaryByKey(key, topVer))) return true; AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer.topologyVersion() + 1, 0); - assertEquals(loc, aff.primary(key, topVer0)); + assertEquals(loc, aff.primaryByKey(key, topVer0)); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 56341bd..7a0b713 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -5875,7 +5875,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract int size = 0; for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) { GridCacheEntryEx e = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); @@ -5911,7 +5911,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract int size = 0; for (String key : map.keySet()) - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) size++; assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); @@ -6154,7 +6154,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract int size = 0; for (String key : keys) - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) size++; assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(null).localSize(ALL)); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d5adcb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index 6b0e193..d4449f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -5548,7 +5548,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar int size = 0; for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) { GridCacheEntryEx e = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); @@ -5589,7 +5589,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar int size = 0; for (String key : map.keySet()) - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) size++; assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); @@ -5850,7 +5850,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar int size = 0; for (String key : keys) - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) size++; assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(cacheName).localSize(ALL));
