ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a61a98ad Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a61a98ad Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a61a98ad Branch: refs/heads/ignite-4768 Commit: a61a98ad3908770b77d0ffb071effbc92f4d5c5a Parents: 2d385c5 Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Thu Mar 9 21:01:41 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Mar 9 21:01:41 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 310 +++++++++++ .../discovery/GridDiscoveryManager.java | 550 ++++++------------- .../eventstorage/DiscoveryEventListener.java | 33 ++ .../eventstorage/GridEventStorageManager.java | 162 +++++- .../affinity/GridAffinityAssignmentCache.java | 7 +- .../cache/CacheAffinitySharedManager.java | 35 +- .../cache/GridCacheAffinityManager.java | 3 +- .../GridCachePartitionExchangeManager.java | 64 ++- .../dht/GridClientPartitionTopology.java | 20 +- .../dht/GridDhtAssignmentFetchFuture.java | 7 +- .../dht/GridDhtPartitionTopologyImpl.java | 44 +- .../GridDhtPartitionsExchangeFuture.java | 33 +- .../service/GridServiceProcessor.java | 21 +- 13 files changed, 806 insertions(+), 483 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java new file mode 100644 index 0000000..5247ac1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class DiscoCache { + /** Local node. */ + private final ClusterNode loc; + + /** Remote nodes. */ + private final List<ClusterNode> rmtNodes; + + /** All nodes. */ + private final List<ClusterNode> allNodes; + + /** All server nodes. */ + private final List<ClusterNode> srvNodes; + + /** Daemon nodes. */ + private final List<ClusterNode> daemonNodes; + + /** All server nodes. */ + private final List<ClusterNode> srvNodesWithCaches; + + /** All nodes with at least one cache configured. */ + @GridToStringInclude + private final List<ClusterNode> allNodesWithCaches; + + /** All remote nodes with at least one cache configured. */ + @GridToStringInclude + private final List<ClusterNode> rmtNodesWithCaches; + + /** Cache nodes by cache name. */ + @GridToStringInclude + private final Map<Integer, List<ClusterNode>> allCacheNodes; + + /** Affinity cache nodes by cache name. */ + @GridToStringInclude + private final Map<Integer, List<ClusterNode>> affCacheNodes; + + /** Node map. */ + private final Map<UUID, ClusterNode> nodeMap; + + /** Caches where at least one node has near cache enabled. */ + @GridToStringInclude + private final Set<Integer> nearEnabledCaches; + + /** Alive nodes. */ + private final Set<UUID> alives = new GridConcurrentHashSet<>(); + + /** + * @param loc Local node. + * @param rmtNodes Remote nodes. + * @param allNodes All nodes. + * @param srvNodes Server nodes. + * @param daemonNodes Daemon nodes. + * @param srvNodesWithCaches Server nodes with at least one cache configured. + * @param allNodesWithCaches All nodes with at least one cache configured. + * @param rmtNodesWithCaches Remote nodes with at least one cache configured. + * @param allCacheNodes Cache nodes by cache name. + * @param affCacheNodes Affinity cache nodes by cache name. + * @param nodeMap Node map. + * @param nearEnabledCaches Caches where at least one node has near cache enabled. + * @param alives Alive nodes. + */ + DiscoCache(ClusterNode loc, + List<ClusterNode> rmtNodes, + List<ClusterNode> allNodes, + List<ClusterNode> srvNodes, + List<ClusterNode> daemonNodes, + List<ClusterNode> srvNodesWithCaches, + List<ClusterNode> allNodesWithCaches, + List<ClusterNode> rmtNodesWithCaches, + Map<Integer, List<ClusterNode>> allCacheNodes, + Map<Integer, List<ClusterNode>> affCacheNodes, + Map<UUID, ClusterNode> nodeMap, + Set<Integer> nearEnabledCaches, + Set<UUID> alives) { + this.loc = loc; + this.rmtNodes = rmtNodes; + this.allNodes = allNodes; + this.srvNodes = srvNodes; + this.daemonNodes = daemonNodes; + this.srvNodesWithCaches = srvNodesWithCaches; + this.allNodesWithCaches = allNodesWithCaches; + this.rmtNodesWithCaches = rmtNodesWithCaches; + this.allCacheNodes = allCacheNodes; + this.affCacheNodes = affCacheNodes; + this.nodeMap = nodeMap; + this.nearEnabledCaches = nearEnabledCaches; + this.alives.addAll(alives); + } + + /** @return Local node. */ + public ClusterNode localNode() { + return loc; + } + + /** @return Remote nodes. */ + public List<ClusterNode> remoteNodes() { + return rmtNodes; + } + + /** @return All nodes. */ + public List<ClusterNode> allNodes() { + return allNodes; + } + + /** @return Server nodes. */ + public List<ClusterNode> serverNodes() { + return srvNodes; + } + + /** @return Daemon nodes. */ + public List<ClusterNode> daemonNodes() { + return daemonNodes; + } + + /** @return Server nodes with at least one cache configured. */ + public List<ClusterNode> serverNodesWithCaches() { + return srvNodesWithCaches; + } + + /** + * Gets all remote nodes that have at least one cache configured. + * + * @return Collection of nodes. + */ + public List<ClusterNode> remoteNodesWithCaches() { + return rmtNodesWithCaches; + } + + /** + * Gets collection of nodes with at least one cache configured. + * + * @return Collection of nodes. + */ + public List<ClusterNode> allNodesWithCaches() { + return allNodesWithCaches; + } + + /** + * Gets collection of server nodes with at least one cache configured. + * + * @return Collection of nodes. + */ + public Collection<ClusterNode> aliveServerNodes() { + return F.view(serverNodes(), new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return alives.contains(node.id()); + } + }); + } + + /** + * Gets collection of server nodes with at least one cache configured. + * + * @return Collection of nodes. + */ + public Collection<ClusterNode> aliveServerNodesWithCaches() { + return F.view(serverNodesWithCaches(), new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return alives.contains(node.id()); + } + }); + } + + /** + * @return Oldest alive server node. + */ + public @Nullable ClusterNode oldestAliveServerNode(){ + Iterator<ClusterNode> it = aliveServerNodes().iterator(); + return it.hasNext() ? it.next() : null; + } + + /** + * @return Oldest alive server node with at least one cache configured. + */ + public @Nullable ClusterNode oldestAliveServerNodeWithCache(){ + Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator(); + return it.hasNext() ? it.next() : null; + } + + /** + * Gets all nodes that have cache with given name. + * + * @param cacheName Cache name. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheNodes(@Nullable String cacheName) { + return cacheNodes(CU.cacheId(cacheName)); + } + + /** + * Gets all nodes that have cache with given ID. + * + * @param cacheId Cache ID. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheNodes(Integer cacheId) { + return emptyIfNull(allCacheNodes.get(cacheId)); + } + + /** + * Gets all nodes that have cache with given ID and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. + * + * @param cacheName Cache name. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) { + return cacheAffinityNodes(CU.cacheId(cacheName)); + } + + /** + * Gets all nodes that have cache with given ID and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. + * + * @param cacheId Cache ID. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheAffinityNodes(int cacheId) { + return emptyIfNull(affCacheNodes.get(cacheId)); + } + + /** + * Checks if cache with given ID has at least one node with near cache enabled. + * + * @param cacheId Cache ID. + * @return {@code True} if cache with given name has at least one node with near cache enabled. + */ + public boolean hasNearCache(int cacheId) { + return nearEnabledCaches.contains(cacheId); + } + + /** + * @param id Node ID. + * @return Node. + */ + public @Nullable ClusterNode node(UUID id) { + return nodeMap.get(id); + } + + /** + * Removes left node from alives lists. + * + * @param rmvd Removed node. + */ + public void updateAlives(ClusterNode rmvd) { + alives.remove(rmvd.id()); + } + + /** + * Removes left nodes from cached alives lists. + * + * @param discovery Discovery manager. + */ + public void updateAlives(GridDiscoveryManager discovery) { + for (UUID alive : alives) { + if (!discovery.alive(alive)) + alives.remove(alive); + } + } + + /** + * @param nodes Cluster nodes. + * @return Empty collection if nodes list is {@code null} + */ + private List<ClusterNode> emptyIfNull(List<ClusterNode> nodes) { + return nodes == null ? Collections.<ClusterNode>emptyList() : nodes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoCache.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 9ea707d..db9638d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -34,14 +34,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -79,19 +77,17 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridTuple5; +import org.apache.ignite.internal.util.lang.GridTuple6; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -99,7 +95,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; @@ -113,6 +108,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -230,7 +226,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private long segChkFreq; /** Local node join to topology event. */ - private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>(); + private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>(); /** GC CPU load. */ private volatile double gcCpuLoad; @@ -539,20 +535,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + final DiscoCache discoCache; + // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { - DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); + discoCache = createDiscoCache(locNode, topSnapshot); - discoCacheHist.put(nextTopVer, cache); + discoCacheHist.put(nextTopVer, discoCache); - boolean set = updateTopologyVersionIfGreater(nextTopVer, cache); + boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache); assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" + topSnap + ", topVer=" + topVer + ", node=" + node + ", evt=" + U.gridEventName(type) + ']'; } + else + // Current version. + discoCache = discoCache(); // If this is a local join event, just save it and do not notify listeners. if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { @@ -560,7 +561,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { gridStartTime = getSpi().getGridStartTime(); updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), - new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id())))); + discoCache); startLatch.countDown(); @@ -570,14 +571,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoEvt.eventNode(node); discoEvt.type(EVT_NODE_JOINED); - discoEvt.topologySnapshot(topVer, new ArrayList<>( - F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() { - @Override public ClusterNode apply(ClusterNode e) { - return e; - } - }, FILTER_DAEMON))); + discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON))); - locJoinEvt.onDone(discoEvt); + locJoin.onDone(new T2<>(discoEvt, discoCache)); return; } @@ -592,7 +588,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onDisconnected(); - locJoinEvt = new GridFutureAdapter<>(); + locJoin = new GridFutureAdapter<>(); registeredCaches.clear(); @@ -605,7 +601,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - new DiscoCache(locNode, Collections.<ClusterNode>emptySet()))); + createDiscoCache(locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -622,7 +618,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { try { fut.get(); - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null); + discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null); } catch (IgniteException ignore) { // No-op. @@ -634,7 +630,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected()) - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); + discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg); } }); @@ -1330,8 +1326,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { U.join(segChkThread, log); } - if (!locJoinEvt.isDone()) - locJoinEvt.onDone( + if (!locJoin.isDone()) + locJoin.onDone( new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping).")); } @@ -1525,7 +1521,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @return Discovery collection cache. */ - private DiscoCache discoCache() { + public DiscoCache discoCache() { Snapshot cur = topSnap.get(); assert cur != null; @@ -1582,7 +1578,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return All server nodes for given topology version. */ public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).srvNodes; + return resolveDiscoCache(CU.cacheId(null), topVer).serverNodes(); } /** @@ -1604,7 +1600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName, topVer.topologyVersion()); + return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName); } /** @@ -1615,7 +1611,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId, topVer.topologyVersion()); + return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId); } /** @@ -1625,7 +1621,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(topVer.topologyVersion()); + return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(); } /** @@ -1635,7 +1631,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).remoteCacheNodes(topVer.topologyVersion()); + return resolveDiscoCache(CU.cacheId(null), topVer).remoteNodesWithCaches(); } /** @@ -1643,11 +1639,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Oldest alive server nodes with at least one cache configured. */ @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { - DiscoCache cache = resolveDiscoCache(CU.cacheId(null), topVer); - - Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry(); - - return e != null ? e.getKey() : null; + return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNodeWithCache(); } /** @@ -1660,7 +1652,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { int cacheId = CU.cacheId(cacheName); - return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion()); + return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId); } /** @@ -1671,7 +1663,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache affinity nodes. */ public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion()); + return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId); } /** @@ -1819,7 +1811,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @return Event that represents a local node joined to topology. */ public DiscoveryEvent localJoinEvent() { try { - return locJoinEvt.get(); + return locJoin.get().get1(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @return Tuple that consists of a local join event and discovery cache at the join time. + */ + public T2<DiscoveryEvent, DiscoCache> localJoin() { + try { + return locJoin.get(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1893,6 +1897,114 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param loc Local node. + * @param topSnapshot Topology snapshot. + * @return Newly created discovery cache. + */ + @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) { + HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); + HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); + + ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size()); + + for (ClusterNode node : topSnapshot) { + if (alive(node)) + alives.add(node.id()); + + if (node.isDaemon()) + daemonNodes.add(node); + else { + allNodes.add(node); + + if (!node.isLocal()) + rmtNodes.add(node); + + if (!CU.clientNode(node)) + srvNodes.add(node); + } + + nodeMap.put(node.id(), node); + } + + assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; + + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size()); + + Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + + Set<Integer> nearEnabledCaches = new HashSet<>(); + + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + assert !node.isDaemon(); + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + allNodesWithCaches.add(node); + + if(!CU.clientNode(node)) + srvNodesWithCaches.add(node); + + if (!node.isLocal()) + rmtNodesWithCaches.add(node); + + addToMap(allCacheNodes, cacheName, node); + + if (filter.dataNode(node)) + addToMap(affCacheNodes, cacheName, node); + + if (filter.nearNode(node)) + nearEnabledCaches.add(CU.cacheId(cacheName)); + } + } + } + + return new DiscoCache( + loc, + Collections.unmodifiableList(rmtNodes), + Collections.unmodifiableList(allNodes), + Collections.unmodifiableList(srvNodes), + Collections.unmodifiableList(daemonNodes), + U.sealList(srvNodesWithCaches), + U.sealList(allNodesWithCaches), + U.sealList(rmtNodesWithCaches), + Collections.unmodifiableMap(allCacheNodes), + Collections.unmodifiableMap(affCacheNodes), + Collections.unmodifiableMap(nodeMap), + Collections.unmodifiableSet(nearEnabledCaches), + alives); + } + + /** + * Adds node to map. + * + * @param cacheMap Map to add to. + * @param cacheName Cache name. + * @param rich Node to add + */ + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); + + if (cacheNodes == null) { + cacheNodes = new ArrayList<>(); + + cacheMap.put(CU.cacheId(cacheName), cacheNodes); + } + + cacheNodes.add(rich); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. @@ -1993,8 +2105,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { lastChk = now; if (!segValid) { - discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(), - Collections.<ClusterNode>emptyList(), null); + List<ClusterNode> empty = Collections.emptyList(); + + ClusterNode node = getSpi().getLocalNode(); + + discoWrk.addEvent(EVT_NODE_SEGMENTED, + AffinityTopologyVersion.NONE, + node, + createDiscoCache(node, empty), + empty, + null); lastSegChkRes.set(false); } @@ -2014,8 +2134,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, - DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); + private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, + DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. */ private boolean nodeSegFired; @@ -2033,10 +2153,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param type Discovery event type. See {@link DiscoveryEvent} for more details. * @param topVer Topology version. * @param node Remote node this event is connected with. + * @param discoCache Discovery cache. * @param topSnapshot Topology snapshot. */ @SuppressWarnings("RedundantTypeArguments") - private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { + private void recordEvent(int type, long topVer, ClusterNode node, DiscoCache discoCache, Collection<ClusterNode> topSnapshot) { assert node != null; if (ctx.event().isRecordable(type)) { @@ -2045,7 +2166,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { evt.node(ctx.discovery().localNode()); evt.eventNode(node); evt.type(type); - evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, FILTER_DAEMON)); if (type == EVT_NODE_METRICS_UPDATED) @@ -2072,7 +2192,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else assert false; - ctx.event().record(evt); + ctx.event().record(evt, discoCache); } } @@ -2080,6 +2200,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param type Event type. * @param topVer Topology version. * @param node Node. + * @param discoCache Discovery cache. * @param topSnapshot Topology snapshot. * @param data Custom message. */ @@ -2087,12 +2208,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { int type, AffinityTopologyVersion topVer, ClusterNode node, + DiscoCache discoCache, Collection<ClusterNode> topSnapshot, @Nullable DiscoveryCustomMessage data ) { assert node != null : data; - evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data)); + evts.add(new GridTuple6<>(type, topVer, node, discoCache, topSnapshot, data)); } /** {@inheritDoc} */ @@ -2116,7 +2238,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage> evt = evts.take(); int type = evt.get1(); @@ -2249,11 +2371,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.node(ctx.discovery().localNode()); customEvt.eventNode(node); customEvt.type(type); - customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4()); + customEvt.topologySnapshot(topVer.topologyVersion(), evt.get5()); customEvt.affinityTopologyVersion(topVer); - customEvt.customMessage(evt.get5()); + customEvt.customMessage(evt.get6()); - ctx.event().record(customEvt); + ctx.event().record(customEvt, evt.get4()); } return; @@ -2267,7 +2389,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert false : "Invalid discovery event: " + type; } - recordEvent(type, topVer.topologyVersion(), node, evt.get4()); + recordEvent(type, topVer.topologyVersion(), node, evt.get4(), evt.get5()); if (segmented) onSegmentation(); @@ -2472,328 +2594,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** Cache for discovery collections. */ - private class DiscoCache { - /** Remote nodes. */ - private final List<ClusterNode> rmtNodes; - - /** All nodes. */ - private final List<ClusterNode> allNodes; - - /** All server nodes. */ - private final List<ClusterNode> srvNodes; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> allNodesWithCaches; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> rmtNodesWithCaches; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<Integer, Collection<ClusterNode>> allCacheNodes; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<Integer, Collection<ClusterNode>> affCacheNodes; - - /** Caches where at least one node has near cache enabled. */ - @GridToStringInclude - private final Set<Integer> nearEnabledCaches; - - /** Nodes grouped by version. */ - private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer; - - /** Daemon nodes. */ - private final List<ClusterNode> daemonNodes; - - /** Node map. */ - private final Map<UUID, ClusterNode> nodeMap; - - /** Local node. */ - private final ClusterNode loc; - - /** Highest node order. */ - private final long maxOrder; - - /** - * Cached alive server remote nodes with caches. - */ - private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches; - - /** - * @param loc Local node. - * @param rmts Remote nodes. - */ - private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) { - this.loc = loc; - - rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, FILTER_DAEMON))); - - assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + - " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; - - List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1); - - if (!loc.isDaemon()) - all.add(loc); - - all.addAll(rmtNodes); - - Collections.sort(all, GridNodeOrderComparator.INSTANCE); - - allNodes = Collections.unmodifiableList(all); - - Map<Integer, Collection<ClusterNode>> cacheMap = U.newHashMap(allNodes.size()); - Map<Integer, Collection<ClusterNode>> dhtNodesMap = U.newHashMap(allNodes.size()); - Collection<ClusterNode> nodesWithCaches = U.newHashSet(allNodes.size()); - Collection<ClusterNode> rmtNodesWithCaches = U.newHashSet(allNodes.size()); - - aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE); - nodesByVer = new TreeMap<>(); - - long maxOrder0 = 0; - - Set<Integer> nearEnabledSet = new HashSet<>(); - - List<ClusterNode> srvNodes = new ArrayList<>(); - - for (ClusterNode node : allNodes) { - assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; - assert !node.isDaemon(); - - if (!CU.clientNode(node)) - srvNodes.add(node); - - if (node.order() > maxOrder0) - maxOrder0 = node.order(); - - boolean hasCaches = false; - - for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { - String cacheName = entry.getKey(); - - CachePredicate filter = entry.getValue(); - - if (filter.cacheNode(node)) { - nodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - rmtNodesWithCaches.add(node); - - addToMap(cacheMap, cacheName, node); - - if (filter.dataNode(node)) - addToMap(dhtNodesMap, cacheName, node); - - if (filter.nearNode(node)) - nearEnabledSet.add(CU.cacheId(cacheName)); - - hasCaches = true; - } - } - - if (hasCaches && alive(node.id()) && !CU.clientNode(node)) - aliveSrvNodesWithCaches.put(node, Boolean.TRUE); - - IgniteProductVersion nodeVer = U.productVersion(node); - - // Create collection for this version if it does not exist. - Collection<ClusterNode> nodes = nodesByVer.get(nodeVer); - - if (nodes == null) { - nodes = new ArrayList<>(allNodes.size()); - - nodesByVer.put(nodeVer, nodes); - } - - nodes.add(node); - } - - Collections.sort(srvNodes, CU.nodeComparator(true)); - - // Need second iteration to add this node to all previous node versions. - for (ClusterNode node : allNodes) { - IgniteProductVersion nodeVer = U.productVersion(node); - - // Get all versions lower or equal node's version. - NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView = - nodesByVer.headMap(nodeVer, false); - - for (Collection<ClusterNode> prevVersions : updateView.values()) - prevVersions.add(node); - } - - maxOrder = maxOrder0; - - allCacheNodes = Collections.unmodifiableMap(cacheMap); - affCacheNodes = Collections.unmodifiableMap(dhtNodesMap); - allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); - this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); - nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet); - this.srvNodes = Collections.unmodifiableList(srvNodes); - - daemonNodes = Collections.unmodifiableList(new ArrayList<>( - F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON)))); - - Map<UUID, ClusterNode> nodeMap = U.newHashMap(allNodes().size() + daemonNodes.size()); - - for (ClusterNode n : F.concat(false, allNodes(), daemonNodes())) - nodeMap.put(n.id(), n); - - this.nodeMap = nodeMap; - } - - /** - * Adds node to map. - * - * @param cacheMap Map to add to. - * @param cacheName Cache name. - * @param rich Node to add - */ - private void addToMap(Map<Integer, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { - Collection<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); - - if (cacheNodes == null) { - cacheNodes = new ArrayList<>(allNodes.size()); - - cacheMap.put(CU.cacheId(cacheName), cacheNodes); - } - - cacheNodes.add(rich); - } - - /** @return Local node. */ - ClusterNode localNode() { - return loc; - } - - /** @return Remote nodes. */ - Collection<ClusterNode> remoteNodes() { - return rmtNodes; - } - - /** @return All nodes. */ - Collection<ClusterNode> allNodes() { - return allNodes; - } - - /** - * Gets collection of nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of nodes. - */ - Collection<ClusterNode> allNodesWithCaches(final long topVer) { - return filter(topVer, allNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, allCacheNodes.get(CU.cacheId(cacheName))); - } - - /** - * Gets all nodes that have cache with given ID. - * - * @param cacheId Cache ID. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheNodes(Integer cacheId, final long topVer) { - return filter(topVer, allCacheNodes.get(cacheId)); - } - - /** - * Gets all remote nodes that have at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(final long topVer) { - return filter(topVer, rmtNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given ID and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheId Cache ID. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheAffinityNodes(int cacheId, final long topVer) { - return filter(topVer, affCacheNodes.get(cacheId)); - } - - /** - * Checks if cache with given ID has at least one node with near cache enabled. - * - * @param cacheId Cache ID. - * @return {@code True} if cache with given name has at least one node with near cache enabled. - */ - boolean hasNearCache(int cacheId) { - return nearEnabledCaches.contains(cacheId); - } - - /** - * Removes left node from cached alives lists. - * - * @param leftNode Left node. - */ - void updateAlives(ClusterNode leftNode) { - if (leftNode.order() > maxOrder) - return; - - aliveSrvNodesWithCaches.remove(leftNode); - } - - /** - * @param topVer Topology version. - * @param nodes Nodes. - * @return Filtered collection (potentially empty, but never {@code null}). - */ - private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) { - if (nodes == null) - return Collections.emptyList(); - - // If no filtering needed, return original collection. - return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ? - nodes : - F.view(nodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.order() <= topVer; - } - }); - } - - /** @return Daemon nodes. */ - Collection<ClusterNode> daemonNodes() { - return daemonNodes; - } - - /** - * @param id Node ID. - * @return Node. - */ - @Nullable ClusterNode node(UUID id) { - return nodeMap.get(id); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); - } - } - /** * Cache predicate. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java new file mode 100644 index 0000000..963d97e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.eventstorage; + +import java.util.EventListener; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; + +/** + * Internal listener for discovery events. + */ +public interface DiscoveryEventListener extends EventListener { + /** + * @param evt Discovery event. + * @param discoCache Discovery cache. + */ + public void onEvent(DiscoveryEvent evt, DiscoCache discoCache); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index b5d5ee2..5464a8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -79,6 +80,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> /** Local event listeners. */ private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>(); + /** Internal discovery listeners. */ + private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>(); + /** Busy lock to control activity of threads. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); @@ -248,6 +252,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> msgLsnr = null; lsnrs.clear(); + discoLsnrs.clear(); stopped = true; } @@ -320,6 +325,30 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Records discovery events. + * + * @param evt Event to record. + * @param discoCache Discovery cache. + */ + public void record(DiscoveryEvent evt, DiscoCache discoCache) { + assert evt != null; + + if (!enterBusy()) + return; + + try { + // Notify internal discovery listeners first. + notifyDiscoveryListeners(evt, discoCache); + + // Notify all other registered listeners. + record(evt); + } + finally { + leaveBusy(); + } + } + + /** * Gets types of enabled user-recordable events. * * @return Array of types of enabled user-recordable events. @@ -590,7 +619,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> try { for (int t : types) { - getOrCreate(t).add(lsnr); + getOrCreate(lsnrs, t).add(lsnr); if (!isRecordable(t)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); @@ -615,14 +644,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> return; try { - getOrCreate(type).add(lsnr); + getOrCreate(lsnrs, type).add(lsnr); if (!isRecordable(type)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type)); if (types != null) { for (int t : types) { - getOrCreate(t).add(lsnr); + getOrCreate(lsnrs, t).add(lsnr); if (!isRecordable(t)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); @@ -635,16 +664,70 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Adds discovery event listener. Note that this method specifically disallow an empty + * array of event type to prevent accidental subscription for all system event that + * may lead to a drastic performance decrease. + * + * @param lsnr Listener to add. + * @param types Event types to subscribe listener for. + */ + public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int[] types) { + assert lsnr != null; + assert types != null; + assert types.length > 0; + + if (!enterBusy()) + return; + + try { + for (int t : types) { + getOrCreate(discoLsnrs, t).add(lsnr); + } + } + finally { + leaveBusy(); + } + } + + /** + * Adds discovery event listener. + * + * @param lsnr Listener to add. + * @param type Event type to subscribe listener for. + * @param types Additional event types to subscribe listener for. + */ + public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) { + assert lsnr != null; + + if (!enterBusy()) + return; + + try { + getOrCreate(discoLsnrs, type).add(lsnr); + + if (types != null) { + for (int t : types) { + getOrCreate(discoLsnrs, t).add(lsnr); + } + } + } + finally { + leaveBusy(); + } + } + + /** + * @param lsnrs Listeners map. * @param type Event type. * @return Listeners for given event type. */ - private Collection<GridLocalEventListener> getOrCreate(Integer type) { - Set<GridLocalEventListener> set = lsnrs.get(type); + private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) { + Set<T> set = lsnrs.get(type); if (set == null) { set = new GridConcurrentLinkedHashSet<>(); - Set<GridLocalEventListener> prev = lsnrs.putIfAbsent(type, set); + Set<T> prev = lsnrs.putIfAbsent(type, set); if (prev != null) set = prev; @@ -708,6 +791,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Removes listener for specified events, if any. If no event types provided - it + * remove the listener for all its registered events. + * + * @param lsnr Listener. + * @param types Event types. + * @return Returns {@code true} if removed. + */ + public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) { + assert lsnr != null; + + boolean found = false; + + if (F.isEmpty(types)) { + for (Set<DiscoveryEventListener> set : discoLsnrs.values()) + if (set.remove(lsnr)) + found = true; + } + else { + assert types != null; + + for (int type : types) { + Set<DiscoveryEventListener> set = discoLsnrs.get(type); + + if (set != null && set.remove(lsnr)) + found = true; + } + } + + return found; + } + + /** * * @param p Optional predicate. * @param types Event types to wait for. @@ -800,6 +915,41 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * @param evt Discovery event + * @param cache Discovery cache. + */ + private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) { + assert evt != null; + + notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache); + } + + /** + * @param set Set of listeners. + * @param evt Discovery event. + * @param cache Discovery cache. + */ + private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) { + assert evt != null; + + if (!F.isEmpty(set)) { + assert set != null; + + for (DiscoveryEventListener lsnr : set) { + try { + lsnr.onEvent(evt, cache); + } + catch (Throwable e) { + U.error(log, "Unexpected exception in listener notification for event: " + evt, e); + + if (e instanceof Error) + throw (Error)e; + } + } + } + } + + /** * @param p Grid event predicate. * @return Collection of grid events. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 144b162..2399493 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -252,10 +253,12 @@ public class GridAffinityAssignmentCache { * * @param topVer Topology version to calculate affinity cache for. * @param discoEvt Discovery event that caused this topology version change. + * @param discoCache Discovery cache. * @return Affinity assignments. */ @SuppressWarnings("IfMayBeConditional") - public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt, + DiscoCache discoCache) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); @@ -266,7 +269,7 @@ public class GridAffinityAssignmentCache { List<ClusterNode> sorted; if (!locCache) { - sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheId(), topVer)); + sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId())); Collections.sort(sorted, GridNodeOrderComparator.INSTANCE); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 7bf5fd8..d287188 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -382,7 +382,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().prepareCacheStart(req, fut.topologyVersion()); if (fut.isCacheAdded(cacheId, fut.topologyVersion())) { - if (cctx.discovery().cacheAffinityNodes(req.cacheName(), fut.topologyVersion()).isEmpty()) + if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); } @@ -403,7 +403,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent()); + fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); } @@ -753,7 +753,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(fut.topologyVersion(), newAff); } @@ -791,7 +791,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(fut.topologyVersion(), assignment); } @@ -817,14 +817,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch) throws IgniteCheckedException { if (!fetch && canCalculateAffinity(aff, fut)) { - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, aff.cacheName(), - fut.topologyVersion()); + fut.topologyVersion(), + fut.discoCache()); fetchFut.init(); @@ -878,7 +879,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheHolder cache = cache(fut, cacheDesc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(topVer, newAff); } @@ -945,14 +946,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, cacheCtx.name(), - topVer); + topVer, + fut.discoCache()); fetchFut.init(); @@ -986,7 +988,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); affCache.initialize(topVer, aff); } @@ -998,7 +1000,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent()); + affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); } List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); @@ -1028,7 +1030,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheCtx.isLocal()) continue; - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); } centralizedAff = true; @@ -1078,7 +1080,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache != null) { if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); return; } @@ -1118,7 +1120,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, aff.cacheName(), - prev.topologyVersion()); + prev.topologyVersion(), + prev.discoCache()); fetchFut.init(); @@ -1129,7 +1132,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); - aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); affFut.onDone(fut.topologyVersion()); } @@ -1269,7 +1272,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index d85e76e..17c9319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; @@ -77,7 +76,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. - aff.calculate(LOC_CACHE_TOP_VER, null); + aff.calculate(LOC_CACHE_TOP_VER, null, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e44f4a8..86dd4ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -48,14 +48,14 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; @@ -173,35 +173,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { + private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { + @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { if (!enterBusy()) return; try { - DiscoveryEvent e = (DiscoveryEvent)evt; - ClusterNode loc = cctx.localNode(); - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED || - e.type() == EVT_DISCOVERY_CUSTOM_EVT; + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || + evt.type() == EVT_DISCOVERY_CUSTOM_EVT; - final ClusterNode n = e.eventNode(); + final ClusterNode n = evt.eventNode(); GridDhtPartitionExchangeId exchId = null; GridDhtPartitionsExchangeFuture exchFut = null; - if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { + if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { assert !loc.id().equals(n.id()); - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { assert cctx.discovery().node(n.id()) == null; // Avoid race b/w initial future add and discovery event. GridDhtPartitionsExchangeFuture initFut = null; if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) { - initFut = exchangeFuture(initialExchangeId(), null, null, null); + initFut = exchangeFuture(initialExchangeId(), null, null, null, null); initFut.onNodeLeft(n); } @@ -213,18 +211,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } assert - e.type() != EVT_NODE_JOINED || n.order() > loc.order() : + evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; exchId = exchangeId(n.id(), - affinityTopologyVersion(e), - e.type()); + affinityTopologyVersion(evt), + evt.type()); - exchFut = exchangeFuture(exchId, e, null, null); + exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); @@ -254,9 +252,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (!F.isEmpty(valid)) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, valid, null); + exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { @@ -264,13 +262,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, null, msg); + exchFut = exchangeFuture(exchId, evt, cache, null, msg); } } else - exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); } } @@ -279,7 +277,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Discovery event (will start exchange): " + exchId); // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); + exchFut.onEvent(exchId, evt, cache); // Start exchange process. addFuture(exchFut); @@ -301,7 +299,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker = new ExchangeWorker(); - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, + cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, @@ -359,11 +357,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; // Generate dummy discovery event for local node joining. - DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); + T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin(); + + DiscoveryEvent discoEvt = localJoin.get1(); + DiscoCache discoCache = localJoin.get2(); GridDhtPartitionExchangeId exchId = initialExchangeId(); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null); + GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null); if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); @@ -470,7 +471,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { - cctx.gridEvents().removeLocalEventListener(discoLsnr); + cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); @@ -1067,12 +1068,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param cache Discovery data cache. * @param reqs Cache change requests. * @param affChangeMsg Affinity change message. * @return Exchange future. */ private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, + @Nullable DiscoCache cache, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { GridDhtPartitionsExchangeFuture fut; @@ -1091,7 +1094,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (discoEvt != null) - fut.onEvent(exchId, discoEvt); + fut.onEvent(exchId, discoEvt, cache); if (stopErr != null) fut.onDone(stopErr); @@ -1235,7 +1238,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana refreshPartitions(); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } finally { leaveBusy(); @@ -1291,6 +1294,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), null, null, + null, null); exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -1301,7 +1305,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index a1fbd72..9c4e4ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -41,7 +42,6 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -102,6 +102,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private final Object similarAffKey; + /** */ + private volatile DiscoCache discoCache; + /** * @param cctx Context. * @param cacheId Cache ID. @@ -120,6 +123,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { topVer = exchFut.topologyVersion(); + discoCache = exchFut.discoCache(); + log = cctx.logger(getClass()); lock.writeLock().lock(); @@ -190,6 +195,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { this.stopping = stopping; topVer = exchId.topologyVersion(); + discoCache = exchFut.discoCache(); updateSeq.setIfGreater(updSeq); @@ -263,7 +269,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null; @@ -416,7 +422,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (!F.isEmpty(nodeIds)) { for (UUID nodeId : nodeIds) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = discoCache.node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) @@ -442,7 +448,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null; lock.readLock().lock(); @@ -465,7 +471,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = discoCache.node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -758,7 +764,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -808,7 +814,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); ClusterNode loc = cctx.localNode();