Repository: ignite Updated Branches: refs/heads/ignite-zk 804c84171 -> 42813c8b0
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42813c8b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42813c8b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42813c8b Branch: refs/heads/ignite-zk Commit: 42813c8b0bca4e8cf1074ba6cbeff1a14247fbd3 Parents: 804c841 Author: sboikov <sboi...@gridgain.com> Authored: Mon Nov 13 18:08:46 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Nov 13 18:08:46 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 171 +++++++++------ .../zk/ZookeeperDiscoverySpiBasicTest.java | 209 ++++++++++++++++++- .../zookeeper/ZkTestClientCnxnSocketNIO.java | 123 +++++++++++ 3 files changed, 431 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 4059b0b..5660949 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -112,6 +112,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private ZooKeeper zk; /** */ + private CuratorFramework zkCurator; + + /** */ private int sesTimeout = 5000; /** */ @@ -319,6 +322,14 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery log.debug("Local node initialized: " + locNode); } + private boolean igniteClusterStarted() throws Exception { + boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null && + zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null && + !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); + + return started; + } + /** {@inheritDoc} */ @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { try { @@ -333,14 +344,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery // ZK generates internal threads' names using current thread name. Thread.currentThread().setName("zk-" + igniteInstanceName); - CuratorFramework c; - try { - c = CuratorFrameworkFactory.newClient(connectString, sesTimeout, sesTimeout, new RetryForever(500)); + zkCurator = CuratorFrameworkFactory.newClient(connectString, sesTimeout, sesTimeout, new RetryForever(500)); - c.start(); + zkCurator.start(); - zk = c.getZookeeperClient().getZooKeeper(); + zk = zkCurator.getZookeeperClient().getZooKeeper(); // zk = new ZooKeeper(connectString, sesTimeout, zkWatcher); } finally { @@ -348,28 +357,24 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } for (;;) { - boolean started = zk.exists(IGNITE_PATH, false) != null && - zk.exists(ALIVE_NODES_PATH, false) != null && - !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); + boolean started = igniteClusterStarted(); if (!started) { - InterProcessMutex mux = new InterProcessMutex(c, IGNITE_INIT_LOCK_PATH); + InterProcessMutex mux = new InterProcessMutex(zkCurator, IGNITE_INIT_LOCK_PATH); mux.acquire(); try { - started = zk.exists(IGNITE_PATH, false) != null && - zk.exists(ALIVE_NODES_PATH, false) != null && - !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); + started = igniteClusterStarted(); if (!started) { log.info("First node starts, reset ZK state"); - if (zk.exists(IGNITE_PATH, false) != null) - ZKUtil.deleteRecursive(zk, IGNITE_PATH); + if (zkCurator.checkExists().forPath(IGNITE_PATH) != null) + zkCurator.delete().deletingChildrenIfNeeded().forPath(IGNITE_PATH); // TODO ZK: properly handle first node start and init after full cluster restart. - if (zk.exists(IGNITE_PATH, false) == null) { + if (zkCurator.checkExists().forPath(IGNITE_PATH) == null) { log.info("Initialize Zookeeper nodes."); List<Op> initOps = new ArrayList<>(); @@ -401,8 +406,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery gridStartTime = clusterData.gridStartTime; zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null); - zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null); zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null); + zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null); List<Op> joinOps = new ArrayList<>(); @@ -435,6 +440,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { + closeZkClient(); + } + + private void closeZkClient() { if (zk != null) { try { log.info("Close Zookeeper client."); @@ -487,13 +496,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery final UUID nodeId; /** */ + final String zkPath; + + /** */ transient ZKJoiningNodeData joinData; /** * @param order Node order. * @param nodeId Node ID. */ - ZKNodeData(long order, UUID nodeId) { + ZKNodeData(String zkPath, long order, UUID nodeId) { + this.zkPath = zkPath; this.order = order; this.nodeId = nodeId; } @@ -550,7 +563,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1; - return new ZKNodeData(nodeOrder, nodeId); + return new ZKNodeData(path, nodeOrder, nodeId); } /** */ @@ -562,71 +575,90 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ private ZKAliveNodes curAlive; - /** - * - */ - class NodesUpdateCallback implements AsyncCallback.Children2Callback { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (children == null || children.isEmpty()) - return; + private void readJoinNodeData(ZKNodeData data, String path) throws Exception { + //byte[] bytes = zk.getData(path, null, null); + byte[] bytes = zkCurator.getData().forPath(path); - if (path.equals(JOIN_HIST_PATH)) { - log.info("Join nodes changed [rc=" + rc + - ", path=" + path + - ", nodes=" + children + - ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); + assert bytes.length > 0; - for (String child : children) { - ZKNodeData data = parseNodePath(child); + ZKJoiningNodeData joinData = unmarshal(bytes); - if (joinHist.put(data.order, data) == null) { - try { - log.info("New joined node data: " + data); + assert joinData.node != null && joinData.joiningNodeData != null : joinData; - byte[] bytes = zk.getData(path + "/" + child, null, null); + joinData.node.order(data.order); - assert bytes.length > 0; + data.joinData = joinData; + } + + /** + * + */ + class NodesUpdateCallback implements AsyncCallback.Children2Callback { + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + try { + if (children == null || children.isEmpty()) + return; - ZKJoiningNodeData joinData = unmarshal(bytes); + if (path.equals(JOIN_HIST_PATH)) { + log.info("Join nodes changed [rc=" + rc + + ", path=" + path + + ", nodes=" + children + + ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); - assert joinData.node != null && joinData.joiningNodeData != null : joinData; + for (String child : children) { + ZKNodeData data = parseNodePath(child); - joinData.node.order(data.order); + if (joinHist.put(data.order, data) == null) { + try { + log.info("New joined node data: " + data); - data.joinData = joinData; - } - catch (Exception e) { - // TODO ZK - U.error(log, "Failed to get node data: " + e, e); + readJoinNodeData(data, path + "/" + child); + } + catch (Exception e) { + // TODO ZK + U.error(log, "Failed to get node data: " + e, e); + } } } } - } - else if (path.equals(ALIVE_NODES_PATH)) { - log.info("Alive nodes changed [rc=" + rc + - ", path=" + path + - ", nodes=" + children + - ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); + else if (path.equals(ALIVE_NODES_PATH)) { + log.info("Alive nodes changed [rc=" + rc + + ", path=" + path + + ", nodes=" + children + + ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); - assert stat != null; + assert stat != null; - TreeMap<Long, ZKNodeData> nodes = new TreeMap<>(); + TreeMap<Long, ZKNodeData> nodes = new TreeMap<>(); - for (String child : children) { - ZKNodeData data = parseNodePath(child); + for (String child : children) { + ZKNodeData data = parseNodePath(child); - nodes.put(data.order, data); - } + nodes.put(data.order, data); + } + + ZKAliveNodes newAlive = new ZKAliveNodes(stat.getCversion(), nodes); - ZKAliveNodes newAlive = new ZKAliveNodes(stat.getCversion(), nodes); + generateEvents(curAlive, newAlive); - generateEvents(curAlive, newAlive); + curAlive = newAlive; + } + } + catch (Throwable e) { + log.info("Uncaught error: " + e); - curAlive = newAlive; + throw e; } } } + /** + * For testing only. + */ + public void closeClient() { + closeZkClient(); + } + /** */ private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(); @@ -695,6 +727,21 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ZKNodeData data = joinHist.get(joined.order); + if (data == null) { + try { + readJoinNodeData(joined, JOIN_HIST_PATH + "/" + joined.zkPath); + + assert joined.joinData != null : joined; + + joinHist.put(joined.order, joined); + + data = joined; + } + catch (Exception e) { + U.error(log, "Failed to read node data: " + e); + } + } + assert data != null : joined; ZKJoiningNodeData joinData = data.joinData; @@ -792,7 +839,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery newEvents.ver = expVer + 1; try { - zk.setData(EVENTS_PATH, marshal(newEvents), expVer); + zkCurator.setData().forPath(EVENTS_PATH, marshal(newEvents)); + + // zk.setData(EVENTS_PATH, marshal(newEvents), expVer); } catch (Exception e) { U.error(log, "Events update error: " + e, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index f32a7e6..3b049dc 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; @@ -31,10 +32,13 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; @@ -42,10 +46,12 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; /** * @@ -66,14 +72,25 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private static volatile boolean err; + /** */ + private boolean testSockNio; + + /** */ + private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + if (testSockNio) + System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setConsistentId(igniteInstanceName); ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + spis.put(igniteInstanceName, zkSpi); + if (USE_TEST_CLUSTER) { assert zkCluster != null; @@ -114,14 +131,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assertNull(old); - DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); + synchronized (nodeEvts) { + DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); - nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + } } - DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); + synchronized (nodeEvts) { + DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); - assertNull(old); + assertNull(old); + } } catch (Throwable e) { err = true; @@ -142,30 +163,50 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { super.beforeTest(); - err = false; - - evts.clear(); - if (USE_TEST_CLUSTER) { zkCluster = new TestingCluster(1); zkCluster.start(); } + reset(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { if (zkCluster != null) { - zkCluster.close(); + try { + zkCluster.close(); + } + catch (Exception e) { + U.error(log, "Failed to stop Zookeeper client: " + e, e); + } zkCluster = null; } super.afterTest(); - assertFalse("Unexpected error, see log for details", err); + try { + assertFalse("Unexpected error, see log for details", err); - checkEventsConsistency(); + checkEventsConsistency(); + } + finally { + reset(); + } + } + + /** + * + */ + private void reset() { + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + ZkTestClientCnxnSocketNIO.reset(); + + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + err = false; evts.clear(); } @@ -209,7 +250,149 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConnectionRestore1() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + c0.closeSocket(false); + + startGrid(1); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore2() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGridsMultiThreaded(1, 5); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_NonCoordinator1() throws Exception { + connectionRestore_NonCoordinator(false); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_NonCoordinator2() throws Exception { + connectionRestore_NonCoordinator(true); + } + + /** + * @throws Exception If failed. + */ + private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + Ignite node1 = startGrid(1); + + ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); + + c1.closeSocket(true); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(2); + + return null; + } + }, "start-node"); + + checkEvents(node0.configuration().getNodeId(), joinEvent(3)); + + if (failWhenDisconnected) { + ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); + + spi.closeClient(); + + checkEvents(node0.configuration().getNodeId(), failEvent(4)); + } + + c1.allowConnect(); + + checkEvents(ignite(1).configuration().getNodeId(), joinEvent(3), failEvent(4)); + + if (!failWhenDisconnected) + fut.get(); + } + + private static DiscoveryEvent joinEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + private static DiscoveryEvent failEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + /** + * @param nodeId Node ID. + * @param expEvts Expected events. + * @throws Exception If fialed. + */ + private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); + + if (nodeEvts == null) { + info("No events for node: " + nodeId); + + return false; + } + + synchronized (nodeEvts) { + for (DiscoveryEvent expEvt : expEvts) { + DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); + + if (evt0 == null) { + info("No event for version: " + expEvt.topologyVersion()); + + return false; + } + + assertEquals(expEvt.type(), evt0.type()); + } + } + + return true; + } + }, 10000)); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore4() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGrid(1); } /** @@ -291,6 +474,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { for (Ignite node : G.allGrids()) node.compute().broadcast(new DummyCallable(null)); + + startGrid(0); + + waitForTopology(5); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/42813c8b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java new file mode 100644 index 0000000..4a11c68 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.logger.java.JavaLogger; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * + */ +public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { + /** */ + public static final IgniteLogger log = new JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class); + + /** */ + public volatile CountDownLatch blockConnectLatch; + + /** */ + public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients = new ConcurrentHashMap<>(); + + /** */ + private final String nodeName; + + /** + * + */ + public static void reset() { + clients.clear(); + } + + /** + * @param node Node. + * @return ZK client. + */ + public static ZkTestClientCnxnSocketNIO forNode(Ignite node) { + return clients.get(node.name()); + } + + /** + * @throws IOException If failed. + */ + public ZkTestClientCnxnSocketNIO() throws IOException { + super(); + + String threadName = Thread.currentThread().getName(); + + nodeName = threadName.substring(threadName.indexOf('-') + 1); + + log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName); + + clients.put(nodeName, this); + } + + /** {@inheritDoc} */ + @Override void connect(InetSocketAddress addr) throws IOException { + CountDownLatch blockConnect = this.blockConnectLatch; + + log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", addr=" + addr + ']'); + + if (blockConnect != null && blockConnect.getCount() > 0) { + try { + log.info("ZkTestClientCnxnSocketNIO block connect"); + + blockConnect.await(); + + log.info("ZkTestClientCnxnSocketNIO finish block connect"); + } + catch (Exception e) { + log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e); + } + } + + super.connect(addr); + } + + /** + * + */ + public void allowConnect() { + assert blockConnectLatch != null && blockConnectLatch.getCount() == 1; + + log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']'); + + blockConnectLatch.countDown(); + } + + /** + * @param blockConnect {@code True} to block client reconnect. + * @throws Exception If failed. + */ + public void closeSocket(boolean blockConnect) throws Exception { + if (blockConnect) + blockConnectLatch = new CountDownLatch(1); + + log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + ", block=" + blockConnect + ']'); + + SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); + + k.channel().close(); + } +}