Repository: ignite
Updated Branches:
  refs/heads/ignite-6467 9c87026a9 -> 55e2c6b17
Merge remote-tracking branch 'remotes/origin/master' into ignite-6467 # Conflicts: # modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/55e2c6b1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/55e2c6b1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/55e2c6b1 Branch: refs/heads/ignite-6467 Commit: 55e2c6b17f26f54e68f3ac452199c7888d30c544 Parents: 9c87026 Author: sboikov <sboi...@apache.org> Authored: Sat Jul 28 11:47:45 2018 +0300 Committer: sboikov <sboi...@apache.org> Committed: Sat Jul 28 11:47:45 2018 +0300 ---------------------------------------------------------------------- .../distributed/CacheExchangeMergeTest.java | 704 ------------------- 1 file changed, 704 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/55e2c6b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 8d0201c..7c18f6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -284,134 +284,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testMergeStartRandomClientsServers() throws Exception { - for (int iter = 0; iter < 3; iter++) { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - final int srvs = rnd.nextInt(3) + 1; - final int clients = rnd.nextInt(3); - - log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", clients=" + clients + ']'); - - Ignite srv0 = startGrids(srvs); - - for (int i = 0; i < clients; i++) { - client.set(true); - - startGrid(srvs + i); - } - - final int threads = 8; - - final int initNodes = srvs + clients; - - mergeExchangeWaitVersion(srv0, initNodes + threads); - - final AtomicInteger idx = new AtomicInteger(initNodes); - - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - int nodeIdx = idx.incrementAndGet(); - - if (rnd.nextInt(3) == 0) { - log.info("Start client: " + nodeIdx); - - client.set(true); - } - else - log.info("Start server: " + nodeIdx); - - startGrid(nodeIdx); - - return null; - } - }, threads, "test-thread"); - - fut.get(); - - checkCaches(); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testMergeStartStopRandomClientsServers() throws Exception { - for (int iter = 0; iter < 3; iter++) { - log.info("Iteration: " + iter); - - final int srvs = 5; - final int clients = 5; - - Ignite srv0 = startGrids(srvs); - - for (int i = 0; i < clients; i++) { - client.set(true); - - startGrid(srvs + i); - } - - final int threads = 8; - - final int initNodes = srvs + clients; - - mergeExchangeWaitVersion(srv0, initNodes + threads); - - final AtomicInteger idx = new AtomicInteger(initNodes); - - final ConcurrentHashSet<Integer> stopNodes = new ConcurrentHashSet<>(); - - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - if (rnd.nextBoolean()) { - Integer stopIdx; - - for (;;) { - stopIdx = rnd.nextInt(initNodes - 1) + 1; - - if (stopNodes.add(stopIdx)) - break; - } - - log.info("Stop node: " + stopIdx); - - stopGrid(getTestIgniteInstanceName(stopIdx), true, false); - } - else { - int nodeIdx = idx.incrementAndGet(); - - if (rnd.nextInt(5) == 0) { - log.info("Start client: " + nodeIdx); - - client.set(true); - } - else - log.info("Start server: " + nodeIdx); - - startGrid(nodeIdx); - } - - return null; - } - }, threads, "test-thread"); - - fut.get(); - - checkCaches(); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ public void testConcurrentStartServers() throws Exception { concurrentStart(false); } @@ -463,582 +335,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** - * @throws Exception If failed. 
- */ - public void testMergeServerAndClientJoin1() throws Exception { - final IgniteEx srv0 = startGrid(0); - - mergeExchangeWaitVersion(srv0, 3); - - IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(1); - - return null; - } - }, 1, "start-srv"); - - waitForExchangeStart(srv0, 2); - - IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - client.set(true); - - startGrid(2); - - return null; - } - }, 1, "start-client"); - - fut1.get(); - fut2.get(); - - checkCaches(); - - checkExchanges(srv0, 1, 3); - checkExchanges(ignite(1), 3); - checkExchanges(ignite(2), 3); - } - - /** - * @throws Exception If failed. - */ - public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception { - startCacheOnJoinAndJoinMerge1(2, false); - } - - /** - * @throws Exception If failed. - */ - public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception { - startCacheOnJoinAndJoinMerge1(4, false); - } - - /** - * @throws Exception If failed. - */ - public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception { - startCacheOnJoinAndJoinMerge1(5, true); - } - - /** - * @param nodes Number of nodes to start. - * @param withClients If {@code true} starts both servers and clients. - * @throws Exception If failed. - */ - private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception { - cfgCache = false; - - final IgniteEx srv0 = startGrid(0); - - mergeExchangeWaitVersion(srv0, nodes + 1); - - if (withClients) { - clientC = new IgniteClosure<String, Boolean>() { - @Override public Boolean apply(String nodeName) { - return getTestIgniteInstanceIndex(nodeName) % 2 == 0; - } - }; - } - - cfgCache = true; - - IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. 
- */ - public void testMergeAndHistoryCleanup() throws Exception { - final int histSize = 5; - - String oldHistVal = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE); - - System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histSize)); - - try { - final Ignite srv0 = startGrid(0); - - int topVer = 1; - - for (int i = 0; i < 3; i++) { - mergeExchangeWaitVersion(srv0, topVer + 3); - - startGridsAsync(srv0, topVer, 3).get(); - - topVer += 3; - } - - checkHistorySize(histSize); - - awaitPartitionMapExchange(); - - checkHistorySize(histSize); - - mergeExchangeWaitVersion(srv0, topVer + 2); - - stopGrid(1); - stopGrid(2); - - checkHistorySize(histSize); - - awaitPartitionMapExchange(); - - checkHistorySize(histSize); - } - finally { - if (oldHistVal != null) - System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, oldHistVal); - else - System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE); - } - } - - /** - * @param histSize History size. - */ - private void checkHistorySize(int histSize) { - List<Ignite> nodes = G.allGrids(); - - assertTrue(nodes.size() > 0); - - for (Ignite node : nodes) { - List<GridDhtPartitionsExchangeFuture> exchFuts = - ((IgniteEx)node).context().cache().context().exchange().exchangeFutures(); - - assertTrue("Unexpected size: " + exchFuts.size(), exchFuts.size() > 0 && exchFuts.size() <= histSize); - } - } - - /** - * @throws Exception If failed. - */ - public void testStartCacheOnJoinAndMergeWithFail() throws Exception { - cfgCache = false; - - final Ignite srv0 = startGrids(2); - - mergeExchangeWaitVersion(srv0, 5); - - cfgCache = true; - - IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); - - stopGrid(1); - - fut.get(); - - checkCaches(); - - checkExchanges(srv0, 1, 2, 3, 5); - checkExchanges(ignite(2), 3, 5); - checkExchanges(ignite(3), 5); - } - - /** - * @throws Exception If failed. 
- */ - public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception { - cfgCache = false; - - final Ignite srv0 = startGrids(2); - - mergeExchangeWaitVersion(srv0, 5); - - cfgCache = true; - - IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); - - stopGrid(0); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception { - cfgCache = false; - - final Ignite srv0 = startGrid(0); - - mergeExchangeWaitVersion(srv0, 3); - - cfgCache = true; - - IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2); - - stopGrid(0); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testMergeServersJoin1() throws Exception { - IgniteEx srv0 = startGrid(0); - - mergeExchangeWaitVersion(srv0, 3); - - final AtomicInteger idx = new AtomicInteger(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(idx.getAndIncrement()); - - return null; - } - }, 2, "start-node"); - - fut.get(); - - checkCaches(); - - checkExchanges(srv0, 1, 3); - checkExchanges(ignite(1), 3); - checkExchanges(ignite(2), 3); - } - - /** - * @throws Exception If failed. 
- */ - public void testMergeServerJoin1ClientsInTopology() throws Exception { - IgniteEx srv0 = startGrid(0); - - client.set(true); - - startGrid(1); - - client.set(true); - - startGrid(2); - - mergeExchangeWaitVersion(srv0, 5); - - final AtomicInteger idx = new AtomicInteger(3); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(idx.getAndIncrement()); - - return null; - } - }, 2, "start-node"); - - fut.get(); - - checkCaches(); - - checkExchanges(srv0, 1, 2, 3, 5); - checkExchanges(ignite(1), 2, 3, 5); - checkExchanges(ignite(2), 3, 5); - checkExchanges(ignite(3), 5); - checkExchanges(ignite(4), 5); - } - - /** - * @throws Exception If failed. - */ - public void testMergeAndNewCoordinator() throws Exception { - final Ignite srv0 = startGrids(3); - - mergeExchangeWaitVersion(srv0, 6); - - IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3); - - fut.get(); - - checkCaches(); - - stopGrid(0); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testMergeServersFail1_1() throws Exception { - mergeServersFail1(false); - } - - /** - * @throws Exception If failed. - */ - public void testMergeServersFail1_2() throws Exception { - mergeServersFail1(true); - } - - /** - * @param waitRebalance Wait for rebalance end before start tested topology change. - * @throws Exception If failed. 
- */ - private void mergeServersFail1(boolean waitRebalance) throws Exception { - final Ignite srv0 = startGrids(5); - - if (waitRebalance) - awaitPartitionMapExchange(); - - final List<DiscoveryEvent> mergedEvts = new ArrayList<>(); - - mergeExchangeWaitVersion(srv0, 8, mergedEvts); - - UUID grid3Id = grid(3).localNode().id(); - UUID grid2Id = grid(2).localNode().id(); - - stopGrid(getTestIgniteInstanceName(4), true, false); - stopGrid(getTestIgniteInstanceName(3), true, false); - stopGrid(getTestIgniteInstanceName(2), true, false); - - checkCaches(); - - awaitPartitionMapExchange(); - - assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(), mergedEvts.size() == 2); - - for (DiscoveryEvent discoEvt : mergedEvts) { - ClusterNode evtNode = discoEvt.eventNode(); - - assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode != null); - - assertTrue("Unexpected eventNode ID: " - + evtNode.id() + " while expecting " + grid2Id + " or " + grid3Id, - evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id)); - } - } - - /** - * @throws Exception If failed. - */ - public void testMergeServersAndClientsFail1() throws Exception { - mergeServersAndClientsFail(false); - } - - /** - * @throws Exception If failed. - */ - public void testMergeServersAndClientsFail2() throws Exception { - mergeServersAndClientsFail(true); - } - - - /** - * @param waitRebalance Wait for rebalance end before start tested topology change. - * @throws Exception If failed. 
- */ - private void mergeServersAndClientsFail(boolean waitRebalance) throws Exception { - clientC = new IgniteClosure<String, Boolean>() { - @Override public Boolean apply(String nodeName) { - return nodeName.equals(getTestIgniteInstanceName(2)) || nodeName.equals(getTestIgniteInstanceName(3)); - } - }; - - final Ignite srv0 = startGrids(6); - - if (waitRebalance) - awaitPartitionMapExchange(); - - mergeExchangeWaitVersion(srv0, 10); - - stopGrid(getTestIgniteInstanceName(1), true, false); - stopGrid(getTestIgniteInstanceName(2), true, false); - stopGrid(getTestIgniteInstanceName(3), true, false); - stopGrid(getTestIgniteInstanceName(4), true, false); - - checkAffinity(); - - mergeExchangeWaitVersion(srv0, 12); - - IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - exchangeCoordinatorChangeNoMerge(4, true, mode); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - exchangeCoordinatorChangeNoMerge(8, true, mode); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - exchangeCoordinatorChangeNoMerge(5, false, mode); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - exchangeCoordinatorChangeNoMerge(8, false, mode); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - mergeJoinExchangesCoordinatorChange1(4, mode); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception { - for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { - mergeJoinExchangesCoordinatorChange1(8, mode); - - stopAllGrids(); - } - } - - /** - * @param srvs Number of server nodes. - * @param mode Test mode. - * @throws Exception If failed. - */ - private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode) - throws Exception - { - log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']'); - - testSpi = true; - - Ignite srv0 = startGrids(srvs); - - mergeExchangeWaitVersion(srv0, 6); - - CountDownLatch latch = blockExchangeFinish(srvs, mode); - - IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2); - - if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) - fail("Failed to wait for expected messages."); - - stopGrid(getTestIgniteInstanceName(0), true, false); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception { - mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), F.asList(5)); - - stopAllGrids(); - - mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), F.asList(4)); - } - - /** - * @param srvs Number of server nodes. - * @param startNodes Number of nodes to start. - * @param blockNodes Nodes which do not receive messages. - * @param waitMsgNodes Nodes which should receive messages. - * @throws Exception If failed. 
- */ - private void mergeJoinExchangeCoordinatorChange2(final int srvs, - final int startNodes, - List<Integer> blockNodes, - List<Integer> waitMsgNodes) throws Exception - { - testSpi = true; - - Ignite srv0 = startGrids(srvs); - - mergeExchangeWaitVersion(srv0, srvs + startNodes); - - CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes); - - IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes); - - if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) - fail("Failed to wait for expected messages."); - - stopGrid(getTestIgniteInstanceName(0), true, false); - - fut.get(); - - checkCaches(); - } - - /** - * @throws Exception If failed. - */ - public void testMergeExchangeCoordinatorChange4() throws Exception { - testSpi = true; - - final int srvs = 4; - - Ignite srv0 = startGrids(srvs); - - mergeExchangeWaitVersion(srv0, 6); - - final AtomicInteger idx = new AtomicInteger(srvs); - - CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(1, 2, 3, 4), F.asList(5)); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(idx.getAndIncrement()); - - return null; - } - }, 2, "start-node"); - - if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) - fail("Failed to wait for expected messages."); - - stopGrid(getTestIgniteInstanceName(0), true, false); - - fut.get(); - - checkCaches(); - } - - /** * @param srvs Number of servers. * @param join If {@code true} starts new node, otherwise stops node. * @param mode Tested scenario.