Merge remote-tracking branch 'remotes/origin/master' into ignite-6467 # Conflicts: # modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30bac7fc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30bac7fc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30bac7fc Branch: refs/heads/ignite-6467 Commit: 30bac7fc18ba09cd092192bcc4e79377b655c1ae Parents: a867641 Author: sboikov <sboi...@apache.org> Authored: Sat Jul 28 10:26:03 2018 +0300 Committer: sboikov <sboi...@apache.org> Committed: Sat Jul 28 10:26:03 2018 +0300 ---------------------------------------------------------------------- .../distributed/CacheExchangeMergeTest.java | 951 ++++++++++++++++++- 1 file changed, 950 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30bac7fc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index a2259ff..6502d3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -206,6 +206,213 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDelayExchangeMessages() throws Exception { + testDelaySpi = true; + + System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "2000"); + + try { + final int srvs = 6; + final int clients = 3; + + startGridsMultiThreaded(srvs); + + for (int i = 0; i < clients; i++) { + client.set(true); + + startGrid(srvs + i); + } + + final int initNodes = srvs + clients; + + final AtomicInteger stopIdx = new AtomicInteger(); + + IgniteInternalFuture stopFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 1); + + stopGrid(stopIdx.incrementAndGet()); + + return null; + } + }, 3, "stop-srv"); + + final AtomicInteger startIdx = new AtomicInteger(initNodes); + + IgniteInternalFuture startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int nodeIdx = startIdx.incrementAndGet(); + + if (rnd.nextInt(3) == 0) { + log.info("Start client: " + nodeIdx); + + client.set(true); + } + else + log.info("Start server: " + nodeIdx); + + startGrid(nodeIdx); + + if (rnd.nextBoolean()) { + log.info("Stop started node: " + nodeIdx); + + stopGrid(nodeIdx); + } + + return null; + } + }, 5, "start-node"); + + stopFut.get(); + startFut.get(); + + checkCaches(); + } + finally { + System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeStartRandomClientsServers() throws Exception { + for (int iter = 0; iter < 3; iter++) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final int srvs = rnd.nextInt(3) + 1; + final int clients = rnd.nextInt(3); + + log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", clients=" + clients + ']'); + + Ignite srv0 = startGrids(srvs); + + for (int i = 0; i < clients; i++) { + client.set(true); + + startGrid(srvs + i); + } + + final int threads = 8; + + final int initNodes = srvs + clients; + + mergeExchangeWaitVersion(srv0, initNodes + threads); + + final AtomicInteger idx = new AtomicInteger(initNodes); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int nodeIdx = idx.incrementAndGet(); + + if (rnd.nextInt(3) == 0) { + log.info("Start client: " + nodeIdx); + + client.set(true); + } + else + log.info("Start server: " + nodeIdx); + + startGrid(nodeIdx); + + return null; + } + }, threads, "test-thread"); + + fut.get(); + + checkCaches(); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeStartStopRandomClientsServers() throws Exception { + for (int iter = 0; iter < 3; iter++) { + log.info("Iteration: " + iter); + + final int srvs = 5; + final int clients = 5; + + Ignite srv0 = startGrids(srvs); + + for (int i = 0; i < clients; i++) { + client.set(true); + + startGrid(srvs + i); + } + + final int threads = 8; + + final int initNodes = srvs + clients; + + mergeExchangeWaitVersion(srv0, initNodes + threads); + + final AtomicInteger idx = new AtomicInteger(initNodes); + + final ConcurrentHashSet<Integer> stopNodes = new ConcurrentHashSet<>(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + if (rnd.nextBoolean()) { + Integer stopIdx; + + for (;;) { + stopIdx = rnd.nextInt(initNodes - 1) + 1; + + if (stopNodes.add(stopIdx)) + break; + } + + log.info("Stop node: " + stopIdx); + + stopGrid(getTestIgniteInstanceName(stopIdx), true, false); + } + else { + int nodeIdx = idx.incrementAndGet(); + + if (rnd.nextInt(5) == 0) { + log.info("Start client: " + nodeIdx); + + client.set(true); + } + else + log.info("Start server: " + nodeIdx); + + startGrid(nodeIdx); + } + + return null; + } + }, threads, "test-thread"); + + fut.get(); + + checkCaches(); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartServers() throws Exception { + concurrentStart(false); + } + + /** + * @throws Exception If failed. + */ public void testConcurrentStartServersAndClients() throws Exception { concurrentStart(true); } @@ -246,9 +453,751 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { checkCaches(); stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeServerAndClientJoin1() throws Exception { + final IgniteEx srv0 = startGrid(0); + + mergeExchangeWaitVersion(srv0, 3); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(1); + + return null; + } + }, 1, "start-srv"); + + waitForExchangeStart(srv0, 2); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + client.set(true); + + startGrid(2); + + return null; + } + }, 1, "start-client"); + + fut1.get(); + fut2.get(); + + checkCaches(); + + checkExchanges(srv0, 1, 3); + checkExchanges(ignite(1), 3); + checkExchanges(ignite(2), 3); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception { + startCacheOnJoinAndJoinMerge1(2, false); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception { + startCacheOnJoinAndJoinMerge1(4, false); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception { + startCacheOnJoinAndJoinMerge1(5, true); + } + + /** + * @param nodes Number of nodes to start. + * @param withClients If {@code true} starts both servers and clients. + * @throws Exception If failed. + */ + private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception { + cfgCache = false; + + final IgniteEx srv0 = startGrid(0); + + mergeExchangeWaitVersion(srv0, nodes + 1); + + if (withClients) { + clientC = new IgniteClosure<String, Boolean>() { + @Override public Boolean apply(String nodeName) { + return getTestIgniteInstanceIndex(nodeName) % 2 == 0; + } + }; + } + + cfgCache = true; + + IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testMergeAndHistoryCleanup() throws Exception { + final int histSize = 5; + + String oldHistVal = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE); + + System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histSize)); + + try { + final Ignite srv0 = startGrid(0); + + int topVer = 1; + + for (int i = 0; i < 3; i++) { + mergeExchangeWaitVersion(srv0, topVer + 3); + + startGridsAsync(srv0, topVer, 3).get(); + + topVer += 3; + } + + checkHistorySize(histSize); + + awaitPartitionMapExchange(); + + checkHistorySize(histSize); + + mergeExchangeWaitVersion(srv0, topVer + 2); + + stopGrid(1); + stopGrid(2); + + checkHistorySize(histSize); + + awaitPartitionMapExchange(); + + checkHistorySize(histSize); + } + finally { + if (oldHistVal != null) + System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, oldHistVal); + else + System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE); + } + } + + /** + * @param histSize History size. + */ + private void checkHistorySize(int histSize) { + List<Ignite> nodes = G.allGrids(); + + assertTrue(nodes.size() > 0); + + for (Ignite node : nodes) { + List<GridDhtPartitionsExchangeFuture> exchFuts = + ((IgniteEx)node).context().cache().context().exchange().exchangeFutures(); + + assertTrue("Unexpected size: " + exchFuts.size(), exchFuts.size() > 0 && exchFuts.size() <= histSize); + } + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndMergeWithFail() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrids(2); + + mergeExchangeWaitVersion(srv0, 5); + + cfgCache = true; + + IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); + + stopGrid(1); + + fut.get(); + + checkCaches(); + + checkExchanges(srv0, 1, 2, 3, 5); + checkExchanges(ignite(2), 3, 5); + checkExchanges(ignite(3), 5); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrids(2); + + mergeExchangeWaitVersion(srv0, 5); + + cfgCache = true; + + IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrid(0); + + mergeExchangeWaitVersion(srv0, 3); + + cfgCache = true; + + IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testMergeServersJoin1() throws Exception { + IgniteEx srv0 = startGrid(0); + + mergeExchangeWaitVersion(srv0, 3); + + final AtomicInteger idx = new AtomicInteger(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + fut.get(); + + checkCaches(); + + checkExchanges(srv0, 1, 3); + checkExchanges(ignite(1), 3); + checkExchanges(ignite(2), 3); + } + + /** + * @throws Exception If failed. + */ + public void testMergeServerJoin1ClientsInTopology() throws Exception { + IgniteEx srv0 = startGrid(0); + + client.set(true); + + startGrid(1); + + client.set(true); + + startGrid(2); + + mergeExchangeWaitVersion(srv0, 5); + + final AtomicInteger idx = new AtomicInteger(3); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + fut.get(); + + checkCaches(); + + checkExchanges(srv0, 1, 2, 3, 5); + checkExchanges(ignite(1), 2, 3, 5); + checkExchanges(ignite(2), 3, 5); + checkExchanges(ignite(3), 5); + checkExchanges(ignite(4), 5); + } + + /** + * @throws Exception If failed. + */ + public void testMergeAndNewCoordinator() throws Exception { + final Ignite srv0 = startGrids(3); + + mergeExchangeWaitVersion(srv0, 6); + + IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3); + + fut.get(); + + checkCaches(); + + stopGrid(0); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testMergeServersFail1_1() throws Exception { + mergeServersFail1(false); + } + + /** + * @throws Exception If failed. + */ + public void testMergeServersFail1_2() throws Exception { + mergeServersFail1(true); + } + + /** + * @param waitRebalance Wait for rebalance end before start tested topology change. + * @throws Exception If failed. + */ + private void mergeServersFail1(boolean waitRebalance) throws Exception { + final Ignite srv0 = startGrids(5); + + if (waitRebalance) + awaitPartitionMapExchange(); + + final List<DiscoveryEvent> mergedEvts = new ArrayList<>(); + + mergeExchangeWaitVersion(srv0, 8, mergedEvts); + + UUID grid3Id = grid(3).localNode().id(); + UUID grid2Id = grid(2).localNode().id(); + + stopGrid(getTestIgniteInstanceName(4), true, false); + stopGrid(getTestIgniteInstanceName(3), true, false); + stopGrid(getTestIgniteInstanceName(2), true, false); + + checkCaches(); + + awaitPartitionMapExchange(); + + assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(), mergedEvts.size() == 2); + + for (DiscoveryEvent discoEvt : mergedEvts) { + ClusterNode evtNode = discoEvt.eventNode(); + + assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode != null); + + assertTrue("Unexpected eventNode ID: " + + evtNode.id() + " while expecting " + grid2Id + " or " + grid3Id, + evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id)); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeServersAndClientsFail1() throws Exception { + mergeServersAndClientsFail(false); + } + + /** + * @throws Exception If failed. + */ + public void testMergeServersAndClientsFail2() throws Exception { + mergeServersAndClientsFail(true); + } + + + /** + * @param waitRebalance Wait for rebalance end before start tested topology change. + * @throws Exception If failed. + */ + private void mergeServersAndClientsFail(boolean waitRebalance) throws Exception { + clientC = new IgniteClosure<String, Boolean>() { + @Override public Boolean apply(String nodeName) { + return nodeName.equals(getTestIgniteInstanceName(2)) || nodeName.equals(getTestIgniteInstanceName(3)); + } + }; + + final Ignite srv0 = startGrids(6); + + if (waitRebalance) + awaitPartitionMapExchange(); + + mergeExchangeWaitVersion(srv0, 10); + + stopGrid(getTestIgniteInstanceName(1), true, false); + stopGrid(getTestIgniteInstanceName(2), true, false); + stopGrid(getTestIgniteInstanceName(3), true, false); + stopGrid(getTestIgniteInstanceName(4), true, false); + + checkAffinity(); + + mergeExchangeWaitVersion(srv0, 12); + + IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(4, true, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(8, true, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(5, false, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(8, false, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + mergeJoinExchangesCoordinatorChange1(4, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + mergeJoinExchangesCoordinatorChange1(8, mode); + + stopAllGrids(); + } + } + + /** + * @param srvs Number of server nodes. + * @param mode Test mode. + * @throws Exception If failed. + */ + private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode) + throws Exception + { + log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']'); + + testSpi = true; + + Ignite srv0 = startGrids(srvs); + + mergeExchangeWaitVersion(srv0, 6); + + CountDownLatch latch = blockExchangeFinish(srvs, mode); + + IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2); + + if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) + fail("Failed to wait for expected messages."); + + stopGrid(getTestIgniteInstanceName(0), true, false); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception { + mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), F.asList(5)); + + stopAllGrids(); + + mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), F.asList(4)); + } + + /** + * @param srvs Number of server nodes. + * @param startNodes Number of nodes to start. + * @param blockNodes Nodes which do not receive messages. + * @param waitMsgNodes Nodes which should receive messages. + * @throws Exception If failed. + */ + private void mergeJoinExchangeCoordinatorChange2(final int srvs, + final int startNodes, + List<Integer> blockNodes, + List<Integer> waitMsgNodes) throws Exception + { + testSpi = true; + + Ignite srv0 = startGrids(srvs); + + mergeExchangeWaitVersion(srv0, srvs + startNodes); + + CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes); + + IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes); + + if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) + fail("Failed to wait for expected messages."); + + stopGrid(getTestIgniteInstanceName(0), true, false); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testMergeExchangeCoordinatorChange4() throws Exception { + testSpi = true; + + final int srvs = 4; + + Ignite srv0 = startGrids(srvs); + + mergeExchangeWaitVersion(srv0, 6); + + final AtomicInteger idx = new AtomicInteger(srvs); + + CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(1, 2, 3, 4), F.asList(5)); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) + fail("Failed to wait for expected messages."); + + stopGrid(getTestIgniteInstanceName(0), true, false); + + fut.get(); + + checkCaches(); + } + + /** + * @param srvs Number of servers. + * @param join If {@code true} starts new node, otherwise stops node. + * @param mode Tested scenario. + * @throws Exception If failed. + */ + private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean join, CoordinatorChangeMode mode) throws Exception { + log.info("Test mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", mode=" + mode + ']'); + + testSpi = true; + + final int nodes = srvs; + + startGrids(nodes); + + CountDownLatch latch = blockExchangeFinish(srvs, mode); - TestDebugLog1.clear(); + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + if (join) + startGrid(nodes); + else + stopGrid(nodes - 1); + + return null; + } + }); + + waitForExchangeStart(ignite(0), nodes + 1); + + if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS)) + fail("Failed to wait for expected messages."); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } + + /** + * @param srvs Number of server nodes. + * @param mode Test scenario. + * @return Awaited state latch. + * @throws Exception If failed. + */ + private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode mode) throws Exception { + Ignite crd = ignite(0); + + long topVer = srvs + 1; + + switch (mode) { + case NOBODY_RCVD: { + blockExchangeFinish(crd, topVer); + + break; + } + + case NEW_CRD_RCDV: { + List<Integer> finishNodes = F.asList(1); + + return blockExchangeFinish(crd, topVer, blockNodes(srvs, finishNodes), finishNodes); + } + + case NON_CRD_RCVD: { + assert srvs > 2 : srvs; + + List<Integer> finishNodes = F.asList(2); + + return blockExchangeFinish(crd, topVer, blockNodes(srvs, finishNodes), finishNodes); + } + + default: + fail(); + } + + return null; + } + + /** + * @param srvs Number of servers. + * @param waitNodes Nodes which should receive message. + * @return Blocked nodes indexes. + */ + private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) { + List<Integer> block = new ArrayList<>(); + + for (int i = 0; i < srvs + 1; i++) { + if (!waitNodes.contains(i)) + block.add(i); } + + return block; + } + + /** + * @param crd Exchange coordinator. + * @param topVer Exchange topology version. + */ + private void blockExchangeFinish(Ignite crd, long topVer) { + final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer); + + TestRecordingCommunicationSpi.spi(crd).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtPartitionsFullMessage) { + GridDhtPartitionsFullMessage msg0 = (GridDhtPartitionsFullMessage)msg; + + return msg0.exchangeId() != null && msg0.exchangeId().topologyVersion().equals(topVer0); + } + + return false; + } + }); + } + + /** + * @param crd Exchange coordinator. + * @param topVer Exchange topology version. + * @param blockNodes Nodes which do not receive messages. + * @param waitMsgNodes Nodes which should receive messages. + * @return Awaited state latch. + */ + private CountDownLatch blockExchangeFinish(Ignite crd, + long topVer, + final List<Integer> blockNodes, + final List<Integer> waitMsgNodes) + { + log.info("blockExchangeFinish [crd=" + crd.cluster().localNode().id() + + ", block=" + blockNodes + + ", wait=" + waitMsgNodes + ']'); + + final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer); + + final CountDownLatch latch = new CountDownLatch(waitMsgNodes.size()); + + TestRecordingCommunicationSpi.spi(crd).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtPartitionsFullMessage) { + GridDhtPartitionsFullMessage msg0 = (GridDhtPartitionsFullMessage)msg; + + if (msg0.exchangeId() == null || msg0.exchangeId().topologyVersion().compareTo(topVer0) < 0) + return false; + + String name = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME); + + assert name != null : node; + + for (Integer idx : blockNodes) { + if (name.equals(getTestIgniteInstanceName(idx))) + return true; + } + + for (Integer idx : waitMsgNodes) { + if (name.equals(getTestIgniteInstanceName(idx))) { + log.info("Coordinators sends awaited message [node=" + node.id() + ']'); + + latch.countDown(); + } + } + } + + return false; + } + }); + + return latch; } /**