Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a6b452823 -> 6cb8c06f7


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6cb8c06f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6cb8c06f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6cb8c06f

Branch: refs/heads/ignite-zk
Commit: 6cb8c06f73ace3030f47ccfe21e2f46d6b054e5f
Parents: a6b4528
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Nov 22 13:28:51 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Nov 22 13:39:39 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  16 +-
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 911 ------------------
 .../ZookeeperDiscoverySpiBasicTest.java         | 942 +++++++++++++++++++
 3 files changed, 947 insertions(+), 922 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 5e9c5a3..11c6a6e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -322,20 +322,10 @@ public class ZookeeperDiscoveryImpl {
             dirs.add(zkPaths.evtsPath);
             dirs.add(zkPaths.joinDataDir);
             dirs.add(zkPaths.customEvtsDir);
+            dirs.add(zkPaths.customEvtsAcksDir);
             dirs.add(zkPaths.aliveNodesDir);
 
             zkClient.createAllIfNeeded(dirs, PERSISTENT);
-//            zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT);
-//
-//            zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
-//
-//            zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT);
-//
-//            zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT);
-//
-//            zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT);
-//
-//            zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
@@ -821,6 +811,10 @@ public class ZookeeperDiscoveryImpl {
         zkClient.deleteAll(zkPaths.customEvtsDir,
             zkClient.getChildren(zkPaths.customEvtsDir),
             -1);
+
+        zkClient.deleteAll(zkPaths.customEvtsAcksDir,
+            zkClient.getChildren(zkPaths.customEvtsAcksDir), // List the acks directory's own children (was customEvtsDir).
+            -1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
deleted file mode 100644
index d579c08..0000000
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ /dev/null
@@ -1,911 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.curator.test.TestingCluster;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
-import org.apache.zookeeper.ZooKeeper;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
-import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
-
-/**
- *
- */
-public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
-    /** */
-    private TestingCluster zkCluster;
-
-    /** */
-    private static final boolean USE_TEST_CLUSTER = true;
-
-    /** */
-    private boolean client;
-
-    /** */
-    private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = 
new ConcurrentHashMap<>();
-
-    /** */
-    private static volatile boolean err;
-
-    /** */
-    private boolean testSockNio;
-
-    /** */
-    private int sesTimeout;
-
-    /** */
-    private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new 
ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
-        if (testSockNio)
-            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, 
ZkTestClientCnxnSocketNIO.class.getName());
-
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setConsistentId(igniteInstanceName);
-
-        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
-
-        zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
-
-        spis.put(igniteInstanceName, zkSpi);
-
-        if (USE_TEST_CLUSTER) {
-            assert zkCluster != null;
-
-            zkSpi.setZkConnectionString(zkCluster.getConnectString());
-        }
-        else
-            zkSpi.setZkConnectionString("localhost:2181");
-
-        cfg.setDiscoverySpi(zkSpi);
-
-        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
-
-        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        // cfg.setMarshaller(new JdkMarshaller());
-
-        cfg.setClientMode(client);
-
-        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
-
-        lsnrs.put(new IgnitePredicate<Event>() {
-            /** */
-            @IgniteInstanceResource
-            private Ignite ignite;
-
-            @Override public boolean apply(Event evt) {
-                try {
-                    DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
-
-                    UUID locId = ignite.cluster().localNode().id();
-
-                    Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
-
-                    if (nodeEvts == null) {
-                        Object old = evts.put(locId, nodeEvts = new 
TreeMap<>());
-
-                        assertNull(old);
-
-                        synchronized (nodeEvts) {
-                            DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
-
-                            nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
-                        }
-                    }
-
-                    synchronized (nodeEvts) {
-                        DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
-
-                        assertNull(old);
-                    }
-                }
-                catch (Throwable e) {
-                    err = true;
-
-                    info("Unexpected error: " + e);
-                }
-
-                return true;
-            }
-        }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
-
-        cfg.setLocalEventListeners(lsnrs);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
-
-        super.afterTestsStopped();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        if (USE_TEST_CLUSTER) {
-            zkCluster = new TestingCluster(1);
-            zkCluster.start();
-        }
-
-        reset();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        if (zkCluster != null) {
-            try {
-                zkCluster.close();
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to stop Zookeeper client: " + e, e);
-            }
-
-            zkCluster = null;
-        }
-
-        super.afterTest();
-
-        try {
-            assertFalse("Unexpected error, see log for details", err);
-
-            checkEventsConsistency();
-        }
-        finally {
-            reset();
-        }
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopNode_1() throws Exception {
-        startGrids(5);
-
-        waitForTopology(5);
-
-        stopGrid(3);
-
-        waitForTopology(4);
-
-        startGrid(3);
-
-        waitForTopology(5);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCustomEventsSimple1_SingleNode() throws Exception {
-        Ignite srv0 = startGrid(0);
-
-        srv0.createCache(new CacheConfiguration<>("c1"));
-
-        waitForEventsAcks(srv0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCustomEventsSimple1_5_Nodes() throws Exception {
-        Ignite srv0 = startGrids(5);
-
-        srv0.createCache(new CacheConfiguration<>("c1"));
-
-        awaitPartitionMapExchange();
-
-        waitForEventsAcks(srv0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSegmentation1() throws Exception {
-        sesTimeout = 1000;
-        testSockNio = true;
-
-        Ignite node0 = startGrid(0);
-
-        final CountDownLatch l = new CountDownLatch(1);
-
-        node0.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event event) {
-                l.countDown();
-
-                return false;
-            }
-        }, EventType.EVT_NODE_SEGMENTED);
-
-        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
-
-        c0.closeSocket(true);
-
-        for (int i = 0; i < 10; i++) {
-            Thread.sleep(1_000);
-
-            if (l.getCount() == 0)
-                break;
-        }
-
-        info("Allow connect");
-
-        c0.allowConnect();
-
-        assertTrue(l.await(10, TimeUnit.SECONDS));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore1() throws Exception {
-        testSockNio = true;
-
-        Ignite node0 = startGrid(0);
-
-        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
-
-        c0.closeSocket(false);
-
-        startGrid(1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore2() throws Exception {
-        testSockNio = true;
-
-        Ignite node0 = startGrid(0);
-
-        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
-
-        c0.closeSocket(false);
-
-        startGridsMultiThreaded(1, 5);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_NonCoordinator1() throws Exception {
-        connectionRestore_NonCoordinator(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_NonCoordinator2() throws Exception {
-        connectionRestore_NonCoordinator(true);
-    }
-
-    /**
-     * @param failWhenDisconnected {@code True} if fail node while another 
node is disconnected.
-     * @throws Exception If failed.
-     */
-    private void connectionRestore_NonCoordinator(boolean 
failWhenDisconnected) throws Exception {
-        testSockNio = true;
-
-        Ignite node0 = startGrid(0);
-        Ignite node1 = startGrid(1);
-
-        ZkTestClientCnxnSocketNIO c1 = 
ZkTestClientCnxnSocketNIO.forNode(node1);
-
-        c1.closeSocket(true);
-
-        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
-            @Override public Void call() throws Exception {
-                try {
-                    startGrid(2);
-                }
-                catch (Exception e) {
-                    info("Start error: " + e);
-                }
-
-                return null;
-            }
-        }, "start-node");
-
-        checkEvents(node0, joinEvent(3));
-
-        if (failWhenDisconnected) {
-            ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
-
-            closeZkClient(spi);
-
-            checkEvents(node0, failEvent(4));
-        }
-
-        c1.allowConnect();
-
-        checkEvents(ignite(1), joinEvent(3));
-
-        if (failWhenDisconnected) {
-            checkEvents(ignite(1), failEvent(4));
-
-            IgnitionEx.stop(getTestIgniteInstanceName(2), true, true);
-        }
-
-        fut.get();
-
-        waitForTopology(failWhenDisconnected ? 2 : 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_Coordinator1() throws Exception {
-        connectionRestore_Coordinator(1, 1, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_Coordinator1_1() throws Exception {
-        connectionRestore_Coordinator(1, 1, 1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_Coordinator2() throws Exception {
-        connectionRestore_Coordinator(1, 3, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_Coordinator3() throws Exception {
-        connectionRestore_Coordinator(3, 3, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore_Coordinator4() throws Exception {
-        connectionRestore_Coordinator(3, 3, 1);
-    }
-
-    /**
-     * @param initNodes Number of initially started nodes.
-     * @param startNodes Number of nodes to start after coordinator loose 
connection.
-     * @param failCnt Number of nodes to stop after coordinator loose 
connection.
-     * @throws Exception If failed.
-     */
-    private void connectionRestore_Coordinator(int initNodes, int startNodes, 
int failCnt) throws Exception {
-        sesTimeout = 30_000;
-        testSockNio = true;
-
-        Ignite node0 = startGrids(initNodes);
-
-        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
-
-        c0.closeSocket(true);
-
-        final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
-
-        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
-            @Override public Void call() throws Exception {
-                try {
-                    startGrid(nodeIdx.getAndIncrement());
-                }
-                catch (Exception e) {
-                    error("Start failed: " + e);
-                }
-
-                return null;
-            }
-        }, startNodes, "start-node");
-
-        int cnt = 0;
-
-        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
-
-        int expEvtCnt = 0;
-
-        sesTimeout = 1000;
-
-        List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
-
-        for (int i = initNodes; i < initNodes + startNodes; i++) {
-            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
-
-            ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, 
"impl");
-
-            impl.waitConnectStart();
-
-            if (cnt++ < failCnt) {
-                ZkTestClientCnxnSocketNIO c = 
ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
-
-                c.closeSocket(true);
-
-                blockedC.add(c);
-            }
-            else {
-                expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
-
-                expEvtCnt++;
-            }
-        }
-
-        Thread.sleep(5000);
-
-        c0.allowConnect();
-
-        for (ZkTestClientCnxnSocketNIO c : blockedC)
-            c.allowConnect();
-
-        if (expEvts.length > 0) {
-            for (int i = 0; i < initNodes; i++)
-                checkEvents(ignite(i), expEvts);
-        }
-
-        fut.get();
-
-        waitForTopology(initNodes + startNodes - failCnt);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClusterRestart() throws Exception {
-        startGridsMultiThreaded(3, false);
-
-        stopAllGrids();
-
-        evts.clear();
-
-        startGridsMultiThreaded(3, false);
-
-        waitForTopology(3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConnectionRestore4() throws Exception {
-        testSockNio = true;
-
-        Ignite node0 = startGrid(0);
-
-        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
-
-        c0.closeSocket(false);
-
-        startGrid(1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStop_1_Node() throws Exception {
-        startGrid(0);
-
-        waitForTopology(1);
-
-        stopGrid(0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRestarts_2_Nodes() throws Exception {
-        startGrid(0);
-
-        for (int i = 0; i < 10; i++) {
-            info("Iteration: " + i);
-
-            startGrid(1);
-
-            waitForTopology(2);
-
-            stopGrid(1);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStop_2_Nodes_WithCache() throws Exception {
-        startGrids(2);
-
-        for (Ignite node : G.allGrids()) {
-            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
-
-            assertNotNull(cache);
-
-            for (int i = 0; i < 100; i++) {
-                cache.put(i, node.name());
-
-                assertEquals(node.name(), cache.get(i));
-            }
-        }
-
-        awaitPartitionMapExchange();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStop_2_Nodes() throws Exception {
-        startGrid(0);
-
-        waitForTopology(1);
-
-        startGrid(1);
-
-        waitForTopology(2);
-
-        for (Ignite node : G.allGrids())
-            node.compute().broadcast(new DummyCallable(null));
-
-        awaitPartitionMapExchange();
-
-        waitForEventsAcks(ignite(0));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStop1() throws Exception {
-        startGridsMultiThreaded(5, false);
-
-        waitForTopology(5);
-
-        awaitPartitionMapExchange();
-
-        waitForEventsAcks(ignite(0));
-
-        stopGrid(0);
-
-        waitForTopology(4);
-
-        for (Ignite node : G.allGrids())
-            node.compute().broadcast(new DummyCallable(null));
-
-        startGrid(0);
-
-        waitForTopology(5);
-
-        awaitPartitionMapExchange();
-
-        waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
-    }
-
-    /**
-     * @param node Node.
-     * @throws Exception If failed.
-     */
-    private void waitForEventsAcks(final Ignite node) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                Map<Object, Object> evts = 
GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
-                    "impl", "evtsData", "evts");
-
-                if (!evts.isEmpty()) {
-                    info("Unacked events: " + evts);
-
-                    return false;
-                }
-
-                return true;
-            }
-        }, 10_000));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStop2() throws Exception {
-        startGridsMultiThreaded(10, false);
-
-        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
-            @Override public void apply(Integer idx) {
-                stopGrid(idx);
-            }
-        }, 3, "stop-node-thread");
-
-        waitForTopology(7);
-
-        startGridsMultiThreaded(0, 3);
-
-        waitForTopology(10);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStartStopWithClients() throws Exception {
-        final int SRVS = 3;
-
-        startGrids(SRVS);
-
-        client = true;
-
-        final int THREADS = 30;
-
-        for (int i = 0; i < 5; i++) {
-            info("Iteration: " + i);
-
-            startGridsMultiThreaded(SRVS, THREADS);
-
-            waitForTopology(SRVS + THREADS);
-
-            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
-                @Override public void apply(Integer idx) {
-                    stopGrid(idx + SRVS);
-                }
-            }, THREADS, "stop-node");
-
-            waitForTopology(SRVS);
-
-            checkEventsConsistency();
-        }
-    }
-
-    /**
-     *
-     */
-    private void reset() {
-        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
-
-        ZkTestClientCnxnSocketNIO.reset();
-
-        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
-
-        err = false;
-
-        evts.clear();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkEventsConsistency() throws Exception {
-        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : 
evts.entrySet()) {
-            UUID nodeId = nodeEvtEntry.getKey();
-            Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue();
-
-            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : 
evts.entrySet()) {
-                if (!nodeId.equals(nodeEvtEntry0.getKey())) {
-                    Map<Long, DiscoveryEvent> nodeEvts0 = 
nodeEvtEntry0.getValue();
-
-                    synchronized (nodeEvts) {
-                        synchronized (nodeEvts0) {
-                            checkEventsConsistency(nodeEvts, nodeEvts0);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * @param evts1 Received events.
-     * @param evts2 Received events.
-     */
-    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, 
Map<Long, DiscoveryEvent> evts2) {
-        for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) {
-            DiscoveryEvent evt1 = e1.getValue();
-            DiscoveryEvent evt2 = evts2.get(e1.getKey());
-
-            if (evt2 != null) {
-                assertEquals(evt1.topologyVersion(), evt2.topologyVersion());
-                assertEquals(evt1.eventNode(), evt2.eventNode());
-                assertEquals(evt1.topologyNodes(), evt2.topologyNodes());
-            }
-        }
-    }
-
-    /**
-     * @param nodeName Node name.
-     * @return Node's discovery SPI.
-     * @throws Exception If failed.
-     */
-    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws 
Exception {
-        GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return spis.contains(nodeName);
-            }
-        }, 5000);
-
-        ZookeeperDiscoverySpi spi = spis.get(nodeName);
-
-        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
-
-        return spi;
-    }
-
-    private static DiscoveryEvent joinEvent(long topVer) {
-        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
-
-        expEvt.topologySnapshot(topVer, null);
-
-        return expEvt;
-    }
-
-    private static DiscoveryEvent failEvent(long topVer) {
-        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
-
-        expEvt.topologySnapshot(topVer, null);
-
-        return expEvt;
-    }
-
-    /**
-     * @param node Node.
-     * @param expEvts Expected events.
-     * @throws Exception If fialed.
-     */
-    private void checkEvents(final Ignite node, final 
DiscoveryEvent...expEvts) throws Exception {
-        checkEvents(node.cluster().localNode().id(), expEvts);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param expEvts Expected events.
-     * @throws Exception If failed.
-     */
-    private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
-
-                if (nodeEvts == null) {
-                    info("No events for node: " + nodeId);
-
-                    return false;
-                }
-
-                synchronized (nodeEvts) {
-                    for (DiscoveryEvent expEvt : expEvts) {
-                        DiscoveryEvent evt0 = 
nodeEvts.get(expEvt.topologyVersion());
-
-                        if (evt0 == null) {
-                            info("No event for version: " + 
expEvt.topologyVersion());
-
-                            return false;
-                        }
-
-                        assertEquals(expEvt.type(), evt0.type());
-                    }
-                }
-
-                return true;
-            }
-        }, 10000));
-    }
-
-    /**
-     * @param spi Spi instance.
-     */
-    private void closeZkClient(ZookeeperDiscoverySpi spi) {
-        ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", 
"zk");
-
-        try {
-            zk.close();
-        }
-        catch (Exception e) {
-            fail("Unexpected error: " + e);
-        }
-    }
-
-    /**
-     * @param expSize Expected nodes number.
-     * @throws Exception If failed.
-     */
-    private void waitForTopology(final int expSize) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                List<Ignite> nodes = G.allGrids();
-
-                if (nodes.size() != expSize) {
-                    info("Wait all nodes [size=" + nodes.size() + ", exp=" + 
expSize + ']');
-
-                    return false;
-                }
-
-                for (Ignite node: nodes) {
-                    int sizeOnNode = node.cluster().nodes().size();
-
-                    if (sizeOnNode != expSize) {
-                        info("Wait for size on node [node=" + node.name() + ", 
size=" + sizeOnNode + ", exp=" + expSize + ']');
-
-                        return false;
-                    }
-                }
-
-                return true;
-            }
-        }, 5000));
-    }
-
-    /**
-     *
-     */
-    private static class DummyCallable implements IgniteCallable<Object> {
-        /** */
-        private byte[] data;
-
-        /**
-         * @param data Data.
-         */
-        DummyCallable(byte[] data) {
-            this.data = data;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            return data;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
new file mode 100644
index 0000000..e9e8f68
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -0,0 +1,942 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ *
+ */
+public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
+    /** */
+    private TestingCluster zkCluster;
+
+    /** */
+    private static final boolean USE_TEST_CLUSTER = true;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = 
new ConcurrentHashMap<>();
+
+    /** */
+    private static volatile boolean err;
+
+    /** */
+    private boolean testSockNio;
+
+    /** */
+    private int sesTimeout;
+
+    /** */
+    private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new 
ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        if (testSockNio)
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, 
ZkTestClientCnxnSocketNIO.class.getName());
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+
+        zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
+
+        spis.put(igniteInstanceName, zkSpi);
+
+        if (USE_TEST_CLUSTER) {
+            assert zkCluster != null;
+
+            zkSpi.setZkConnectionString(zkCluster.getConnectString());
+        }
+        else
+            zkSpi.setZkConnectionString("localhost:2181");
+
+        cfg.setDiscoverySpi(zkSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        // cfg.setMarshaller(new JdkMarshaller());
+
+        cfg.setClientMode(client);
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            /** */
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            @Override public boolean apply(Event evt) {
+                try {
+                    DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+                    UUID locId = ignite.cluster().localNode().id();
+
+                    Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
+
+                    if (nodeEvts == null) {
+                        Object old = evts.put(locId, nodeEvts = new 
TreeMap<>());
+
+                        assertNull(old);
+
+                        synchronized (nodeEvts) {
+                            DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
+
+                            nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
+                        }
+                    }
+
+                    synchronized (nodeEvts) {
+                        DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
+
+                        assertNull(old);
+                    }
+                }
+                catch (Throwable e) {
+                    err = true;
+
+                    info("Unexpected error: " + e);
+                }
+
+                return true;
+            }
+        }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
+
+        cfg.setLocalEventListeners(lsnrs);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (USE_TEST_CLUSTER) {
+            zkCluster = new TestingCluster(1);
+            zkCluster.start();
+        }
+
+        reset();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the test ZooKeeper cluster, then verifies that no listener error was recorded and that
+     * the discovery events collected on different nodes are consistent. The shared test state is reset
+     * and all nodes are stopped even when one of these checks fails.
+     */
+    @Override protected void afterTest() throws Exception {
+        try {
+            if (zkCluster != null) {
+                try {
+                    zkCluster.close();
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to stop Zookeeper cluster: " + e, e);
+                }
+
+                zkCluster = null;
+            }
+
+            super.afterTest();
+
+            assertFalse("Unexpected error, see log for details", err);
+
+            checkEventsConsistency();
+        }
+        finally {
+            // Cleanup must not depend on the checks above: a failed assertion would otherwise
+            // leave started nodes running and leak them into the next test.
+            try {
+                reset();
+            }
+            finally {
+                stopAllGrids();
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopNode_1() throws Exception {
+        startGrids(5);
+
+        waitForTopology(5);
+
+        stopGrid(3);
+
+        waitForTopology(4);
+
+        startGrid(3);
+
+        waitForTopology(5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_SingleNode() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_5_Nodes() throws Exception {
+        Ignite srv0 = startGrids(5);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSegmentation1() throws Exception {
+        sesTimeout = 1000;
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        final CountDownLatch l = new CountDownLatch(1);
+
+        node0.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event event) {
+                l.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1_000);
+
+            if (l.getCount() == 0)
+                break;
+        }
+
+        info("Allow connect");
+
+        c0.allowConnect();
+
+        assertTrue(l.await(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore1() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore2() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGridsMultiThreaded(1, 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator1() throws Exception {
+        connectionRestore_NonCoordinator(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator2() throws Exception {
+        connectionRestore_NonCoordinator(true);
+    }
+
+    /**
+     * @param failWhenDisconnected {@code True} to fail a node while another node is disconnected.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_NonCoordinator(boolean 
failWhenDisconnected) throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+        Ignite node1 = startGrid(1);
+
+        ZkTestClientCnxnSocketNIO c1 = 
ZkTestClientCnxnSocketNIO.forNode(node1);
+
+        c1.closeSocket(true);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    startGrid(2);
+                }
+                catch (Exception e) {
+                    info("Start error: " + e);
+                }
+
+                return null;
+            }
+        }, "start-node");
+
+        checkEvents(node0, joinEvent(3));
+
+        if (failWhenDisconnected) {
+            ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
+
+            closeZkClient(spi);
+
+            checkEvents(node0, failEvent(4));
+        }
+
+        c1.allowConnect();
+
+        checkEvents(ignite(1), joinEvent(3));
+
+        if (failWhenDisconnected) {
+            checkEvents(ignite(1), failEvent(4));
+
+            IgnitionEx.stop(getTestIgniteInstanceName(2), true, true);
+        }
+
+        fut.get();
+
+        waitForTopology(failWhenDisconnected ? 2 : 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1_1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator2() throws Exception {
+        connectionRestore_Coordinator(1, 3, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator3() throws Exception {
+        connectionRestore_Coordinator(3, 3, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator4() throws Exception {
+        connectionRestore_Coordinator(3, 3, 1);
+    }
+
+    /**
+     * @param initNodes Number of initially started nodes.
+     * @param startNodes Number of nodes to start after the coordinator loses connection.
+     * @param failCnt Number of nodes to stop after the coordinator loses connection.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_Coordinator(int initNodes, int startNodes, 
int failCnt) throws Exception {
+        sesTimeout = 30_000;
+        testSockNio = true;
+
+        Ignite node0 = startGrids(initNodes);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    startGrid(nodeIdx.getAndIncrement());
+                }
+                catch (Exception e) {
+                    error("Start failed: " + e);
+                }
+
+                return null;
+            }
+        }, startNodes, "start-node");
+
+        int cnt = 0;
+
+        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
+
+        int expEvtCnt = 0;
+
+        sesTimeout = 1000;
+
+        List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
+
+        final List<String> failedZkNodes = new ArrayList<>(failCnt);
+
+        for (int i = initNodes; i < initNodes + startNodes; i++) {
+            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
+
+            ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, 
"impl");
+
+            impl.waitConnectStart();
+
+            if (cnt++ < failCnt) {
+                ZkTestClientCnxnSocketNIO c = 
ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
+
+                c.closeSocket(true);
+
+                blockedC.add(c);
+
+                failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, 
"locNodeZkPath"));
+            }
+            else {
+                expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
+
+                expEvtCnt++;
+            }
+        }
+
+        final ZookeeperClient zkClient = new ZookeeperClient(log, 
zkCluster.getConnectString(), 10_000, null);
+
+        try {
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        List<String> c = 
zkClient.getChildren("/apacheIgnite/default/alive");
+
+                        for (String failedZkNode : failedZkNodes) {
+                            if (c.contains(failedZkNode))
+                                return false;
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        fail();
+
+                        return true;
+                    }
+                }
+            }, 10_000));
+        }
+        finally {
+            zkClient.close();
+        }
+
+        c0.allowConnect();
+
+        for (ZkTestClientCnxnSocketNIO c : blockedC)
+            c.allowConnect();
+
+        if (expEvts.length > 0) {
+            for (int i = 0; i < initNodes; i++)
+                checkEvents(ignite(i), expEvts);
+        }
+
+        fut.get();
+
+        waitForTopology(initNodes + startNodes - failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClusterRestart() throws Exception {
+        startGridsMultiThreaded(3, false);
+
+        stopAllGrids();
+
+        evts.clear();
+
+        startGridsMultiThreaded(3, false);
+
+        waitForTopology(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore4() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_1_Node() throws Exception {
+        startGrid(0);
+
+        waitForTopology(1);
+
+        stopGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestarts_2_Nodes() throws Exception {
+        startGrid(0);
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            startGrid(1);
+
+            waitForTopology(2);
+
+            stopGrid(1);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes_WithCache() throws Exception {
+        startGrids(2);
+
+        for (Ignite node : G.allGrids()) {
+            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+
+            assertNotNull(cache);
+
+            for (int i = 0; i < 100; i++) {
+                cache.put(i, node.name());
+
+                assertEquals(node.name(), cache.get(i));
+            }
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes() throws Exception {
+        startGrid(0);
+
+        waitForTopology(1);
+
+        startGrid(1);
+
+        waitForTopology(2);
+
+        for (Ignite node : G.allGrids())
+            node.compute().broadcast(new DummyCallable(null));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop1() throws Exception {
+        startGridsMultiThreaded(5, false);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+
+        stopGrid(0);
+
+        waitForTopology(4);
+
+        for (Ignite node : G.allGrids())
+            node.compute().broadcast(new DummyCallable(null));
+
+        startGrid(0);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private void waitForEventsAcks(final Ignite node) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Object, Object> evts = 
GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
+                    "impl", "evtsData", "evts");
+
+                if (!evts.isEmpty()) {
+                    info("Unacked events: " + evts);
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, 10_000));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop2() throws Exception {
+        startGridsMultiThreaded(10, false);
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                stopGrid(idx);
+            }
+        }, 3, "stop-node-thread");
+
+        waitForTopology(7);
+
+        startGridsMultiThreaded(0, 3);
+
+        waitForTopology(10);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopWithClients() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        final int THREADS = 30;
+
+        for (int i = 0; i < 5; i++) {
+            info("Iteration: " + i);
+
+            startGridsMultiThreaded(SRVS, THREADS);
+
+            waitForTopology(SRVS + THREADS);
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    stopGrid(idx + SRVS);
+                }
+            }, THREADS, "stop-node");
+
+            waitForTopology(SRVS);
+
+            checkEventsConsistency();
+        }
+    }
+
+    /**
+     * Restores the state shared between tests: clears the {@code ZOOKEEPER_CLIENT_CNXN_SOCKET}
+     * system property, resets {@link ZkTestClientCnxnSocketNIO}, and drops the recorded error flag
+     * and the collected discovery events.
+     */
+    private void reset() {
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        ZkTestClientCnxnSocketNIO.reset();
+
+        // NOTE(review): the property was already cleared above; this second call looks redundant
+        // unless ZkTestClientCnxnSocketNIO.reset() can set it again - confirm before removing.
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        err = false;
+
+        evts.clear();
+    }
+
+    /**
+     * Compares the events recorded by every node with the events recorded by every other node.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkEventsConsistency() throws Exception {
+        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> left : evts.entrySet()) {
+            UUID leftId = left.getKey();
+            Map<Long, DiscoveryEvent> leftEvts = left.getValue();
+
+            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> right : evts.entrySet()) {
+                // A node's history is never compared with itself.
+                if (leftId.equals(right.getKey()))
+                    continue;
+
+                Map<Long, DiscoveryEvent> rightEvts = right.getValue();
+
+                // Listeners mutate the per-node maps under the map's own monitor.
+                synchronized (leftEvts) {
+                    synchronized (rightEvts) {
+                        checkEventsConsistency(leftEvts, rightEvts);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts that, for every topology version present in both maps, the two events agree on
+     * topology version, event node and topology nodes. Versions missing from {@code evts2} are skipped.
+     *
+     * @param evts1 Received events.
+     * @param evts2 Received events.
+     */
+    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) {
+        for (Map.Entry<Long, DiscoveryEvent> entry : evts1.entrySet()) {
+            DiscoveryEvent other = evts2.get(entry.getKey());
+
+            if (other == null)
+                continue;
+
+            DiscoveryEvent evt = entry.getValue();
+
+            assertEquals(evt.topologyVersion(), other.topologyVersion());
+            assertEquals(evt.eventNode(), other.eventNode());
+            assertEquals(evt.topologyNodes(), other.topologyNodes());
+        }
+    }
+
+    /**
+     * Waits up to 5 seconds until a discovery SPI has been registered in {@code spis} under the
+     * given node name, then returns it. Fails the test if no SPI is registered.
+     *
+     * @param nodeName Node name.
+     * @return Node's discovery SPI.
+     * @throws Exception If failed.
+     */
+    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                // 'spis' is keyed by node name. ConcurrentHashMap.contains(Object) is the legacy
+                // Hashtable-style method that searches VALUES, so it could never match a name here.
+                return spis.containsKey(nodeName);
+            }
+        }, 5000);
+
+        ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
+
+        return spi;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Expected {@code EVT_NODE_JOINED} event for the given topology version.
+     */
+    private static DiscoveryEvent joinEvent(long topVer) {
+        return expectedEvent(EventType.EVT_NODE_JOINED, topVer);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Expected {@code EVT_NODE_FAILED} event for the given topology version.
+     */
+    private static DiscoveryEvent failEvent(long topVer) {
+        return expectedEvent(EventType.EVT_NODE_FAILED, topVer);
+    }
+
+    /**
+     * Creates an event template that carries only an event type and a topology version; all other
+     * constructor arguments and the topology snapshot are {@code null}.
+     *
+     * @param type Event type.
+     * @param topVer Topology version.
+     * @return Expected event.
+     */
+    private static DiscoveryEvent expectedEvent(int type, long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, type, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    /**
+     * Checks expected events using the ID of the given node's local cluster node.
+     *
+     * @param node Node.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception {
+        UUID locNodeId = node.cluster().localNode().id();
+
+        checkEvents(locNodeId, expEvts);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
+
+                if (nodeEvts == null) {
+                    info("No events for node: " + nodeId);
+
+                    return false;
+                }
+
+                synchronized (nodeEvts) {
+                    for (DiscoveryEvent expEvt : expEvts) {
+                        DiscoveryEvent evt0 = 
nodeEvts.get(expEvt.topologyVersion());
+
+                        if (evt0 == null) {
+                            info("No event for version: " + 
expEvt.topologyVersion());
+
+                            return false;
+                        }
+
+                        assertEquals(expEvt.type(), evt0.type());
+                    }
+                }
+
+                return true;
+            }
+        }, 10000));
+    }
+
+    /**
+     * Closes the {@link ZooKeeper} instance read reflectively from the SPI's
+     * {@code impl.zkClient.zk} field chain; any exception thrown by the close fails the test.
+     *
+     * @param spi Spi instance.
+     */
+    private void closeZkClient(ZookeeperDiscoverySpi spi) {
+        ZooKeeper zkConn = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk");
+
+        try {
+            zkConn.close();
+        }
+        catch (Exception e) {
+            fail("Unexpected error: " + e);
+        }
+    }
+
+    /**
+     * Waits up to 5 seconds until exactly {@code expSize} nodes are started in this JVM and each
+     * of them sees {@code expSize} nodes in the cluster; fails the test otherwise.
+     *
+     * @param expSize Expected nodes number.
+     * @throws Exception If failed.
+     */
+    private void waitForTopology(final int expSize) throws Exception {
+        GridAbsPredicate topReady = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<Ignite> started = G.allGrids();
+
+                int startedCnt = started.size();
+
+                if (startedCnt != expSize) {
+                    info("Wait all nodes [size=" + startedCnt + ", exp=" + expSize + ']');
+
+                    return false;
+                }
+
+                for (Ignite ignite : started) {
+                    int seenCnt = ignite.cluster().nodes().size();
+
+                    if (seenCnt != expSize) {
+                        info("Wait for size on node [node=" + ignite.name() + ", size=" + seenCnt + ", exp=" + expSize + ']');
+
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        };
+
+        assertTrue(GridTestUtils.waitForCondition(topReady, 5000));
+    }
+
+    /**
+     * Callable that simply returns the byte array it was created with.
+     */
+    private static class DummyCallable implements IgniteCallable<Object> {
+        /** Payload returned from {@link #call()}; may be {@code null}. */
+        private final byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        DummyCallable(byte[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return data;
+        }
+    }
+}

Reply via email to