Repository: ignite Updated Branches: refs/heads/ignite-zk 42813c8b0 -> 32f7fa898
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32f7fa89 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32f7fa89 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32f7fa89 Branch: refs/heads/ignite-zk Commit: 32f7fa89899bd005791d941aed50c8f4f35dd46c Parents: 42813c8 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 14 11:05:08 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 14 13:22:04 2017 +0300 ---------------------------------------------------------------------- .../managers/discovery/JoiningNodesAware.java | 27 +++ .../GridDhtPartitionsExchangeFuture.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 3 + .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 212 ++++++++++++------- .../zk/ZookeeperDiscoverySpiBasicTest.java | 132 +++++++++++- 5 files changed, 300 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java new file mode 100644 index 0000000..85128e4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.UUID; + +/** + * Discovery SPI extension: {@link #knownNode(UUID)} tells whether a node with the given ID is known to discovery; TcpCommunicationSpi consults it when deciding whether to close an incoming connection from an unknown node. + */ +public interface JoiningNodesAware { + public boolean knownNode(UUID nodeId); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d29293e..5d9186b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1598,7 +1598,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte mergedJoinExchMsgs = new LinkedHashMap<>(); if (msg != null) { - assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order())); + // TODO ZK + // assert 
msg.exchangeId().topologyVersion().equals(new
AffinityTopologyVersion(node.order())); if (log.isInfoEnabled()) { log.info("Merge server join exchange, message received [curFut=" + initialVersion() + http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 04683ac..fdba1ca 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.JoiningNodesAware; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.util.GridConcurrentFactory; @@ -492,6 +493,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati unknownNode = false; } } + else if (discoverySpi instanceof JoiningNodesAware) + unknownNode = !((JoiningNodesAware) discoverySpi).knownNode(sndId); if (unknownNode) { U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 5660949..c45c559 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -37,6 +37,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.managers.discovery.JoiningNodesAware; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -59,10 +60,8 @@ import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; @@ -74,7 +73,7 @@ import org.jetbrains.annotations.Nullable; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { +public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, JoiningNodesAware { /** */ private static final String IGNITE_PATH = "/ignite"; @@ -130,6 +129,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private final DataUpdateCallback dataUpdateCallback; /** */ + private final JoinedNodes joinHist = new JoinedNodes(); + + /** */
private ZookeeperClusterNode locNode; /** */ @@ -148,6 +150,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ private CountDownLatch joinLatch = new CountDownLatch(1); + /** */ + private Exception joinErr; + + /** For testing only. */ + private CountDownLatch connectStart = new CountDownLatch(1); + /** * */ @@ -178,6 +186,25 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private Serializable consistentId; /** {@inheritDoc} */ + @Override public boolean knownNode(UUID nodeId) { + try { + for (String child : zkCurator.getChildren().forPath(ALIVE_NODES_PATH)) { + ZKNodeData nodeData = parseNodePath(child); + + if (nodeData.nodeId.equals(nodeId)) + return true; + } + + return false; + } + catch (Exception e) { + U.error(log, "Failed to read alive nodes: " + e, e); + + return false; + } + } + + /** {@inheritDoc} */ @Nullable @Override public Serializable consistentId() throws IgniteSpiException { if (consistentId == null) { final Serializable cfgId = ignite.configuration().getConsistentId(); @@ -417,10 +444,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery String zkNode = "/" + locNode.id().toString() + "-"; - joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)); - joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); +// joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)); +// joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); +// List<OpResult> res = zk.multi(joinOps); + + zkCurator.inTransaction(). + create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH + zkNode, nodeData). + and(). 
+ create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH + zkNode). + and().commit(); - List<OpResult> res = zk.multi(joinOps); + connectStart.countDown(); log.info("Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); @@ -432,12 +466,25 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery break; } + if (joinErr != null) + throw new IgniteSpiException(joinErr); } catch (Exception e) { + connectStart.countDown(); + throw new IgniteSpiException(e); } } + /** + * For testing only. + * + * @throws Exception If failed. + */ + public void waitConnectStart() throws Exception { + connectStart.await(); + } + /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { closeZkClient(); @@ -567,9 +614,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** */ - private Map<Long, ZKNodeData> joinHist = new HashMap<>(); - - /** */ private boolean crd; /** */ @@ -583,13 +627,48 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ZKJoiningNodeData joinData = unmarshal(bytes); - assert joinData.node != null && joinData.joiningNodeData != null : joinData; + assert joinData != null && joinData.node != null && joinData.joiningNodeData != null : joinData; joinData.node.order(data.order); data.joinData = joinData; } + private void processJoinedNodesHistory(List<String> children) { + for (String child : children) { + ZKNodeData data = parseNodePath(child); + + if (!joinHist.hist.containsKey(data.order)) { + try { + Object old = joinHist.hist.put(data.order, data); + + assert old == null : old; + + readJoinNodeData(data, JOIN_HIST_PATH + "/" + child); + + assert data.joinData != null && joinHist.hist.get(data.order) == data : data; + + log.info("New joined node data: " + data); + } + catch (Exception e) { + // TODO ZK + U.error(log, "Failed to get node data: " + e, e); + } 
+ } + } + } + + /** + * + */ + private static class JoinedNodes { + /** */ + private Stat stat; + + /** */ + private final Map<Long, ZKNodeData> hist = new HashMap<>(); + } + /** * */ @@ -605,21 +684,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ", nodes=" + children + ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); - for (String child : children) { - ZKNodeData data = parseNodePath(child); - - if (joinHist.put(data.order, data) == null) { - try { - log.info("New joined node data: " + data); + if (stat != null) + joinHist.stat = stat; - readJoinNodeData(data, path + "/" + child); - } - catch (Exception e) { - // TODO ZK - U.error(log, "Failed to get node data: " + e, e); - } - } - } + processJoinedNodesHistory(children); } else if (path.equals(ALIVE_NODES_PATH)) { log.info("Alive nodes changed [rc=" + rc + @@ -657,6 +725,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery */ public void closeClient() { closeZkClient(); + + joinErr = new Exception("Start error"); + + joinLatch.countDown(); } /** */ @@ -698,7 +770,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (!crd) return; - log.info("Generate discovery events [oldNodes=" + oldNodes + ", newNodes=" + newNodes + ']'); + long nextJoinOrder = curCrdEvts != null ? 
curCrdEvts.nextJoinOrder : 1L; + + log.info("Generate discovery events [oldNodes=" + oldNodes + + ", newNodes=" + newNodes + + ", nextJoinOrder=" + nextJoinOrder + ']'); if (oldNodes.ver == newNodes.ver) return; @@ -706,48 +782,32 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery TreeMap<Integer, ZKDiscoveryEvent> evts = new TreeMap<>(); Set<Long> failedNodes = new HashSet<>(); - Set<Long> joinedNodes = new HashSet<>(); - synchronized (curTop) { - for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) { - ZKNodeData joined = null; - - for (ZKNodeData newData : newNodes.nodesByOrder.values()) { - if (!curTop.containsKey(newData.order) && !joinedNodes.contains(newData.order)) { - joined = newData; + for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) { + ZKNodeData joined = joinHist.hist.get(nextJoinOrder); - break; - } - } + if (joined == null) { + try { + // TODO ZK: check version. + List<String> children = zkCurator.getChildren().forPath(JOIN_HIST_PATH); - // TODO ZK: process joinHist + processJoinedNodesHistory(children); - if (joined != null) { - joinedNodes.add(joined.order); - - ZKNodeData data = joinHist.get(joined.order); - - if (data == null) { - try { - readJoinNodeData(joined, JOIN_HIST_PATH + "/" + joined.zkPath); - - assert joined.joinData != null : joined; - - joinHist.put(joined.order, joined); - - data = joined; - } - catch (Exception e) { - U.error(log, "Failed to read node data: " + e); - } - } + joined = joinHist.hist.get(nextJoinOrder); + } + catch (Exception e) { + U.error(log, "Failed to read joined nodes: " + e, e); + } + } - assert data != null : joined; + // TODO ZK: process joinHist - ZKJoiningNodeData joinData = data.joinData; + if (joined != null) { + assert joined.joinData != null : joined; - assert joinData != null : data; + ZKJoiningNodeData joinData = joined.joinData; + synchronized (curTop) { curTop.put(joinData.node.order(), joinData.node); ZKDiscoveryEvent joinEvt = new 
ZKDiscoveryEvent(EventType.EVT_NODE_JOINED, @@ -773,12 +833,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery evts.put(v, joinEvt); - if (!newNodes.nodesByOrder.containsKey(data.order)) { + if (!newNodes.nodesByOrder.containsKey(joined.order)) { v++; - ZookeeperClusterNode failedNode = curTop.remove(data.order); + ZookeeperClusterNode failedNode = curTop.remove(joined.order); - assert failedNode != null : data.order; + assert failedNode != null : joined.order; log.info("ZK event [type=FAIL, node=" + failedNode.id() + ", ver=" + v + ']'); @@ -787,12 +847,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery failedNode, new ArrayList<>(curTop.values()))); } + + nextJoinOrder++; } - else { - for (ZKNodeData oldData : oldNodes.nodesByOrder.values()) { - if (!failedNodes.contains(oldData.order) && !newNodes.nodesByOrder.containsKey(oldData.order)) { - failedNodes.add(oldData.order); + } + else { + for (ZKNodeData oldData : oldNodes.nodesByOrder.values()) { + if (!failedNodes.contains(oldData.order) && !newNodes.nodesByOrder.containsKey(oldData.order)) { + failedNodes.add(oldData.order); + synchronized (curTop) { ZookeeperClusterNode failedNode = curTop.remove(oldData.order); assert failedNode != null : oldData.order; @@ -803,9 +867,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery v, failedNode, new ArrayList<>(curTop.values()))); - - break; } + + break; } } } @@ -820,7 +884,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (curCrdEvts == null) { expVer = 0; - newEvents = new ZKDiscoveryEvents(newNodes, evts); + newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts); } else { TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts); @@ -831,7 +895,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery evts0.put(e.topVer, e); } - newEvents = new 
ZKDiscoveryEvents(newNodes, evts0); + newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts0); expVer = curCrdEvts.ver; } @@ -839,7 +903,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery newEvents.ver = expVer + 1; try { - zkCurator.setData().forPath(EVENTS_PATH, marshal(newEvents)); + zkCurator.setData().withVersion(expVer).forPath(EVENTS_PATH, marshal(newEvents)); // zk.setData(EVENTS_PATH, marshal(newEvents), expVer); } @@ -1002,11 +1066,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery @GridToStringInclude final TreeMap<Integer, ZKDiscoveryEvent> evts; + /** */ + final long nextJoinOrder; + /** * @param aliveNodes * @param evts */ - ZKDiscoveryEvents(ZKAliveNodes aliveNodes, TreeMap<Integer, ZKDiscoveryEvent> evts) { + ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, TreeMap<Integer, ZKDiscoveryEvent> evts) { + this.nextJoinOrder = nextJoinOrder; this.aliveNodes = aliveNodes; this.evts = evts; } http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 3b049dc..729e09f 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -24,6 +24,7 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; @@ -89,6 +90,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + zkSpi.setSessionTimeout(30_000); + spis.put(igniteInstanceName, zkSpi); if (USE_TEST_CLUSTER) { @@ -194,6 +197,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { finally { reset(); } + + stopAllGrids(); } /** @@ -223,7 +228,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { if (!nodeId.equals(nodeEvtEntry0.getKey())) { Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); - checkEventsConsistency(nodeEvts, nodeEvts0); + synchronized (nodeEvts) { + synchronized (nodeEvts0) { + checkEventsConsistency(nodeEvts, nodeEvts0); + } + } } } } @@ -291,6 +300,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. * @throws Exception If failed. */ private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { @@ -311,24 +321,129 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } }, "start-node"); - checkEvents(node0.configuration().getNodeId(), joinEvent(3)); + checkEvents(node0, joinEvent(3)); if (failWhenDisconnected) { ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); spi.closeClient(); - checkEvents(node0.configuration().getNodeId(), failEvent(4)); + checkEvents(node0, failEvent(4)); } c1.allowConnect(); - checkEvents(ignite(1).configuration().getNodeId(), joinEvent(3), failEvent(4)); + checkEvents(ignite(1), joinEvent(3)); + + if (failWhenDisconnected) + checkEvents(ignite(1), failEvent(4)); if (!failWhenDisconnected) fut.get(); } + /** + * @throws Exception If failed. 
+ */ + public void testConnectionRestore_Coordinator1() throws Exception { + connectionRestore_Coordinator(1, 1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator1_1() throws Exception { + connectionRestore_Coordinator(1, 1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator2() throws Exception { + connectionRestore_Coordinator(1, 3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator3() throws Exception { + connectionRestore_Coordinator(3, 3, 0); + } + + /** + * @param initNodes Number of initially started nodes. + * @param startNodes Number of nodes to start after coordinator loose connection. + * @throws Exception If failed. + */ + private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { + testSockNio = true; + + Ignite node0 = startGrids(initNodes); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(true); + + final AtomicInteger nodeIdx = new AtomicInteger(initNodes); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + startGrid(nodeIdx.getAndIncrement()); + } + catch (Exception e) { + error("Start failed: " + e); + } + + return null; + } + }, startNodes, "start-node"); + + int cnt = 0; + + for (int i = initNodes; i < initNodes + startNodes; i++) { + ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); + + spi.waitConnectStart(); + + if (cnt < failCnt) + spi.closeClient(); + } + + c0.allowConnect(); + + DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes]; + + for (int i = 0; i < startNodes; i++) + expEvts[i] = joinEvent(initNodes + i + 1); + + for (int i = 0; i < initNodes; i++) + checkEvents(ignite(i), expEvts); + + fut.get(); + + waitForTopology(initNodes + startNodes - failCnt); + } + + /** + * 
@param nodeName Node name. + * @return Node's discovery SPI. + * @throws Exception If failed. + */ + private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return spis.contains(nodeName); + } + }, 5000); + + ZookeeperDiscoverySpi spi = spis.get(nodeName); + + assertNotNull("Failed to get SPI for node: " + nodeName, spi); + + return spi; + } + private static DiscoveryEvent joinEvent(long topVer) { DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); @@ -346,6 +461,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @param node Node. + * @param expEvts Expected events. + * @throws Exception If fialed. + */ + private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { + checkEvents(node.cluster().localNode().id(), expEvts); + } + + /** * @param nodeId Node ID. * @param expEvts Expected events. * @throws Exception If fialed.