Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4fce2805 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4fce2805 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4fce2805 Branch: refs/heads/ignite-1561-1 Commit: 4fce28054bc325741f65035ae384f9b4b9c3fee8 Parents: 62dbba8 8273e67 Author: Alexander Fedotov <alexander.fedot...@gmail.com> Authored: Fri Apr 7 16:06:34 2017 +0300 Committer: Alexander Fedotov <alexander.fedot...@gmail.com> Committed: Fri Apr 7 16:06:34 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 310 ++++++++ .../discovery/GridDiscoveryManager.java | 710 ++++++------------- .../eventstorage/DiscoveryEventListener.java | 33 + .../eventstorage/GridEventStorageManager.java | 162 ++++- .../affinity/GridAffinityAssignmentCache.java | 7 +- .../cache/CacheAffinitySharedManager.java | 35 +- .../cache/GridCacheAffinityManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 64 +- .../dht/GridClientPartitionTopology.java | 20 +- .../dht/GridDhtAssignmentFetchFuture.java | 7 +- .../dht/GridDhtPartitionTopologyImpl.java | 44 +- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 33 +- .../service/GridServiceProcessor.java | 21 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 64 +- .../GridDiscoveryManagerAttributesSelfTest.java | 14 +- .../discovery/GridDiscoveryManagerSelfTest.java | 214 ------ .../IgniteTopologyPrintFormatSelfTest.java | 8 +- .../testsuites/IgniteKernalSelfTestSuite.java | 3 - 19 files changed, 854 insertions(+), 901 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 2ec1070,80549dc..53e6069 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@@ -112,8 -108,8 +108,9 @@@ import org.apache.ignite.spi.discovery. import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@@ -1892,29 -1901,114 +1902,137 @@@ public class GridDiscoveryManager exten } /** + * @return {@code True} if local node client and discovery SPI supports reconnect. + */ + public boolean reconnectSupported() { + DiscoverySpi spi = getSpi(); + + return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) && + !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + } + + /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() { + assert reconnectSupported(); + + DiscoverySpi discoverySpi = getSpi(); + + ((TcpDiscoverySpi)discoverySpi).reconnect(); + } + + /** + * @param loc Local node. + * @param topSnapshot Topology snapshot. + * @return Newly created discovery cache. + */ + @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) { + HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); + HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); + + ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size()); + + for (ClusterNode node : topSnapshot) { + if (alive(node)) + alives.add(node.id()); + + if (node.isDaemon()) + daemonNodes.add(node); + else { + allNodes.add(node); + + if (!node.isLocal()) + rmtNodes.add(node); + + if (!CU.clientNode(node)) + srvNodes.add(node); + } + + nodeMap.put(node.id(), node); + } + + assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; + + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size()); + + Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + + Set<Integer> nearEnabledCaches = new HashSet<>(); + + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + assert !node.isDaemon(); + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + allNodesWithCaches.add(node); + + if(!CU.clientNode(node)) + srvNodesWithCaches.add(node); + + if (!node.isLocal()) + rmtNodesWithCaches.add(node); + + addToMap(allCacheNodes, cacheName, node); + + if (filter.dataNode(node)) + addToMap(affCacheNodes, cacheName, node); + + if (filter.nearNode(node)) + nearEnabledCaches.add(CU.cacheId(cacheName)); + } + } + } + + return new DiscoCache( + loc, + Collections.unmodifiableList(rmtNodes), + Collections.unmodifiableList(allNodes), + Collections.unmodifiableList(srvNodes), + Collections.unmodifiableList(daemonNodes), + U.sealList(srvNodesWithCaches), + U.sealList(allNodesWithCaches), + U.sealList(rmtNodesWithCaches), + Collections.unmodifiableMap(allCacheNodes), + Collections.unmodifiableMap(affCacheNodes), + Collections.unmodifiableMap(nodeMap), + Collections.unmodifiableSet(nearEnabledCaches), + alives); + } + + /** + * Adds node to map. + * + * @param cacheMap Map to add to. + * @param cacheName Cache name. + * @param rich Node to add + */ + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); + + if (cacheNodes == null) { + cacheNodes = new ArrayList<>(); + + cacheMap.put(CU.cacheId(cacheName), cacheNodes); + } + + cacheNodes.add(rich); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6ced5e6,99146aa..adfbc11 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@@ -315,10 -313,8 +315,10 @@@ public class GridServiceProcessor exten busyLock.block(); + U.shutdownNow(GridServiceProcessor.class, depExe, log); + if (!ctx.clientNode()) - ctx.event().removeLocalEventListener(topLsnr); + ctx.event().removeDiscoveryEventListener(topLsnr); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@@ -1576,19 -1586,16 +1576,22 @@@ if (!((CacheAffinityChangeMessage)msg).exchangeNeeded()) return; } + else if (msg instanceof DynamicCacheChangeBatch) { + if (!((DynamicCacheChangeBatch)msg).exchangeNeeded()) + return; + } + else + return; } else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0); - depExe.execute(new BusyRunnable() { + depExe.execute(new DepRunnable() { @Override public void run0() { - ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); + // In case the cache instance isn't tracked by DiscoveryManager anymore. + discoCache.updateAlives(ctx.discovery()); + + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ----------------------------------------------------------------------