Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 42813c8b0 -> 32f7fa898


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32f7fa89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32f7fa89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32f7fa89

Branch: refs/heads/ignite-zk
Commit: 32f7fa89899bd005791d941aed50c8f4f35dd46c
Parents: 42813c8
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 14 11:05:08 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 14 13:22:04 2017 +0300

----------------------------------------------------------------------
 .../managers/discovery/JoiningNodesAware.java   |  27 +++
 .../GridDhtPartitionsExchangeFuture.java        |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   3 +
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 212 ++++++++++++-------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 132 +++++++++++-
 5 files changed, 300 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java
new file mode 100644
index 0000000..85128e4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.UUID;
+
+/**
+ * Optional capability of a discovery SPI: answers whether a node ID is known to discovery.
+ * {@code TcpCommunicationSpi} checks for this interface on the discovery SPI and uses the
+ * answer to decide whether an incoming connection comes from an unknown node.
+ */
+public interface JoiningNodesAware {
+    /**
+     * @param nodeId Node ID to check.
+     * @return {@code True} if the node is known to the discovery SPI, {@code false} otherwise.
+     */
+    public boolean knownNode(UUID nodeId);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d29293e..5d9186b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1598,7 +1598,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 mergedJoinExchMsgs = new LinkedHashMap<>();
 
             if (msg != null) {
-                assert msg.exchangeId().topologyVersion().equals(new 
AffinityTopologyVersion(node.order()));
+                // TODO ZK
+                // assert msg.exchangeId().topologyVersion().equals(new 
AffinityTopologyVersion(node.order()));
 
                 if (log.isInfoEnabled()) {
                     log.info("Merge server join exchange, message received 
[curFut=" + initialVersion() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 04683ac..fdba1ca 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -64,6 +64,7 @@ import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.JoiningNodesAware;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
@@ -492,6 +493,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                                 unknownNode = false;
                         }
                     }
+                    else if (discoverySpi instanceof JoiningNodesAware)
+                        unknownNode = !((JoiningNodesAware) 
discoverySpi).knownNode(sndId);
 
                     if (unknownNode) {
                         U.warn(log, "Close incoming connection, unknown node 
[nodeId=" + sndId + ", ses=" + ses + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 5660949..c45c559 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -37,6 +37,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.managers.discovery.JoiningNodesAware;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -59,10 +60,8 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
@@ -74,7 +73,7 @@ import org.jetbrains.annotations.Nullable;
 @IgniteSpiMultipleInstancesSupport(true)
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
-public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements 
DiscoverySpi {
+public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements 
DiscoverySpi, JoiningNodesAware {
     /** */
     private static final String IGNITE_PATH = "/ignite";
 
@@ -130,6 +129,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     private final DataUpdateCallback dataUpdateCallback;
 
     /** */
+    private final JoinedNodes joinHist = new JoinedNodes();
+
+    /** */
     private ZookeeperClusterNode locNode;
 
     /** */
@@ -148,6 +150,12 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     /** */
     private CountDownLatch joinLatch = new CountDownLatch(1);
 
+    /** */
+    private Exception joinErr;
+
+    /** For testing only. */
+    private CountDownLatch connectStart = new CountDownLatch(1);
+
     /**
      *
      */
@@ -178,6 +186,25 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     private Serializable consistentId;
 
     /** {@inheritDoc} */
+    @Override public boolean knownNode(UUID nodeId) {
+        boolean known = false;
+
+        try {
+            // Scan the children of the alive-nodes path for an entry carrying this ID.
+            for (String alivePath : zkCurator.getChildren().forPath(ALIVE_NODES_PATH)) {
+                if (parseNodePath(alivePath).nodeId.equals(nodeId)) {
+                    known = true;
+
+                    break;
+                }
+            }
+        }
+        catch (Exception e) {
+            // A failed read is logged and the node is reported as not known.
+            U.error(log, "Failed to read alive nodes: " + e, e);
+        }
+
+        return known;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Serializable consistentId() throws 
IgniteSpiException {
         if (consistentId == null) {
             final Serializable cfgId = 
ignite.configuration().getConsistentId();
@@ -417,10 +444,17 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
             String zkNode = "/" + locNode.id().toString() + "-";
 
-            joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL));
-            joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
+//            joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL));
+//            joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
+//            List<OpResult> res = zk.multi(joinOps);
+
+            zkCurator.inTransaction().
+                
create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH
 + zkNode, nodeData).
+                and().
+                
create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH
 + zkNode).
+                and().commit();
 
-            List<OpResult> res = zk.multi(joinOps);
+            connectStart.countDown();
 
             log.info("Waiting for local join event [nodeId=" + locNode.id() + 
", name=" + igniteInstanceName + ']');
 
@@ -432,12 +466,25 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
                     break;
             }
 
+            if (joinErr != null)
+                throw new IgniteSpiException(joinErr);
         }
         catch (Exception e) {
+            connectStart.countDown();
+
             throw new IgniteSpiException(e);
         }
     }
 
+    /**
+     * For testing only. Blocks the caller until the {@code connectStart} latch is released.
+     * The join code counts the latch down right after the join transaction is committed,
+     * and also when the join attempt throws.
+     *
+     * @throws Exception If failed.
+     */
+    public void waitConnectStart() throws Exception {
+        connectStart.await();
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
         closeZkClient();
@@ -567,9 +614,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     }
 
     /** */
-    private Map<Long, ZKNodeData> joinHist = new HashMap<>();
-
-    /** */
     private boolean crd;
 
     /** */
@@ -583,13 +627,48 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
         ZKJoiningNodeData joinData = unmarshal(bytes);
 
-        assert joinData.node != null && joinData.joiningNodeData != null : 
joinData;
+        assert joinData != null && joinData.node != null && 
joinData.joiningNodeData != null : joinData;
 
         joinData.node.order(data.order);
 
         data.joinData = joinData;
     }
 
+    /**
+     * Loads join data for every join-history child that is not yet cached in {@code joinHist}.
+     * <p>
+     * An entry is cached only after its join data was read successfully, so a failed read
+     * leaves no half-initialized entry behind and is attempted again on the next call.
+     *
+     * @param children Names of the child znodes of {@code JOIN_HIST_PATH}.
+     */
+    private void processJoinedNodesHistory(List<String> children) {
+        for (String child : children) {
+            ZKNodeData data = parseNodePath(child);
+
+            if (joinHist.hist.containsKey(data.order))
+                continue;
+
+            try {
+                readJoinNodeData(data, JOIN_HIST_PATH + "/" + child);
+
+                assert data.joinData != null : data;
+
+                // Publish only fully initialized entries: event generation expects joinData to be set.
+                Object old = joinHist.hist.put(data.order, data);
+
+                assert old == null : old;
+
+                log.info("New joined node data: " + data);
+            }
+            catch (Exception e) {
+                // TODO ZK
+                U.error(log, "Failed to get node data: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * Holder for the locally cached join history.
+     */
+    private static class JoinedNodes {
+        /** Cached node data, keyed by {@code ZKNodeData.order}. */
+        private final Map<Long, ZKNodeData> hist = new HashMap<>();
+
+        /** Last non-null {@code Stat} stored by the children callback. */
+        private Stat stat;
+    }
+
     /**
      *
      */
@@ -605,21 +684,10 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
                         ", nodes=" + children +
                         ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
 
-                    for (String child : children) {
-                        ZKNodeData data = parseNodePath(child);
-
-                        if (joinHist.put(data.order, data) == null) {
-                            try {
-                                log.info("New joined node data: " + data);
+                    if (stat != null)
+                        joinHist.stat = stat;
 
-                                readJoinNodeData(data, path + "/" + child);
-                            }
-                            catch (Exception e) {
-                                // TODO ZK
-                                U.error(log, "Failed to get node data: " + e, 
e);
-                            }
-                        }
-                    }
+                    processJoinedNodesHistory(children);
                 }
                 else if (path.equals(ALIVE_NODES_PATH)) {
                     log.info("Alive nodes changed [rc=" + rc +
@@ -657,6 +725,10 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
      */
     public void closeClient() {
         closeZkClient();
+
+        joinErr = new Exception("Start error");
+
+        joinLatch.countDown();
     }
 
     /** */
@@ -698,7 +770,11 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
         if (!crd)
             return;
 
-        log.info("Generate discovery events [oldNodes=" + oldNodes + ", 
newNodes=" + newNodes + ']');
+        long nextJoinOrder = curCrdEvts != null ? curCrdEvts.nextJoinOrder : 
1L;
+
+        log.info("Generate discovery events [oldNodes=" + oldNodes +
+            ", newNodes=" + newNodes +
+            ", nextJoinOrder=" + nextJoinOrder + ']');
 
         if (oldNodes.ver == newNodes.ver)
             return;
@@ -706,48 +782,32 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
         TreeMap<Integer, ZKDiscoveryEvent> evts = new TreeMap<>();
 
         Set<Long> failedNodes = new HashSet<>();
-        Set<Long> joinedNodes = new HashSet<>();
 
-        synchronized (curTop) {
-            for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) {
-                ZKNodeData joined = null;
-
-                for (ZKNodeData newData : newNodes.nodesByOrder.values()) {
-                    if (!curTop.containsKey(newData.order) && 
!joinedNodes.contains(newData.order)) {
-                        joined = newData;
+        for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) {
+            ZKNodeData joined = joinHist.hist.get(nextJoinOrder);
 
-                        break;
-                    }
-                }
+            if (joined == null) {
+                try {
+                    // TODO ZK: check version.
+                    List<String> children = 
zkCurator.getChildren().forPath(JOIN_HIST_PATH);
 
-                // TODO ZK: process joinHist
+                    processJoinedNodesHistory(children);
 
-                if (joined != null) {
-                    joinedNodes.add(joined.order);
-
-                    ZKNodeData data = joinHist.get(joined.order);
-
-                    if (data == null) {
-                        try {
-                            readJoinNodeData(joined, JOIN_HIST_PATH + "/" + 
joined.zkPath);
-
-                            assert joined.joinData != null : joined;
-
-                            joinHist.put(joined.order, joined);
-
-                            data = joined;
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Failed to read node data: " + e);
-                        }
-                    }
+                    joined = joinHist.hist.get(nextJoinOrder);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to read joined nodes: " + e, e);
+                }
+            }
 
-                    assert data != null : joined;
+            // TODO ZK: process joinHist
 
-                    ZKJoiningNodeData joinData = data.joinData;
+            if (joined != null) {
+                assert joined.joinData != null : joined;
 
-                    assert joinData != null : data;
+                ZKJoiningNodeData joinData = joined.joinData;
 
+                synchronized (curTop) {
                     curTop.put(joinData.node.order(), joinData.node);
 
                     ZKDiscoveryEvent joinEvt = new 
ZKDiscoveryEvent(EventType.EVT_NODE_JOINED,
@@ -773,12 +833,12 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
                     evts.put(v, joinEvt);
 
-                    if (!newNodes.nodesByOrder.containsKey(data.order)) {
+                    if (!newNodes.nodesByOrder.containsKey(joined.order)) {
                         v++;
 
-                        ZookeeperClusterNode failedNode = 
curTop.remove(data.order);
+                        ZookeeperClusterNode failedNode = 
curTop.remove(joined.order);
 
-                        assert failedNode != null : data.order;
+                        assert failedNode != null : joined.order;
 
                         log.info("ZK event [type=FAIL, node=" + 
failedNode.id() + ", ver=" + v + ']');
 
@@ -787,12 +847,16 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
                             failedNode,
                             new ArrayList<>(curTop.values())));
                     }
+
+                    nextJoinOrder++;
                 }
-                else {
-                    for (ZKNodeData oldData : oldNodes.nodesByOrder.values()) {
-                        if (!failedNodes.contains(oldData.order) && 
!newNodes.nodesByOrder.containsKey(oldData.order)) {
-                            failedNodes.add(oldData.order);
+            }
+            else {
+                for (ZKNodeData oldData : oldNodes.nodesByOrder.values()) {
+                    if (!failedNodes.contains(oldData.order) && 
!newNodes.nodesByOrder.containsKey(oldData.order)) {
+                        failedNodes.add(oldData.order);
 
+                        synchronized (curTop) {
                             ZookeeperClusterNode failedNode = 
curTop.remove(oldData.order);
 
                             assert failedNode != null : oldData.order;
@@ -803,9 +867,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
                                 v,
                                 failedNode,
                                 new ArrayList<>(curTop.values())));
-
-                            break;
                         }
+
+                        break;
                     }
                 }
             }
@@ -820,7 +884,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
         if (curCrdEvts == null) {
             expVer = 0;
 
-            newEvents = new ZKDiscoveryEvents(newNodes, evts);
+            newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts);
         }
         else {
             TreeMap<Integer, ZKDiscoveryEvent> evts0 = new 
TreeMap<>(curCrdEvts.evts);
@@ -831,7 +895,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
                 evts0.put(e.topVer, e);
             }
 
-            newEvents = new ZKDiscoveryEvents(newNodes, evts0);
+            newEvents = new ZKDiscoveryEvents(nextJoinOrder, newNodes, evts0);
 
             expVer = curCrdEvts.ver;
         }
@@ -839,7 +903,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
         newEvents.ver = expVer + 1;
 
         try {
-            zkCurator.setData().forPath(EVENTS_PATH, marshal(newEvents));
+            zkCurator.setData().withVersion(expVer).forPath(EVENTS_PATH, 
marshal(newEvents));
 
             // zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
         }
@@ -1002,11 +1066,15 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
         @GridToStringInclude
         final TreeMap<Integer, ZKDiscoveryEvent> evts;
 
+        /** */
+        final long nextJoinOrder;
+
         /**
          * @param aliveNodes
          * @param evts
          */
-        ZKDiscoveryEvents(ZKAliveNodes aliveNodes, TreeMap<Integer, 
ZKDiscoveryEvent> evts) {
+        ZKDiscoveryEvents(long nextJoinOrder, ZKAliveNodes aliveNodes, 
TreeMap<Integer, ZKDiscoveryEvent> evts) {
+            this.nextJoinOrder = nextJoinOrder;
             this.aliveNodes = aliveNodes;
             this.evts = evts;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/32f7fa89/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 3b049dc..729e09f 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -89,6 +90,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
+        zkSpi.setSessionTimeout(30_000);
+
         spis.put(igniteInstanceName, zkSpi);
 
         if (USE_TEST_CLUSTER) {
@@ -194,6 +197,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         finally {
             reset();
         }
+
+        stopAllGrids();
     }
 
     /**
@@ -223,7 +228,11 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                 if (!nodeId.equals(nodeEvtEntry0.getKey())) {
                     Map<Long, DiscoveryEvent> nodeEvts0 = 
nodeEvtEntry0.getValue();
 
-                    checkEventsConsistency(nodeEvts, nodeEvts0);
+                    synchronized (nodeEvts) {
+                        synchronized (nodeEvts0) {
+                            checkEventsConsistency(nodeEvts, nodeEvts0);
+                        }
+                    }
                 }
             }
         }
@@ -291,6 +300,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param failWhenDisconnected {@code True} if fail node while another 
node is disconnected.
      * @throws Exception If failed.
      */
     private void connectionRestore_NonCoordinator(boolean 
failWhenDisconnected) throws Exception {
@@ -311,24 +321,129 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             }
         }, "start-node");
 
-        checkEvents(node0.configuration().getNodeId(), joinEvent(3));
+        checkEvents(node0, joinEvent(3));
 
         if (failWhenDisconnected) {
             ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2));
 
             spi.closeClient();
 
-            checkEvents(node0.configuration().getNodeId(), failEvent(4));
+            checkEvents(node0, failEvent(4));
         }
 
         c1.allowConnect();
 
-        checkEvents(ignite(1).configuration().getNodeId(), joinEvent(3), 
failEvent(4));
+        checkEvents(ignite(1), joinEvent(3));
+
+        if (failWhenDisconnected)
+            checkEvents(ignite(1), failEvent(4));
 
         if (!failWhenDisconnected)
             fut.get();
     }
 
+    /**
+     * One initial node, one node started while the first node's ZooKeeper socket is closed,
+     * no starting node has its client closed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 0);
+    }
+
+    /**
+     * One initial node, one node started while the first node's ZooKeeper socket is closed,
+     * fail count of one (the starting node's client is closed before connection is restored).
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1_1() throws Exception {
+        connectionRestore_Coordinator(1, 1, 1);
+    }
+
+    /**
+     * One initial node, three nodes started concurrently while the first node's ZooKeeper
+     * socket is closed, no starting node has its client closed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator2() throws Exception {
+        connectionRestore_Coordinator(1, 3, 0);
+    }
+
+    /**
+     * Three initial nodes, three nodes started concurrently while the first node's ZooKeeper
+     * socket is closed, no starting node has its client closed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator3() throws Exception {
+        connectionRestore_Coordinator(3, 3, 0);
+    }
+
+    /**
+     * Closes the ZooKeeper socket of the first started node, starts more nodes while it is
+     * disconnected, optionally closes the client of some of the starting nodes, then allows
+     * the connection again and checks the join events observed by the initial nodes.
+     *
+     * @param initNodes Number of initially started nodes.
+     * @param startNodes Number of nodes to start after coordinator loses connection.
+     * @param failCnt Number of starting nodes whose client is closed before connection is restored.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception {
+        testSockNio = true;
+
+        Ignite node0 = startGrids(initNodes);
+
+        ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    startGrid(nodeIdx.getAndIncrement());
+                }
+                catch (Exception e) {
+                    error("Start failed: " + e);
+                }
+
+                return null;
+            }
+        }, startNodes, "start-node");
+
+        // Number of starting nodes whose client has been closed so far.
+        int cnt = 0;
+
+        for (int i = initNodes; i < initNodes + startNodes; i++) {
+            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
+
+            spi.waitConnectStart();
+
+            // Close only the first failCnt clients; the counter must advance, otherwise every client is closed.
+            if (cnt < failCnt) {
+                spi.closeClient();
+
+                cnt++;
+            }
+        }
+
+        c0.allowConnect();
+
+        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes];
+
+        for (int i = 0; i < startNodes; i++)
+            expEvts[i] = joinEvent(initNodes + i + 1);
+
+        for (int i = 0; i < initNodes; i++)
+            checkEvents(ignite(i), expEvts);
+
+        fut.get();
+
+        waitForTopology(initNodes + startNodes - failCnt);
+    }
+
+    /**
+     * Waits up to 5 seconds for the discovery SPI of the given node to be registered in
+     * {@code spis} and returns it; fails the test if it is still missing after the wait.
+     *
+     * @param nodeName Node name.
+     * @return Node's discovery SPI.
+     * @throws Exception If failed.
+     */
+    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                // The map is keyed by node name; contains() would search the values instead.
+                return spis.containsKey(nodeName);
+            }
+        }, 5000);
+
+        ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
+
+        return spi;
+    }
+
     private static DiscoveryEvent joinEvent(long topVer) {
         DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
 
@@ -346,6 +461,15 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Checks events for the given node, identified by the ID of its local cluster node.
+     *
+     * @param node Node.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final Ignite node, final 
DiscoveryEvent...expEvts) throws Exception {
+        checkEvents(node.cluster().localNode().id(), expEvts);
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param expEvts Expected events.
      * @throws Exception If fialed.

Reply via email to