http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index e63379a..a96cc43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -45,7 +46,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -107,6 +107,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private final Object similarAffKey; + /** */ + private volatile DiscoCache discoCache; + /** * @param cctx Context. * @param cacheId Cache ID. @@ -125,6 +128,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { topVer = exchFut.topologyVersion(); + discoCache = exchFut.discoCache(); + log = cctx.logger(getClass()); lock.writeLock().lock(); @@ -193,6 +198,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { this.stopping = stopping; topVer = exchId.topologyVersion(); + discoCache = exchFut.discoCache(); updateSeq.setIfGreater(updSeq); @@ -273,7 +279,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null; @@ -432,7 +438,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (!F.isEmpty(nodeIds)) { for (UUID nodeId : nodeIds) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = discoCache.node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) @@ -458,7 +464,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null; lock.readLock().lock(); @@ -481,7 +487,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = discoCache.node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -792,7 +798,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -842,7 +848,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index ab8e863..7dcfcf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -72,16 +73,18 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin * @param ctx Context. * @param cacheName Cache name. * @param topVer Topology version. + * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, String cacheName, - AffinityTopologyVersion topVer + AffinityTopologyVersion topVer, + DiscoCache discoCache ) { this.ctx = ctx; this.key = new T2<>(CU.cacheId(cacheName), topVer); - Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheName); LinkedList<ClusterNode> tmp = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 553aa2a..36c1ae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.ClusterState; @@ -98,6 +99,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** */ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; + /** Discovery cache. */ + private volatile DiscoCache discoCache; + /** */ private volatile boolean stopping; @@ -162,6 +166,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = AffinityTopologyVersion.NONE; + + discoCache = cctx.discovery().discoCache(); } finally { lock.writeLock().unlock(); @@ -219,6 +225,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = exchId.topologyVersion(); + + discoCache = exchFut.discoCache(); } finally { lock.writeLock().unlock(); @@ -281,7 +289,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -430,7 +438,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (exchId.isLeft()) removeNode(exchId.nodeId()); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -875,7 +883,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; lock.readLock().lock(); @@ -1599,7 +1607,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh List<ClusterNode> affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { - Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING)); + List<ClusterNode> nodes = nodes(p, topVer, OWNING); + Collection<UUID> nodeIds = F.nodeIds(nodes); // If all affinity nodes are owners, then evict partition from local node. if (nodeIds.containsAll(F.nodeIds(affNodes))) { @@ -1619,15 +1628,13 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh int affCnt = affNodes.size(); if (ownerCnt > affCnt) { - List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds)); - // Sort by node orders in ascending order. - Collections.sort(sorted, CU.nodeComparator(true)); + Collections.sort(nodes, CU.nodeComparator(true)); - int diff = sorted.size() - affCnt; + int diff = nodes.size() - affCnt; for (int i = 0; i < diff; i++) { - ClusterNode n = sorted.get(i); + ClusterNode n = nodes.get(i); if (locId.equals(n.id())) { part.reload(false); @@ -1655,17 +1662,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } /** - * @return Current coordinator node. - */ - @Nullable private ClusterNode currentCoordinator() { - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); - - assert oldest != null || cctx.kernalContext().clientNode(); - - return oldest; - } - - /** * Updates value for single partition. * * @param p Partition. @@ -1675,7 +1671,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null || cctx.kernalContext().clientNode(); @@ -1742,7 +1738,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private void removeNode(UUID nodeId) { assert nodeId != null; - ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer)); + ClusterNode oldest = discoCache.oldestAliveServerNode(); assert oldest != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6cba4f4..b3f3d4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapsh import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation; import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -121,6 +122,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dummy reassign flag. */ private final boolean reassign; + /** */ + @GridToStringExclude + private volatile DiscoCache discoCache; + /** Discovery event. */ private volatile DiscoveryEvent discoEvt; @@ -161,9 +166,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private boolean init; - /** Topology snapshot. */ - private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); - /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); @@ -386,6 +388,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Discovery cache. + */ + public DiscoCache discoCache() { + return discoCache; + } + + /** * @param cacheId Cache ID to check. * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. @@ -428,11 +437,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param discoCache Discovery data cache. */ - public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) { + public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) { assert exchId.equals(this.exchId); this.discoEvt = discoEvt; + this.discoCache = discoCache; evtLatch.countDown(); } @@ -512,7 +523,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { AffinityTopologyVersion topVer = topologyVersion(); - srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer)); + discoCache.updateAlives(cctx.discovery()); + + srvNodes = new ArrayList<>(discoCache.serverNodes()); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@ -1017,7 +1030,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<String> cachesWithoutNodes = null; for (String name : cctx.cache().cacheNames()) { - if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { + if (discoCache.cacheAffinityNodes(name).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -1446,7 +1459,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - topSnapshot.set(null); singleMsgs.clear(); fullMsgs.clear(); msgs.clear(); @@ -1732,7 +1744,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert partHistSuppliers.isEmpty(); - if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) { + if (!crd.equals(discoCache.serverNodes().get(0))) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) cacheCtx.topology().beforeExchange(this, !centralizedAff); @@ -2090,6 +2102,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT ClusterNode crd0; + discoCache.updateAlives(node); + synchronized (mux) { if (!srvNodes.remove(node)) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index f1be372..94cf6e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridClosureCallMode; import org.apache.ignite.internal.GridKernalContext; @@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -168,7 +168,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite private IgniteInternalCache<Object, Object> cache; /** Topology listener. */ - private GridLocalEventListener topLsnr = new TopologyListener(); + private DiscoveryEventListener topLsnr = new TopologyListener(); static { Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() { @@ -250,7 +250,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite cache = ctx.cache().utilityCache(); if (!ctx.clientNode()) - ctx.event().addLocalEventListener(topLsnr, EVTS); + ctx.event().addDiscoveryEventListener(topLsnr, EVTS); try { if (ctx.deploy().enabled()) @@ -312,7 +312,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return; if (!ctx.clientNode()) - ctx.event().removeLocalEventListener(topLsnr); + ctx.event().removeDiscoveryEventListener(topLsnr); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@ -1588,9 +1588,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** * Topology listener. */ - private class TopologyListener implements GridLocalEventListener { + private class TopologyListener implements DiscoveryEventListener { /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { + @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) { if (!busyLock.enterBusy()) return; @@ -1612,11 +1612,14 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0); depExe.execute(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); + // In case the cache instance isn't tracked by DiscoveryManager anymore. + discoCache.updateAlives(ctx.discovery()); + + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 31b4bc7..69573d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -222,20 +222,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe long startVer = discoMgr.localNode().order(); for (long v = currVer; v > currVer - GridDiscoveryManager.DISCOVERY_HISTORY_SIZE && v >= startVer; v--) { - F.forAll(discoMgr.aliveCacheNodes(null, new AffinityTopologyVersion(v)), - new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return currTop.contains(e); - } - }); - - F.forAll(discoMgr.aliveRemoteCacheNodes(null, new AffinityTopologyVersion(v)), - new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return currTop.contains(e) || g.cluster().localNode().equals(e); - } - }); - GridCacheSharedContext<?, ?> ctx = k.context().cache().context(); ClusterNode oldest =
