Repository: ignite Updated Branches: refs/heads/ignite-zk 2759dbcaa -> d2daf9fbc
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2daf9fb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2daf9fb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2daf9fb Branch: refs/heads/ignite-zk Commit: d2daf9fbc22d78985882913d9f924c1510365793 Parents: 2759dbc Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 11 12:18:03 2018 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 11 14:14:47 2018 +0300 ---------------------------------------------------------------------- .../CommunicationFailureContext.java | 5 +- .../discovery/GridDiscoveryManager.java | 5 +- .../managers/discovery/IgniteDiscoverySpi.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 4 +- .../internal/ZkCommunicationFailureContext.java | 8 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 18 +- .../zk/internal/ZookeeperDiscoverySpiTest.java | 508 ++++++++++++++++--- 8 files changed, 468 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java index 8a9906b..d75cfdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.configuration; import java.util.List; +import java.util.Map; import org.apache.ignite.cluster.ClusterNode; import 
org.apache.ignite.spi.communication.CommunicationSpi; @@ -38,9 +39,9 @@ public interface CommunicationFailureContext { public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); /** - * @return List of currently started cache. + * @return Currently started caches. */ - public List<String> startedCaches(); + public Map<String, CacheConfiguration<?, ?>> startedCaches(); /** * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index cc63384..c0ff6ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -79,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; @@ -2458,7 +2457,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if SPI supports communication error resolve. 
*/ private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) { - return spi instanceof IgniteDiscoverySpi && ((IgniteDiscoverySpi)spi).supportsCommunicationErrorResolve(); + return spi instanceof IgniteDiscoverySpi && ((IgniteDiscoverySpi)spi).supportsCommunicationFailureResolve(); } /** @@ -2496,7 +2495,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (!supportsCommunicationErrorResolve(spi) || !supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi())) throw new UnsupportedOperationException(); - ((IgniteDiscoverySpi)spi).resolveCommunicationError(node, err); + ((IgniteDiscoverySpi)spi).resolveCommunicationFailure(node, err); } /** Worker for network segment checks. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index bf117f1..9aa5d14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -57,11 +57,11 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { /** * @return {@code True} if supports communication error resolve. */ - public boolean supportsCommunicationErrorResolve(); + public boolean supportsCommunicationFailureResolve(); /** * @param node Problem node. * @param err Connection error. 
*/ - public void resolveCommunicationError(ClusterNode node, Exception err); + public void resolveCommunicationFailure(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 292d67e..809a8c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2108,12 +2108,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } /** {@inheritDoc} */ - @Override public boolean supportsCommunicationErrorResolve() { + @Override public boolean supportsCommunicationFailureResolve() { return false; } /** {@inheritDoc} */ - @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index a89d46a..fc1af6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -243,12 +243,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery } /** {@inheritDoc} */ - @Override public boolean supportsCommunicationErrorResolve() { + @Override public boolean supportsCommunicationFailureResolve() { return true; } /** {@inheritDoc} */ - @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { impl.resolveCommunicationError(node, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java index c9521dc..d27b717 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CommunicationFailureContext; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -35,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; /** * @@ -103,14 +105,14 @@ class 
ZkCommunicationFailureContext implements CommunicationFailureContext { } /** {@inheritDoc} */ - @Override public List<String> startedCaches() { + @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() { Map<Integer, DynamicCacheDescriptor> cachesMap = ctx.affinity().caches(); - List<String> res = new ArrayList<>(cachesMap.size()); + Map<String, CacheConfiguration<?, ?>> res = U.newHashMap(cachesMap.size()); for (DynamicCacheDescriptor desc : cachesMap.values()) { if (desc.cacheType().userCache()) - res.add(desc.cacheName()); + res.put(desc.cacheName(), desc.cacheConfiguration()); } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index c9328fc..75363e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridLongList; @@ -329,8 +330,12 @@ public class ZookeeperDiscoveryImpl { fut.scheduleCheckOnTimeout(); } - else + else { fut = 
commErrProcFut.get(); + + if (fut == null) + continue; + } } nodeStatusFut = fut.nodeStatusFuture(node); @@ -4211,12 +4216,19 @@ public class ZookeeperDiscoveryImpl { void runOfWaitForExchange() throws Exception { GridCacheSharedContext cacheCtx = ((IgniteKernal)spi.ignite()).context().cache().context(); - IgniteInternalFuture<?> exchFut = - cacheCtx.exchange().affinityReadyFuture(cacheCtx.discovery().topologyVersionEx()); + final AffinityTopologyVersion topVer = cacheCtx.discovery().topologyVersionEx(); + + IgniteInternalFuture<?> exchFut = cacheCtx.exchange().affinityReadyFuture(topVer); if (exchFut != null && !exchFut.isDone()) { + if (log.isInfoEnabled()) + log.info("Wait for current exchange completion [topVer=" + topVer + ']'); + exchFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { + if (log.isInfoEnabled()) + log.info("Finished wait for current exchange completion [topVer=" + topVer + ']'); + // Most probably listener is run from Ignite thread, run fake async operation to return to Zookeeper thread. 
rtState.zkClient.existsAsync(zkPaths.aliveNodesDir, null, WaitExchangeCompletionCallback.this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index b6791c2..75ecb8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -65,6 +66,7 @@ import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; @@ -75,7 +77,10 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; @@ -84,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -91,6 +97,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.plugin.security.SecuritySubject; @@ -184,7 +191,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private boolean persistence; /** */ - private IgniteOutClosure<CommunicationFailureResolver> commProblemRslvr; + private IgniteOutClosure<CommunicationFailureResolver> commFailureRslvr; /** */ private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth; @@ -325,8 +332,8 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { if (testCommSpi) cfg.setCommunicationSpi(new 
ZkTestCommunicationSpi()); - if (commProblemRslvr != null) - cfg.setCommunicationFailureResolver(commProblemRslvr.apply()); + if (commFailureRslvr != null) + cfg.setCommunicationFailureResolver(commFailureRslvr.apply()); return cfg; } @@ -464,7 +471,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { checkInternalStructuresCleanup(); - checkZkNodesCleanup(); + // checkZkNodesCleanup(); } finally { reset(); @@ -2366,26 +2373,26 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testNoOpCommunicationErrorResolve_1() throws Exception { - communicationErrorResolve_Simple(2); + public void testNoOpCommunicationFailureResolve_1() throws Exception { + communicationFailureResolve_Simple(2); } /** * @throws Exception If failed. */ public void testNoOpCommunicationErrorResolve_2() throws Exception { - communicationErrorResolve_Simple(10); + communicationFailureResolve_Simple(10); } /** * @param nodes Nodes number. * @throws Exception If failed. 
*/ - private void communicationErrorResolve_Simple(int nodes) throws Exception { + private void communicationFailureResolve_Simple(int nodes) throws Exception { assert nodes > 1; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(nodes); @@ -2405,7 +2412,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi spi = spi(ignite(idx1)); - spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationFailure(ignite(idx2).cluster().localNode(), new Exception("test")); checkInternalStructuresCleanup(); } @@ -2418,7 +2425,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { */ public void testNoOpCommunicationErrorResolve_3() throws Exception { sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(3); @@ -2433,7 +2440,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { @Override public Object call() { ZookeeperDiscoverySpi spi = spi(ignite(0)); - spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationFailure(ignite(1).cluster().localNode(), new Exception("test")); return null; } @@ -2466,13 +2473,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { testCommSpi = true; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; startGrid(0); startGridsMultiThreaded(1, 3); - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3)); commSpi.pingLatch = new CountDownLatch(1); @@ -2480,7 +2487,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest { @Override public Object call() { ZookeeperDiscoverySpi spi = spi(ignite(1)); - spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test")); return null; } @@ -2508,13 +2515,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { testCommSpi = true; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; startGrid(0); startGridsMultiThreaded(1, 3); - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3)); commSpi.pingStartLatch = new CountDownLatch(1); commSpi.pingLatch = new CountDownLatch(1); @@ -2523,7 +2530,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { @Override public Object call() { ZookeeperDiscoverySpi spi = spi(ignite(1)); - spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test")); return null; } @@ -2566,49 +2573,49 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillNode_1() throws Exception { - communicationErrorResolve_KillNodes(2, Collections.singleton(2L)); + communicationFailureResolve_KillNodes(2, Collections.singleton(2L)); } /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillNode_2() throws Exception { - communicationErrorResolve_KillNodes(3, Collections.singleton(2L)); + communicationFailureResolve_KillNodes(3, Collections.singleton(2L)); } /** * @throws Exception If failed. 
*/ public void testCommunicationErrorResolve_KillNode_3() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L)); + communicationFailureResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L)); } /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception { - communicationErrorResolve_KillNodes(2, Collections.singleton(1L)); + communicationFailureResolve_KillNodes(2, Collections.singleton(1L)); } /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception { - communicationErrorResolve_KillNodes(3, Collections.singleton(1L)); + communicationFailureResolve_KillNodes(3, Collections.singleton(1L)); } /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L)); + communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L)); } /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L)); + communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L)); } /** @@ -2616,14 +2623,14 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @param killNodes Nodes to kill by resolve process. * @throws Exception If failed. 
*/ - private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { + private void communicationFailureResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { testCommSpi = true; - commProblemRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes); + commFailureRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes); startGrids(startNodes); - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(0)); + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(0)); commSpi.checkRes = new BitSet(startNodes); @@ -2643,7 +2650,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { assertNotNull(killNodeId); try { - spi.resolveCommunicationError(spi.getNode(killNodeId), new Exception("test")); + spi.resolveCommunicationFailure(spi.getNode(killNodeId), new Exception("test")); fail("Exception is not thrown"); } @@ -2666,11 +2673,11 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testCommunicationErrorResolve_KillCoordinator_5() throws Exception { + public void testCommunicationFailureResolve_KillCoordinator_5() throws Exception { sesTimeout = 2000; testCommSpi = true; - commProblemRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY; + commFailureRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY; startGrids(10); @@ -2682,14 +2689,14 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { info("Iteration: " + i); for (Ignite node : G.allGrids()) - ZkTestCommunicationSpi.spi(node).initCheckResult(10); + ZkTestCommunicationSpi.testSpi(node).initCheckResult(10); UUID crdId = ignite(crd).cluster().localNode().id(); ZookeeperDiscoverySpi spi = spi(ignite(crd + 1)); try { - spi.resolveCommunicationError(spi.getNode(crdId), new Exception("test")); + spi.resolveCommunicationFailure(spi.getNode(crdId), new Exception("test")); fail("Exception is not thrown"); } @@ -2710,11 +2717,11 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testCommunicationErrorResolve_KillRandom() throws Exception { + public void testCommunicationFailureResolve_KillRandom() throws Exception { sesTimeout = 2000; testCommSpi = true; - commProblemRslvr = KillRandomCommunicationFailureResolver.FACTORY; + commFailureRslvr = KillRandomCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(10); @@ -2730,7 +2737,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi spi = null; for (Ignite node : G.allGrids()) { - ZkTestCommunicationSpi.spi(node).initCheckResult(100); + ZkTestCommunicationSpi.testSpi(node).initCheckResult(100); spi = spi(node); } @@ -2738,7 +2745,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { assert spi != null; try { - spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test")); + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); } catch (IgniteSpiException ignore) { // No-op. @@ -2755,15 +2762,15 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testDefaultCommunicationErrorResolver1() throws Exception { + public void testDefaultCommunicationFailureResolver1() throws Exception { testCommSpi = true; sesTimeout = 5000; startGrids(3); - ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(3, 2); + ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(3, 2); UUID killedId = nodeId(2); @@ -2771,7 +2778,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi spi = spi(ignite(0)); - spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); waitForTopology(2); @@ -2781,7 +2788,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testDefaultCommunicationErrorResolver2() throws Exception { + public void testDefaultCommunicationFailureResolver2() throws Exception { testCommSpi = true; sesTimeout = 5000; @@ -2791,15 +2798,15 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { startGridsMultiThreaded(3, 2); - ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.spi(ignite(3)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.spi(ignite(4)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.testSpi(ignite(3)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.testSpi(ignite(4)).initCheckResult(5, 2, 3, 4); ZookeeperDiscoverySpi spi = spi(ignite(0)); - spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); waitForTopology(2); } @@ -2807,22 +2814,22 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testDefaultCommunicationErrorResolver3() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(3, 1); + public void testDefaultCommunicationFailureResolver3() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(3, 1); } /** * @throws Exception If failed. 
*/ - public void testDefaultCommunicationErrorResolver4() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(3, 0); + public void testDefaultCommunicationFailureResolver4() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(3, 0); } /** * @throws Exception If failed. */ - public void testDefaultCommunicationErrorResolver5() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6); + public void testDefaultCommunicationFailureResolver5() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(10, 1, 3, 6); } /** @@ -2830,7 +2837,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @param breakNodes Node indices where communication server is closed. * @throws Exception If failed. */ - private void defaultCommunicationErrorResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception { + private void defaultCommunicationFailureResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception { sesTimeout = 5000; startGridsMultiThreaded(startNodes); @@ -2860,13 +2867,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testCommunicationErrorResolve_CachesInfo1() throws Exception { + public void testCommunicationFailureResolve_CachesInfo1() throws Exception { testCommSpi = true; sesTimeout = 5000; final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver(); - commProblemRslvr = new IgniteOutClosure<CommunicationFailureResolver>() { + commFailureRslvr = new IgniteOutClosure<CommunicationFailureResolver>() { @Override public CommunicationFailureResolver apply() { return rslvr; } @@ -2874,21 +2881,320 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { startGrids(2); - ZookeeperDiscoverySpi spi = spi(ignite(0)); + awaitPartitionMapExchange(); + + Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>(); // Cache name -> (partitions, backups, expected nodes per partition). + + expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1)); + + checkResolverCachesInfo(ignite(0), expCaches); + + List<CacheConfiguration> caches = new ArrayList<>(); + + CacheConfiguration c1 = new CacheConfiguration("c1"); + c1.setBackups(1); + c1.setAffinity(new RendezvousAffinityFunction(false, 64)); + caches.add(c1); + + CacheConfiguration c2 = new CacheConfiguration("c2"); + c2.setBackups(2); + c2.setAffinity(new RendezvousAffinityFunction(false, 128)); + caches.add(c2); + + CacheConfiguration c3 = new CacheConfiguration("c3"); + c3.setCacheMode(CacheMode.REPLICATED); + c3.setAffinity(new RendezvousAffinityFunction(false, 256)); + caches.add(c3); + + ignite(0).createCaches(caches); + + expCaches.put("c1", new T3<>(64, 1, 2)); + expCaches.put("c2", new T3<>(128, 2, 2)); + expCaches.put("c3", new T3<>(256, 1, 2)); + + checkResolverCachesInfo(ignite(0), expCaches); + + startGrid(2); + startGrid(3); + + awaitPartitionMapExchange(); + + expCaches.put("c2", new T3<>(128, 2, 3)); + expCaches.put("c3", new T3<>(256, 1, 4)); + + checkResolverCachesInfo(ignite(0), expCaches); + + CacheConfiguration c4 = new CacheConfiguration("c4"); +
c4.setCacheMode(CacheMode.PARTITIONED); + c4.setBackups(0); + c4.setAffinity(new RendezvousAffinityFunction(false, 256)); + c4.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1))); + + ignite(2).createCache(c4); + + expCaches.put("c4", new T3<>(256, 0, 1)); + + checkResolverCachesInfo(ignite(0), expCaches); + + stopGrid(0); // Stop the current coordinator and check that the new coordinator initializes the required caches information. + + awaitPartitionMapExchange(); + + expCaches.put("c3", new T3<>(256, 1, 3)); + + checkResolverCachesInfo(ignite(1), expCaches); + + startGrid(0); + awaitPartitionMapExchange(); + expCaches.put("c3", new T3<>(256, 1, 4)); + + checkResolverCachesInfo(ignite(1), expCaches); + + stopGrid(1); + awaitPartitionMapExchange(); + expCaches.put("c3", new T3<>(256, 1, 3)); + + checkResolverCachesInfo(ignite(3), expCaches); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationFailureResolve_CachesInfo2() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver(); + + commFailureRslvr = new IgniteOutClosure<CommunicationFailureResolver>() { + @Override public CommunicationFailureResolver apply() { + return rslvr; + } + }; + + Ignite srv0 = startGrid(0); + + CacheConfiguration ccfg = new CacheConfiguration("c1"); + ccfg.setBackups(1); + + srv0.createCache(ccfg); + + // Block rebalance to make sure node0 will be the only owner. 
+ TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridDhtPartitionSupplyMessage && + ((GridDhtPartitionSupplyMessage) msg).groupId() == CU.cacheId("c1"); + } + }); + + startGrid(1); + + U.sleep(1000); + + ZookeeperDiscoverySpi spi = spi(srv0); rslvr.latch = new CountDownLatch(1); - ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(2, 0); + ZkTestCommunicationSpi.testSpi(srv0).initCheckResult(2, 0); - spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test")); + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); assertTrue(rslvr.latch.await(10, SECONDS)); - List<String> caches = Arrays.asList(DEFAULT_CACHE_NAME); + List<List<ClusterNode>> cacheOwners = rslvr.ownersMap.get("c1"); + + ClusterNode node0 = srv0.cluster().localNode(); + + for (int p = 0; p < RendezvousAffinityFunction.DFLT_PARTITION_COUNT; p++) { + List<ClusterNode> owners = cacheOwners.get(p); + + assertEquals(1, owners.size()); + assertEquals(node0, owners.get(0)); + } + + TestRecordingCommunicationSpi.spi(srv0).stopBlock(); + + awaitPartitionMapExchange(); - Collections.sort(caches); + Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>(); - assertEquals(caches, rslvr.caches); + expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1)); + expCaches.put("c1", new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 1, 2)); + + checkResolverCachesInfo(srv0, expCaches); + } + + /** + * @param crd Coordinator node. + * @param expCaches Expected caches info. + * @throws Exception If failed. 
*/ + private void checkResolverCachesInfo(Ignite crd, Map<String, T3<Integer, Integer, Integer>> expCaches) + throws Exception + { + CacheInfoCommunicationFailureResolver rslvr = + (CacheInfoCommunicationFailureResolver)crd.configuration().getCommunicationFailureResolver(); + + assertNotNull(rslvr); + + ZookeeperDiscoverySpi spi = spi(crd); + + rslvr.latch = new CountDownLatch(1); + + ZkTestCommunicationSpi.testSpi(crd).initCheckResult(crd.cluster().nodes().size(), 0); + + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); + + assertTrue(rslvr.latch.await(10, SECONDS)); + + rslvr.checkCachesInfo(expCaches); + + rslvr.reset(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testCommunicationFailureResolve_ConcurrentDiscoveryEvents() throws Exception { + sesTimeout = 5000; + + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + final int INITIAL_NODES = 5; + + startGridsMultiThreaded(INITIAL_NODES); + + final CyclicBarrier b = new CyclicBarrier(8); // 3 single-thread tasks and 5 resolver threads must start together. + + GridCompoundFuture<?, ?> fut = new GridCompoundFuture<>(); + + final AtomicBoolean stop = new AtomicBoolean(); + + fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + b.await(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + startGrid(i + INITIAL_NODES); + + Thread.sleep(rnd.nextLong(1000) + 10); + + if (stop.get()) + break; + } + + return null; + } + }, "test-node-start")); + + fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + b.await(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + startGrid(100); + + Thread.sleep(rnd.nextLong(1000) + 10); + + stopGrid(100); + + Thread.sleep(rnd.nextLong(1000) + 10); + } + + return null; + } + }, "test-node-restart")); + 
fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + b.await(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int idx = 0; + + while (!stop.get()) { + CacheConfiguration ccfg = new CacheConfiguration("c-" + idx++); + ccfg.setBackups(rnd.nextInt(5)); + + ignite(rnd.nextInt(INITIAL_NODES)).createCache(ccfg); + + Thread.sleep(rnd.nextLong(1000) + 10); + + ignite(rnd.nextInt(INITIAL_NODES)).destroyCache(ccfg.getName()); + + Thread.sleep(rnd.nextLong(1000) + 10); + } + + return null; + } + }, "test-create-cache")); + + fut.add((IgniteInternalFuture)GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + b.await(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 5; i++) { + info("resolveCommunicationFailure: " + i); + + ZookeeperDiscoverySpi spi = spi(ignite(rnd.nextInt(INITIAL_NODES))); + + spi.resolveCommunicationFailure(ignite(rnd.nextInt(INITIAL_NODES)).cluster().localNode(), + new Exception("test")); + } + + return null; + } + finally { + stop.set(true); + } + } + }, 5, "test-resolve-failure")); + + fut.markInitialized(); + + fut.get(); + } + + /** + * @throws Exception If failed.
*/ + public void testCommunicationFailureResolve_ConcurrentMultinode() throws Exception { + sesTimeout = 5000; + + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + startGridsMultiThreaded(5); + + client = true; + + startGridsMultiThreaded(5, 5); + + final int NODES = 10; // Total number of nodes started above (5 + 5). + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 5; i++) { + info("resolveCommunicationFailure: " + i); + + ZookeeperDiscoverySpi spi = spi(ignite(rnd.nextInt(NODES))); + + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); + } + + return null; + } + }, 30, "test-resolve-failure"); } /** @@ -4100,7 +4406,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private IgniteLogger log; /** */ - List<String> caches; + Map<String, CacheConfiguration<?, ?>> caches; /** */ Map<String, List<List<ClusterNode>>> affMap; @@ -4118,22 +4424,82 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { caches = ctx.startedCaches(); - Collections.sort(caches); - - log.info("Resolver called, started caches: " + caches); + log.info("Resolver called, started caches: " + caches.keySet()); assertNotNull(caches); affMap = new HashMap<>(); ownersMap = new HashMap<>(); - for (String cache : caches) { + for (String cache : caches.keySet()) { affMap.put(cache, ctx.cacheAffinity(cache)); ownersMap.put(cache, ctx.cachePartitionOwners(cache)); } latch.countDown(); } + + /** + * @param expCaches Expected caches information (when late assignment is done and rebalance is finished).
*/ + void checkCachesInfo(Map<String, T3<Integer, Integer, Integer>> expCaches) { + assertNotNull(caches); + assertNotNull(affMap); + assertNotNull(ownersMap); + + for (Map.Entry<String, T3<Integer, Integer, Integer>> e : expCaches.entrySet()) { + String cacheName = e.getKey(); + + int parts = e.getValue().get1(); + int backups = e.getValue().get2(); // Not checked for REPLICATED caches (their backups must be Integer.MAX_VALUE). + int expNodes = e.getValue().get3(); + + assertTrue(cacheName, caches.containsKey(cacheName)); + + CacheConfiguration ccfg = caches.get(cacheName); + + assertEquals(cacheName, ccfg.getName()); + + if (ccfg.getCacheMode() == CacheMode.REPLICATED) + assertEquals(cacheName, Integer.MAX_VALUE, ccfg.getBackups()); + else + assertEquals(cacheName, backups, ccfg.getBackups()); + + assertEquals(cacheName, parts, ccfg.getAffinity().partitions()); + + List<List<ClusterNode>> aff = affMap.get(cacheName); + + assertNotNull(cacheName, aff); + assertEquals(cacheName, parts, aff.size()); + + List<List<ClusterNode>> owners = ownersMap.get(cacheName); + + assertNotNull(cacheName, owners); + assertEquals(cacheName, parts, owners.size()); + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAff = aff.get(i); + + assertEquals(cacheName, expNodes, partAff.size()); + + List<ClusterNode> partOwners = owners.get(i); + + assertEquals(cacheName, expNodes, partOwners.size()); + + assertTrue(cacheName, partAff.containsAll(partOwners)); + assertTrue(cacheName, partOwners.containsAll(partAff)); + } + } + } + + /** + * Clears the caches, affinity and owners information collected by the last resolver invocation. + */ + void reset() { + caches = null; + affMap = null; + ownersMap = null; + } } /** @@ -4262,7 +4628,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * */ - static class ZkTestCommunicationSpi extends TcpCommunicationSpi { + static class ZkTestCommunicationSpi extends TestRecordingCommunicationSpi { /** */ private volatile CountDownLatch pingStartLatch; @@ -4276,7 +4642,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @param ignite Node. * @return Node's communication SPI. 
*/ - static ZkTestCommunicationSpi spi(Ignite ignite) { + static ZkTestCommunicationSpi testSpi(Ignite ignite) { return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi(); }