Merge remote-tracking branch 'remotes/origin/master' into ignite-6467

# Conflicts:
#       
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30bac7fc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30bac7fc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30bac7fc

Branch: refs/heads/ignite-6467
Commit: 30bac7fc18ba09cd092192bcc4e79377b655c1ae
Parents: a867641
Author: sboikov <sboi...@apache.org>
Authored: Sat Jul 28 10:26:03 2018 +0300
Committer: sboikov <sboi...@apache.org>
Committed: Sat Jul 28 10:26:03 2018 +0300

----------------------------------------------------------------------
 .../distributed/CacheExchangeMergeTest.java     | 951 ++++++++++++++++++-
 1 file changed, 950 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30bac7fc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index a2259ff..6502d3d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -206,6 +206,213 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDelayExchangeMessages() throws Exception {
+        testDelaySpi = true;
+
+        System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 
"2000");
+
+        try {
+            final int srvs = 6;
+            final int clients = 3;
+
+            startGridsMultiThreaded(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int initNodes = srvs + clients;
+
+            final AtomicInteger stopIdx = new AtomicInteger();
+
+            IgniteInternalFuture stopFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 
1);
+
+                    stopGrid(stopIdx.incrementAndGet());
+
+                    return null;
+                }
+            }, 3, "stop-srv");
+
+            final AtomicInteger startIdx = new AtomicInteger(initNodes);
+
+            IgniteInternalFuture startFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int nodeIdx = startIdx.incrementAndGet();
+
+                    if (rnd.nextInt(3) == 0) {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+                    else
+                        log.info("Start server: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    if (rnd.nextBoolean()) {
+                        log.info("Stop started node: " + nodeIdx);
+
+                        stopGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, 5, "start-node");
+
+            stopFut.get();
+            startFut.get();
+
+            checkCaches();
+        }
+        finally {
+            
System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartRandomClientsServers() throws Exception {
+        for (int iter = 0; iter < 3; iter++) {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            final int srvs = rnd.nextInt(3) + 1;
+            final int clients = rnd.nextInt(3);
+
+            log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", 
clients=" + clients + ']');
+
+            Ignite srv0 = startGrids(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int threads = 8;
+
+            final int initNodes = srvs + clients;
+
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int nodeIdx = idx.incrementAndGet();
+
+                    if (rnd.nextInt(3) == 0) {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+                    else
+                        log.info("Start server: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    return null;
+                }
+            }, threads, "test-thread");
+
+            fut.get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartStopRandomClientsServers() throws Exception {
+        for (int iter = 0; iter < 3; iter++) {
+            log.info("Iteration: " + iter);
+
+            final int srvs = 5;
+            final int clients = 5;
+
+            Ignite srv0 = startGrids(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int threads = 8;
+
+            final int initNodes = srvs + clients;
+
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            final ConcurrentHashSet<Integer> stopNodes = new 
ConcurrentHashSet<>();
+
+            IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    if (rnd.nextBoolean()) {
+                        Integer stopIdx;
+
+                        for (;;) {
+                            stopIdx = rnd.nextInt(initNodes - 1) + 1;
+
+                            if (stopNodes.add(stopIdx))
+                                break;
+                        }
+
+                        log.info("Stop node: " + stopIdx);
+
+                        stopGrid(getTestIgniteInstanceName(stopIdx), true, 
false);
+                    }
+                    else {
+                        int nodeIdx = idx.incrementAndGet();
+
+                        if (rnd.nextInt(5) == 0) {
+                            log.info("Start client: " + nodeIdx);
+
+                            client.set(true);
+                        }
+                        else
+                            log.info("Start server: " + nodeIdx);
+
+                        startGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, threads, "test-thread");
+
+            fut.get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartServers() throws Exception {
+        concurrentStart(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConcurrentStartServersAndClients() throws Exception {
         concurrentStart(true);
     }
@@ -246,9 +453,751 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
             checkCaches();
 
             stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServerAndClientJoin1() throws Exception {
+        final IgniteEx srv0 = startGrid(0);
+
+        mergeExchangeWaitVersion(srv0, 3);
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(1);
+
+                return null;
+            }
+        }, 1, "start-srv");
+
+        waitForExchangeStart(srv0, 2);
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                client.set(true);
+
+                startGrid(2);
+
+                return null;
+            }
+        }, 1, "start-client");
+
+        fut1.get();
+        fut2.get();
+
+        checkCaches();
+
+        checkExchanges(srv0, 1, 3);
+        checkExchanges(ignite(1), 3);
+        checkExchanges(ignite(2), 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception {
+        startCacheOnJoinAndJoinMerge1(2, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception {
+        startCacheOnJoinAndJoinMerge1(4, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndJoinMerge_WithClients() throws 
Exception {
+        startCacheOnJoinAndJoinMerge1(5, true);
+    }
+
+    /**
+     * @param nodes Number of nodes to start.
+     * @param withClients If {@code true} starts both servers and clients.
+     * @throws Exception If failed.
+     */
+    private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) 
throws Exception {
+        cfgCache = false;
+
+        final IgniteEx srv0 = startGrid(0);
+
+        mergeExchangeWaitVersion(srv0, nodes + 1);
+
+        if (withClients) {
+            clientC = new IgniteClosure<String, Boolean>() {
+                @Override public Boolean apply(String nodeName) {
+                    return getTestIgniteInstanceIndex(nodeName) % 2 == 0;
+                }
+            };
+        }
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeAndHistoryCleanup() throws Exception {
+        final int histSize = 5;
+
+        String oldHistVal = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+
+        System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, 
String.valueOf(histSize));
+
+        try {
+            final Ignite srv0 = startGrid(0);
+
+            int topVer = 1;
+
+            for (int i = 0; i < 3; i++) {
+                mergeExchangeWaitVersion(srv0, topVer + 3);
+
+                startGridsAsync(srv0, topVer, 3).get();
+
+                topVer += 3;
+            }
+
+            checkHistorySize(histSize);
+
+            awaitPartitionMapExchange();
+
+            checkHistorySize(histSize);
+
+            mergeExchangeWaitVersion(srv0, topVer + 2);
+
+            stopGrid(1);
+            stopGrid(2);
+
+            checkHistorySize(histSize);
+
+            awaitPartitionMapExchange();
+
+            checkHistorySize(histSize);
+        }
+        finally {
+            if (oldHistVal != null)
+                System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, oldHistVal);
+            else
+                System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+        }
+    }
+
+    /**
+     * @param histSize History size.
+     */
+    private void checkHistorySize(int histSize) {
+        List<Ignite> nodes = G.allGrids();
+
+        assertTrue(nodes.size() > 0);
+
+        for (Ignite node : nodes) {
+            List<GridDhtPartitionsExchangeFuture> exchFuts =
+                    
((IgniteEx)node).context().cache().context().exchange().exchangeFutures();
+
+            assertTrue("Unexpected size: " + exchFuts.size(), exchFuts.size() 
> 0 && exchFuts.size() <= histSize);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndMergeWithFail() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrids(2);
+
+        mergeExchangeWaitVersion(srv0, 5);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
+
+        stopGrid(1);
+
+        fut.get();
+
+        checkCaches();
+
+        checkExchanges(srv0, 1, 2, 3, 5);
+        checkExchanges(ignite(2), 3, 5);
+        checkExchanges(ignite(3), 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrids(2);
+
+        mergeExchangeWaitVersion(srv0, 5);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrid(0);
+
+        mergeExchangeWaitVersion(srv0, 3);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2);
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServersJoin1() throws Exception {
+        IgniteEx srv0 = startGrid(0);
+
+        mergeExchangeWaitVersion(srv0, 3);
+
+        final AtomicInteger idx = new AtomicInteger(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(idx.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        fut.get();
+
+        checkCaches();
+
+        checkExchanges(srv0, 1, 3);
+        checkExchanges(ignite(1), 3);
+        checkExchanges(ignite(2), 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServerJoin1ClientsInTopology() throws Exception {
+        IgniteEx srv0 = startGrid(0);
+
+        client.set(true);
+
+        startGrid(1);
+
+        client.set(true);
+
+        startGrid(2);
+
+        mergeExchangeWaitVersion(srv0, 5);
+
+        final AtomicInteger idx = new AtomicInteger(3);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(idx.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        fut.get();
+
+        checkCaches();
+
+        checkExchanges(srv0, 1, 2, 3, 5);
+        checkExchanges(ignite(1), 2, 3, 5);
+        checkExchanges(ignite(2), 3, 5);
+        checkExchanges(ignite(3), 5);
+        checkExchanges(ignite(4), 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeAndNewCoordinator() throws Exception {
+        final Ignite srv0 = startGrids(3);
+
+        mergeExchangeWaitVersion(srv0, 6);
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3);
+
+        fut.get();
+
+        checkCaches();
+
+        stopGrid(0);
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServersFail1_1() throws Exception {
+        mergeServersFail1(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServersFail1_2() throws Exception {
+        mergeServersFail1(true);
+    }
+
+    /**
+     * @param waitRebalance Wait for rebalance end before start tested 
topology change.
+     * @throws Exception If failed.
+     */
+    private void mergeServersFail1(boolean waitRebalance) throws Exception {
+        final Ignite srv0 = startGrids(5);
+
+        if (waitRebalance)
+            awaitPartitionMapExchange();
+
+        final List<DiscoveryEvent> mergedEvts = new ArrayList<>();
+
+        mergeExchangeWaitVersion(srv0, 8, mergedEvts);
+
+        UUID grid3Id = grid(3).localNode().id();
+        UUID grid2Id = grid(2).localNode().id();
+
+        stopGrid(getTestIgniteInstanceName(4), true, false);
+        stopGrid(getTestIgniteInstanceName(3), true, false);
+        stopGrid(getTestIgniteInstanceName(2), true, false);
+
+        checkCaches();
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Unexpected number of merged disco events: " + 
mergedEvts.size(), mergedEvts.size() == 2);
+
+        for (DiscoveryEvent discoEvt : mergedEvts) {
+            ClusterNode evtNode = discoEvt.eventNode();
+
+            assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode 
!= null);
+
+            assertTrue("Unexpected eventNode ID: "
+                    + evtNode.id() + " while expecting " + grid2Id + " or " + 
grid3Id,
+                evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServersAndClientsFail1() throws Exception {
+        mergeServersAndClientsFail(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeServersAndClientsFail2() throws Exception {
+        mergeServersAndClientsFail(true);
+    }
+
+
+    /**
+     * @param waitRebalance Wait for rebalance end before start tested 
topology change.
+     * @throws Exception If failed.
+     */
+    private void mergeServersAndClientsFail(boolean waitRebalance) throws 
Exception {
+        clientC = new IgniteClosure<String, Boolean>() {
+            @Override public Boolean apply(String nodeName) {
+                return nodeName.equals(getTestIgniteInstanceName(2)) || 
nodeName.equals(getTestIgniteInstanceName(3));
+            }
+        };
+
+        final Ignite srv0 = startGrids(6);
+
+        if (waitRebalance)
+            awaitPartitionMapExchange();
+
+        mergeExchangeWaitVersion(srv0, 10);
+
+        stopGrid(getTestIgniteInstanceName(1), true, false);
+        stopGrid(getTestIgniteInstanceName(2), true, false);
+        stopGrid(getTestIgniteInstanceName(3), true, false);
+        stopGrid(getTestIgniteInstanceName(4), true, false);
+
+        checkAffinity();
+
+        mergeExchangeWaitVersion(srv0, 12);
+
+        IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(4, true, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(8, true, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(5, false, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(8, false, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws 
Exception {
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            mergeJoinExchangesCoordinatorChange1(4, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws 
Exception {
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            mergeJoinExchangesCoordinatorChange1(8, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param mode Test mode.
+     * @throws Exception If failed.
+     */
+    private void mergeJoinExchangesCoordinatorChange1(final int srvs, 
CoordinatorChangeMode mode)
+        throws Exception
+    {
+        log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + 
", mode=" + mode + ']');
+
+        testSpi = true;
+
+        Ignite srv0 = startGrids(srvs);
+
+        mergeExchangeWaitVersion(srv0, 6);
+
+        CountDownLatch latch = blockExchangeFinish(srvs, mode);
+
+        IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2);
+
+        if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws 
Exception {
+        mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), 
F.asList(5));
+
+        stopAllGrids();
+
+        mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), 
F.asList(4));
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param startNodes Number of nodes to start.
+     * @param blockNodes Nodes which do not receive messages.
+     * @param waitMsgNodes Nodes which should receive messages.
+     * @throws Exception If failed.
+     */
+    private void mergeJoinExchangeCoordinatorChange2(final int srvs,
+        final int startNodes,
+        List<Integer> blockNodes,
+        List<Integer> waitMsgNodes) throws Exception
+    {
+        testSpi = true;
+
+        Ignite srv0 = startGrids(srvs);
+
+        mergeExchangeWaitVersion(srv0, srvs + startNodes);
+
+        CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, 
waitMsgNodes);
+
+        IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes);
+
+        if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeExchangeCoordinatorChange4() throws Exception {
+        testSpi = true;
+
+        final int srvs = 4;
+
+        Ignite srv0 = startGrids(srvs);
+
+        mergeExchangeWaitVersion(srv0, 6);
+
+        final AtomicInteger idx = new AtomicInteger(srvs);
+
+        CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(1, 2, 3, 
4), F.asList(5));
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(idx.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param join If {@code true} starts new node, otherwise stops node.
+     * @param mode Tested scenario.
+     * @throws Exception If failed.
+     */
+    private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean 
join, CoordinatorChangeMode mode) throws Exception {
+        log.info("Test mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", 
mode=" + mode + ']');
+
+        testSpi = true;
+
+        final int nodes = srvs;
+
+        startGrids(nodes);
+
+        CountDownLatch latch = blockExchangeFinish(srvs, mode);
 
-            TestDebugLog1.clear();
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                if (join)
+                    startGrid(nodes);
+                else
+                    stopGrid(nodes - 1);
+
+                return null;
+            }
+        });
+
+        waitForExchangeStart(ignite(0), nodes + 1);
+
+        if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param mode Test scenario.
+     * @return Awaited state latch.
+     * @throws Exception If failed.
+     */
+    private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode 
mode) throws Exception {
+        Ignite crd = ignite(0);
+
+        long topVer = srvs + 1;
+
+        switch (mode) {
+            case NOBODY_RCVD: {
+                blockExchangeFinish(crd, topVer);
+
+                break;
+            }
+
+            case NEW_CRD_RCDV: {
+                List<Integer> finishNodes = F.asList(1);
+
+                return blockExchangeFinish(crd, topVer, blockNodes(srvs, 
finishNodes), finishNodes);
+            }
+
+            case NON_CRD_RCVD: {
+                assert srvs > 2 : srvs;
+
+                List<Integer> finishNodes = F.asList(2);
+
+                return blockExchangeFinish(crd, topVer, blockNodes(srvs, 
finishNodes), finishNodes);
+            }
+
+            default:
+                fail();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param waitNodes Nodes which should receive message.
+     * @return Blocked nodes indexes.
+     */
+    private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) {
+        List<Integer> block = new ArrayList<>();
+
+        for (int i = 0; i < srvs + 1; i++) {
+            if (!waitNodes.contains(i))
+                block.add(i);
         }
+
+        return block;
+    }
+
+    /**
+     * @param crd Exchange coordinator.
+     * @param topVer Exchange topology version.
+     */
+    private void blockExchangeFinish(Ignite crd, long topVer) {
+        final AffinityTopologyVersion topVer0 = new 
AffinityTopologyVersion(topVer);
+
+        TestRecordingCommunicationSpi.spi(crd).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtPartitionsFullMessage) {
+                    GridDhtPartitionsFullMessage msg0 = 
(GridDhtPartitionsFullMessage)msg;
+
+                    return msg0.exchangeId() != null && 
msg0.exchangeId().topologyVersion().equals(topVer0);
+                }
+
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @param crd Exchange coordinator.
+     * @param topVer Exchange topology version.
+     * @param blockNodes Nodes which do not receive messages.
+     * @param waitMsgNodes Nodes which should receive messages.
+     * @return Awaited state latch.
+     */
+    private CountDownLatch blockExchangeFinish(Ignite crd,
+        long topVer,
+        final List<Integer> blockNodes,
+        final List<Integer> waitMsgNodes)
+    {
+        log.info("blockExchangeFinish [crd=" + crd.cluster().localNode().id() +
+            ", block=" + blockNodes +
+            ", wait=" + waitMsgNodes + ']');
+
+        final AffinityTopologyVersion topVer0 = new 
AffinityTopologyVersion(topVer);
+
+        final CountDownLatch latch = new CountDownLatch(waitMsgNodes.size());
+
+        TestRecordingCommunicationSpi.spi(crd).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtPartitionsFullMessage) {
+                    GridDhtPartitionsFullMessage msg0 = 
(GridDhtPartitionsFullMessage)msg;
+
+                    if (msg0.exchangeId() == null || 
msg0.exchangeId().topologyVersion().compareTo(topVer0) < 0)
+                        return false;
+
+                    String name = 
node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
+
+                    assert name != null : node;
+
+                    for (Integer idx : blockNodes) {
+                        if (name.equals(getTestIgniteInstanceName(idx)))
+                            return true;
+                    }
+
+                    for (Integer idx : waitMsgNodes) {
+                        if (name.equals(getTestIgniteInstanceName(idx))) {
+                            log.info("Coordinators sends awaited message 
[node=" + node.id() + ']');
+
+                            latch.countDown();
+                        }
+                    }
+                }
+
+                return false;
+            }
+        });
+
+        return latch;
     }
 
     /**

Reply via email to