Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 804c84171 -> 42813c8b0


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42813c8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42813c8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42813c8b

Branch: refs/heads/ignite-zk
Commit: 42813c8b0bca4e8cf1074ba6cbeff1a14247fbd3
Parents: 804c841
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Nov 13 18:08:46 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Nov 13 18:08:46 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 171 +++++++++------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 209 ++++++++++++++++++-
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    | 123 +++++++++++
 3 files changed, 431 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 4059b0b..5660949 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -112,6 +112,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     private ZooKeeper zk;
 
     /** */
+    private CuratorFramework zkCurator;
+
+    /** */
     private int sesTimeout = 5000;
 
     /** */
@@ -319,6 +322,14 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
             log.debug("Local node initialized: " + locNode);
     }
 
+    /**
+     * Checks three conditions in Zookeeper: {@code IGNITE_PATH} exists, {@code ALIVE_NODES_PATH}
+     * exists, and {@code ALIVE_NODES_PATH} has at least one child.
+     *
+     * @return {@code True} only if all three conditions hold.
+     * @throws Exception Propagated from the Curator / Zookeeper calls.
+     */
+    private boolean igniteClusterStarted() throws Exception {
+        boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null 
&&
+            zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
+            !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
+
+        return started;
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String igniteInstanceName) throws 
IgniteSpiException {
         try {
@@ -333,14 +344,12 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
             // ZK generates internal threads' names using current thread name.
             Thread.currentThread().setName("zk-" + igniteInstanceName);
 
-            CuratorFramework c;
-
             try {
-                c = CuratorFrameworkFactory.newClient(connectString, 
sesTimeout, sesTimeout, new RetryForever(500));
+                zkCurator = CuratorFrameworkFactory.newClient(connectString, 
sesTimeout, sesTimeout, new RetryForever(500));
 
-                c.start();
+                zkCurator.start();
 
-                zk = c.getZookeeperClient().getZooKeeper();
+                zk = zkCurator.getZookeeperClient().getZooKeeper();
                 // zk = new ZooKeeper(connectString, sesTimeout, zkWatcher);
             }
             finally {
@@ -348,28 +357,24 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
             }
 
             for (;;) {
-                boolean started = zk.exists(IGNITE_PATH, false) != null &&
-                    zk.exists(ALIVE_NODES_PATH, false) != null &&
-                    !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
+                boolean started = igniteClusterStarted();
 
                 if (!started) {
-                    InterProcessMutex mux = new InterProcessMutex(c, 
IGNITE_INIT_LOCK_PATH);
+                    InterProcessMutex mux = new InterProcessMutex(zkCurator, 
IGNITE_INIT_LOCK_PATH);
 
                     mux.acquire();
 
                     try {
-                        started = zk.exists(IGNITE_PATH, false) != null &&
-                            zk.exists(ALIVE_NODES_PATH, false) != null &&
-                            !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
+                        started = igniteClusterStarted();
 
                         if (!started) {
                             log.info("First node starts, reset ZK state");
 
-                            if (zk.exists(IGNITE_PATH, false) != null)
-                                ZKUtil.deleteRecursive(zk, IGNITE_PATH);
+                            if (zkCurator.checkExists().forPath(IGNITE_PATH) 
!= null)
+                                
zkCurator.delete().deletingChildrenIfNeeded().forPath(IGNITE_PATH);
 
                             // TODO ZK: properly handle first node start and 
init after full cluster restart.
-                            if (zk.exists(IGNITE_PATH, false) == null) {
+                            if (zkCurator.checkExists().forPath(IGNITE_PATH) 
== null) {
                                 log.info("Initialize Zookeeper nodes.");
 
                                 List<Op> initOps = new ArrayList<>();
@@ -401,8 +406,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
             gridStartTime = clusterData.gridStartTime;
 
             zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
-            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, 
null);
             zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, 
null);
+            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, 
null);
 
             List<Op> joinOps = new ArrayList<>();
 
@@ -435,6 +440,10 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
+        closeZkClient();
+    }
+
+    private void closeZkClient() {
         if (zk != null) {
             try {
                 log.info("Close Zookeeper client.");
@@ -487,13 +496,17 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
         final UUID nodeId;
 
         /** */
+        final String zkPath;
+
+        /** */
         transient ZKJoiningNodeData joinData;
 
         /**
          * @param order Node order.
          * @param nodeId Node ID.
          */
-        ZKNodeData(long order, UUID nodeId) {
+        ZKNodeData(String zkPath, long order, UUID nodeId) {
+            this.zkPath = zkPath;
             this.order = order;
             this.nodeId = nodeId;
         }
@@ -550,7 +563,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
 
         int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1;
 
-        return new ZKNodeData(nodeOrder, nodeId);
+        return new ZKNodeData(path, nodeOrder, nodeId);
     }
 
     /** */
@@ -562,71 +575,90 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     /** */
     private ZKAliveNodes curAlive;
 
-    /**
-     *
-     */
-    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
-        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-            if (children == null || children.isEmpty())
-                return;
+    private void readJoinNodeData(ZKNodeData data, String path) throws 
Exception {
+        //byte[] bytes = zk.getData(path, null, null);
+        byte[] bytes = zkCurator.getData().forPath(path);
 
-            if (path.equals(JOIN_HIST_PATH)) {
-                log.info("Join nodes changed [rc=" + rc +
-                    ", path=" + path +
-                    ", nodes=" + children +
-                    ", ver=" + (stat != null ? stat.getCversion() : null) + 
']');
+        assert bytes.length > 0;
 
-                for (String child : children) {
-                    ZKNodeData data = parseNodePath(child);
+        ZKJoiningNodeData joinData = unmarshal(bytes);
 
-                    if (joinHist.put(data.order, data) == null) {
-                        try {
-                            log.info("New joined node data: " + data);
+        assert joinData.node != null && joinData.joiningNodeData != null : 
joinData;
 
-                            byte[] bytes = zk.getData(path + "/" + child, 
null, null);
+        joinData.node.order(data.order);
 
-                            assert bytes.length > 0;
+        data.joinData = joinData;
+    }
+
+    /**
+     *
+     */
+    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            try {
+                if (children == null || children.isEmpty())
+                    return;
 
-                            ZKJoiningNodeData joinData = unmarshal(bytes);
+                if (path.equals(JOIN_HIST_PATH)) {
+                    log.info("Join nodes changed [rc=" + rc +
+                        ", path=" + path +
+                        ", nodes=" + children +
+                        ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
 
-                            assert joinData.node != null && 
joinData.joiningNodeData != null : joinData;
+                    for (String child : children) {
+                        ZKNodeData data = parseNodePath(child);
 
-                            joinData.node.order(data.order);
+                        if (joinHist.put(data.order, data) == null) {
+                            try {
+                                log.info("New joined node data: " + data);
 
-                            data.joinData = joinData;
-                        }
-                        catch (Exception e) {
-                            // TODO ZK
-                            U.error(log, "Failed to get node data: " + e, e);
+                                readJoinNodeData(data, path + "/" + child);
+                            }
+                            catch (Exception e) {
+                                // TODO ZK
+                                U.error(log, "Failed to get node data: " + e, 
e);
+                            }
                         }
                     }
                 }
-            }
-            else if (path.equals(ALIVE_NODES_PATH)) {
-                log.info("Alive nodes changed [rc=" + rc +
-                    ", path=" + path +
-                    ", nodes=" + children +
-                    ", ver=" + (stat != null ? stat.getCversion() : null) + 
']');
+                else if (path.equals(ALIVE_NODES_PATH)) {
+                    log.info("Alive nodes changed [rc=" + rc +
+                        ", path=" + path +
+                        ", nodes=" + children +
+                        ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
 
-                assert stat != null;
+                    assert stat != null;
 
-                TreeMap<Long, ZKNodeData> nodes = new TreeMap<>();
+                    TreeMap<Long, ZKNodeData> nodes = new TreeMap<>();
 
-                for (String child : children) {
-                    ZKNodeData data = parseNodePath(child);
+                    for (String child : children) {
+                        ZKNodeData data = parseNodePath(child);
 
-                    nodes.put(data.order, data);
-                }
+                        nodes.put(data.order, data);
+                    }
+
+                    ZKAliveNodes newAlive = new 
ZKAliveNodes(stat.getCversion(), nodes);
 
-                ZKAliveNodes newAlive = new ZKAliveNodes(stat.getCversion(), 
nodes);
+                    generateEvents(curAlive, newAlive);
 
-                generateEvents(curAlive, newAlive);
+                    curAlive = newAlive;
+                }
+            }
+            catch (Throwable e) {
+                log.info("Uncaught error: " + e);
 
-                curAlive = newAlive;
+                throw e;
             }
         }
     }
 
+    /**
+     * Closes the Zookeeper client by delegating to {@code closeZkClient()}. For testing only.
+     */
+    public void closeClient() {
+        closeZkClient();
+    }
+
     /** */
     private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
 
@@ -695,6 +727,21 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
                     ZKNodeData data = joinHist.get(joined.order);
 
+                    if (data == null) {
+                        try {
+                            readJoinNodeData(joined, JOIN_HIST_PATH + "/" + 
joined.zkPath);
+
+                            assert joined.joinData != null : joined;
+
+                            joinHist.put(joined.order, joined);
+
+                            data = joined;
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Failed to read node data: " + e);
+                        }
+                    }
+
                     assert data != null : joined;
 
                     ZKJoiningNodeData joinData = data.joinData;
@@ -792,7 +839,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
         newEvents.ver = expVer + 1;
 
         try {
-            zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
+            zkCurator.setData().forPath(EVENTS_PATH, marshal(newEvents));
+
+            // zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
         }
         catch (Exception e) {
             U.error(log, "Events update error: " + e, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index f32a7e6..3b049dc 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
@@ -31,10 +32,13 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -42,10 +46,12 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
 
 /**
  *
@@ -66,14 +72,25 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /** */
     private static volatile boolean err;
 
+    /** */
+    private boolean testSockNio;
+
+    /** */
+    private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new 
ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        if (testSockNio)
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, 
ZkTestClientCnxnSocketNIO.class.getName());
+
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setConsistentId(igniteInstanceName);
 
         ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
+        spis.put(igniteInstanceName, zkSpi);
+
         if (USE_TEST_CLUSTER) {
             assert zkCluster != null;
 
@@ -114,14 +131,18 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
                         assertNull(old);
 
-                        DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
+                        synchronized (nodeEvts) {
+                            DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
 
-                        nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
+                            nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
+                        }
                     }
 
-                    DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
+                    synchronized (nodeEvts) {
+                        DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
 
-                    assertNull(old);
+                        assertNull(old);
+                    }
                 }
                 catch (Throwable e) {
                     err = true;
@@ -142,30 +163,50 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        err = false;
-
-        evts.clear();
-
         if (USE_TEST_CLUSTER) {
             zkCluster = new TestingCluster(1);
             zkCluster.start();
         }
 
+        reset();
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         if (zkCluster != null) {
-            zkCluster.close();
+            try {
+                zkCluster.close();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to stop Zookeeper client: " + e, e);
+            }
 
             zkCluster = null;
         }
 
         super.afterTest();
 
-        assertFalse("Unexpected error, see log for details", err);
+        try {
+            assertFalse("Unexpected error, see log for details", err);
 
-        checkEventsConsistency();
+            checkEventsConsistency();
+        }
+        finally {
+            reset();
+        }
+    }
+
+    /**
+     * Resets state shared between tests: clears the {@code ZOOKEEPER_CLIENT_CNXN_SOCKET} system
+     * property, resets {@link ZkTestClientCnxnSocketNIO}, clears the error flag and the recorded events.
+     * NOTE(review): the system property is cleared twice; the second call looks redundant - confirm.
+     */
+    private void reset() {
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        ZkTestClientCnxnSocketNIO.reset();
+
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        err = false;
 
         evts.clear();
     }
@@ -209,7 +250,149 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConnectionRestore1() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
 
+        c0.closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore2() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGridsMultiThreaded(1, 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator1() throws Exception {
+        connectionRestore_NonCoordinator(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator2() throws Exception {
+        connectionRestore_NonCoordinator(true);
+    }
+
+    /**
+     * Starts nodes 0 and 1, closes node 1's Zookeeper socket with reconnect blocked, starts node 2
+     * from another thread and checks the events recorded for node 0; then allows node 1 to reconnect
+     * and checks the events recorded for node 1.
+     * NOTE(review): the final check expects {@code failEvent(4)} on node 1 even when
+     * {@code failWhenDisconnected} is {@code false} - confirm this is intended.
+     *
+     * @param failWhenDisconnected If {@code true}, closes the Zookeeper client of node 2 while node 1 is
+     *      still blocked and waits for a fail event with topology version 4 on node 0; if {@code false},
+     *      waits for the asynchronous start of node 2 to finish at the end.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_NonCoordinator(boolean 
failWhenDisconnected) throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+        Ignite node1 = startGrid(1);
+
+        ZkTestClientCnxnSocketNIO c1 = 
ZkTestClientCnxnSocketNIO.forNode(node1);
+
+        c1.closeSocket(true);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(2);
+
+                return null;
+            }
+        }, "start-node");
+
+        checkEvents(node0.configuration().getNodeId(), joinEvent(3));
+
+        if (failWhenDisconnected) {
+            ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
+
+            spi.closeClient();
+
+            checkEvents(node0.configuration().getNodeId(), failEvent(4));
+        }
+
+        c1.allowConnect();
+
+        checkEvents(ignite(1).configuration().getNodeId(), joinEvent(3), 
failEvent(4));
+
+        if (!failWhenDisconnected)
+            fut.get();
+    }
+
+    /**
+     * Builds an expected-event template of type {@code EVT_NODE_JOINED}, as passed to {@code checkEvents}.
+     *
+     * @param topVer Topology version passed to {@code topologySnapshot}.
+     * @return Event whose other constructor arguments and topology snapshot argument are {@code null}.
+     */
+    private static DiscoveryEvent joinEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    /**
+     * Builds an expected-event template of type {@code EVT_NODE_FAILED}, as passed to {@code checkEvents}.
+     *
+     * @param topVer Topology version passed to {@code topologySnapshot}.
+     * @return Event whose other constructor arguments and topology snapshot argument are {@code null}.
+     */
+    private static DiscoveryEvent failEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    /**
+     * Waits (passing {@code 10000} as the timeout argument of {@code waitForCondition}) until the events
+     * recorded in {@code evts} for the given node contain an event at the topology version of every
+     * expected event, and asserts that the recorded event type equals the expected type. Fails if the
+     * condition does not become true.
+     *
+     * @param nodeId Node ID.
+     * @param expEvts Expected events; only their topology version and type are compared.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
+
+                if (nodeEvts == null) {
+                    info("No events for node: " + nodeId);
+
+                    return false;
+                }
+
+                synchronized (nodeEvts) {
+                    for (DiscoveryEvent expEvt : expEvts) {
+                        DiscoveryEvent evt0 = 
nodeEvts.get(expEvt.topologyVersion());
+
+                        if (evt0 == null) {
+                            info("No event for version: " + 
expEvt.topologyVersion());
+
+                            return false;
+                        }
+
+                        assertEquals(expEvt.type(), evt0.type());
+                    }
+                }
+
+                return true;
+            }
+        }, 10000));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore4() throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(false);
+
+        startGrid(1);
     }
 
     /**
@@ -291,6 +474,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         for (Ignite node : G.allGrids())
             node.compute().broadcast(new DummyCallable(null));
+
+        startGrid(0);
+
+        waitForTopology(5);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
new file mode 100644
index 0000000..4a11c68
--- /dev/null
+++ 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test subclass of {@link ClientCnxnSocketNIO}. Each instance registers itself in {@link #clients}
+ * under a node name taken from the creating thread's name, can close its own socket on demand and
+ * can hold the next {@code connect} call until {@link #allowConnect()} is invoked.
+ */
+public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+    /** Logger. */
+    public static final IgniteLogger log = new 
JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class);
+
+    /** When non-null with a positive count, {@code connect} waits on this latch before connecting. */
+    public volatile CountDownLatch blockConnectLatch;
+
+    /** Created instances keyed by node name. */
+    public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients 
= new ConcurrentHashMap<>();
+
+    /** Node name: the creating thread's name after its first {@code '-'}. */
+    private final String nodeName;
+
+    /**
+     * Clears the registry of created clients.
+     */
+    public static void reset() {
+        clients.clear();
+    }
+
+    /**
+     * @param node Node.
+     * @return ZK client registered under {@code node.name()}, or {@code null} if there is none.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(Ignite node) {
+        return clients.get(node.name());
+    }
+
+    /**
+     * Derives the node name from the current thread's name and registers this instance in
+     * {@link #clients} under it.
+     *
+     * @throws IOException If failed.
+     */
+    public ZkTestClientCnxnSocketNIO() throws IOException {
+        super();
+
+        String threadName = Thread.currentThread().getName();
+
+        // Everything after the first '-' (e.g. a "zk-<instance name>" thread name yields the instance name).
+        nodeName = threadName.substring(threadName.indexOf('-') + 1);
+
+        log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName);
+
+        clients.put(nodeName, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override void connect(InetSocketAddress addr) throws IOException {
+        CountDownLatch blockConnect = this.blockConnectLatch;
+
+        log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", 
addr=" + addr + ']');
+
+        if (blockConnect != null && blockConnect.getCount() > 0) {
+            try {
+                log.info("ZkTestClientCnxnSocketNIO block connect");
+
+                blockConnect.await();
+
+                log.info("ZkTestClientCnxnSocketNIO finish block connect");
+            }
+            catch (Exception e) {
+                // The failure is only logged; the connect below still proceeds.
+                log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e);
+            }
+        }
+
+        super.connect(addr);
+    }
+
+    /**
+     * Releases a connect held by {@link #blockConnectLatch}. Asserts that the latch exists and its
+     * count is 1, i.e. that {@code closeSocket(true)} was called before.
+     */
+    public void allowConnect() {
+        assert blockConnectLatch != null && blockConnectLatch.getCount() == 1;
+
+        log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + 
']');
+
+        blockConnectLatch.countDown();
+    }
+
+    /**
+     * Closes the channel of the {@code sockKey} field of {@link ClientCnxnSocketNIO}, read via
+     * {@code GridTestUtils.getFieldValue}.
+     *
+     * @param blockConnect {@code True} to block client reconnect.
+     * @throws Exception If failed.
+     */
+    public void closeSocket(boolean blockConnect) throws Exception {
+        // Install the latch before closing so that the next connect call waits on it.
+        if (blockConnect)
+            blockConnectLatch = new CountDownLatch(1);
+
+        log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + 
", block=" + blockConnect + ']');
+
+        SelectionKey k = GridTestUtils.getFieldValue(this, 
ClientCnxnSocketNIO.class, "sockKey");
+
+        k.channel().close();
+    }
+}

Reply via email to