zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/930a5179
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/930a5179
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/930a5179

Branch: refs/heads/ignite-zk
Commit: 930a5179955eff26d7292af8a7b8f87c8ecee3b3
Parents: c145262
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Dec 25 14:38:05 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Dec 25 14:46:11 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryEventsData.java      |  26 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |   2 +-
 .../zk/internal/ZkNoServersMessage.java         |  45 +++
 .../discovery/zk/internal/ZookeeperClient.java  |  26 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 281 ++++++++++++++++---
 .../ZookeeperDiscoverySpiBasicTest.java         | 101 ++++++-
 6 files changed, 437 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index f8727e3..cb2d0be 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -31,6 +31,9 @@ class ZkDiscoveryEventsData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    final UUID clusterId;
+
+    /** */
     int procCustEvt = -1;
 
     /** */
@@ -55,17 +58,36 @@ class ZkDiscoveryEventsData implements Serializable {
     private UUID commErrFutId;
 
     /**
-     * @param startInternalOrder First
+     * @param startInternalOrder Starting internal order for cluster (znodes having lower order belong
+     *      to previous cluster and should be ignored).
+     * @param clusterStartTime Start time of first node in cluster.
+     * @return Events.
+     */
+    static ZkDiscoveryEventsData createForNewCluster(long startInternalOrder, 
long clusterStartTime) {
+        return new ZkDiscoveryEventsData(
+            UUID.randomUUID(),
+            startInternalOrder,
+            clusterStartTime,
+            1L,
+            new TreeMap<Long, ZkDiscoveryEventData>()
+        );
+    }
+
+    /**
+     * @param clusterId Cluster ID.
+     * @param startInternalOrder Starting internal order for cluster.
      * @param topVer Current topology version.
      * @param gridStartTime Cluster start time.
      * @param evts Events history.
      */
-    ZkDiscoveryEventsData(
+    private ZkDiscoveryEventsData(
+        UUID clusterId,
         long startInternalOrder,
         long gridStartTime,
         long topVer,
         TreeMap<Long, ZkDiscoveryEventData> evts)
     {
+        this.clusterId = clusterId;
         this.startInternalOrder = startInternalOrder;
         this.gridStartTime = gridStartTime;
         this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 8f35a8e..1c8706e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -201,7 +201,7 @@ class ZkIgnitePaths {
      * @param path Alive node zk path.
      * @return {@code True} if node is client.
      */
-    static boolean aliveClientNode(String path) {
+    static boolean aliveNodeClientFlag(String path) {
         return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
new file mode 100644
index 0000000..dcbd205
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkNoServersMessage implements DiscoverySpiCustomMessage, 
ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkNoServersMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 9cd55d4..df0bb43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -534,7 +534,7 @@ public class ZookeeperClient implements Watcher {
      * @throws InterruptedException If interrupted.
      */
     void setData(String path, byte[] data, int ver)
-        throws ZookeeperClientFailedException, InterruptedException, 
KeeperException.NoNodeException
+        throws ZookeeperClientFailedException, InterruptedException, 
KeeperException.NoNodeException, KeeperException.BadVersionException
     {
         if (data == null)
             data = EMPTY_BYTES;
@@ -550,6 +550,9 @@ public class ZookeeperClient implements Watcher {
             catch (KeeperException.NoNodeException e) {
                 throw e;
             }
+            catch (KeeperException.BadVersionException e) {
+                throw e;
+            }
             catch (Exception e) {
                 onZookeeperError(connStartTime, e);
             }
@@ -558,19 +561,19 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     * @param stat Optional {@link Stat} instance to return znode state.
      * @return Data.
      * @throws KeeperException.NoNodeException If target node does not exist.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
-    byte[] getData(String path)
-        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
-    {
+    byte[] getData(String path, @Nullable Stat stat)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException {
         for (;;) {
             long connStartTime = this.connStartTime;
 
             try {
-                return zk.getData(path, false, null);
+                return zk.getData(path, false, stat);
             }
             catch (KeeperException.NoNodeException e) {
                 throw e;
@@ -583,6 +586,19 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    byte[] getData(String path)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
+    {
+        return getData(path, null);
+    }
+
+    /**
+     * @param path Path.
      */
     void deleteIfExistsAsync(String path) {
         new DeleteIfExistsOperation(path).execute();

http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 30bd750..1d3ad01 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -1209,19 +1209,18 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param rc Callback result code.
      * @param aliveNodes Alive nodes.
      * @throws Exception If failed.
      */
-    private void checkIsCoordinator(int rc, final List<String> aliveNodes) 
throws Exception {
-        assert rc == 0 : KeeperException.Code.get(rc);
+    private void checkIsCoordinator(final List<String> aliveNodes) throws 
Exception {
+        assert !locNode.isClient();
 
         TreeMap<Long, String> aliveSrvs = new TreeMap<>();
 
         long locInternalOrder = rtState.internalOrder;
 
         for (String aliveNodePath : aliveNodes) {
-            if (ZkIgnitePaths.aliveClientNode(aliveNodePath))
+            if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
                 continue;
 
             Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
@@ -1245,30 +1244,137 @@ public class ZookeeperDiscoveryImpl {
             if (log.isInfoEnabled()) {
                 log.info("Discovery coordinator already exists, watch for 
previous server node [" +
                     "locId=" + locNode.id() +
-                    ", prevPath=" + prevE.getValue() + ']');
+                    ", watchPath=" + prevE.getValue() + ']');
              }
 
-            PreviousNodeWatcher watcher = new PreviousNodeWatcher(rtState);
+            PreviousNodeWatcher watcher = new 
ServerPreviousNodeWatcher(rtState);
 
             rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
         }
     }
 
     /**
-     *
+     * @param aliveNodes Alive nodes.
+     * @throws Exception If failed.
      */
-    private void onPreviousNodeFail() {
-        // TODO ZK:
-//        if (locInternalId == crdInternalId + 1) {
-//            if (log.isInfoEnabled())
-//                log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
-//
-//            onBecomeCoordinator(aliveNodes, locInternalId);
-//        }
-        if (log.isInfoEnabled())
-            log.info("Previous node failed, check is node new coordinator 
[locId=" + locNode.id() + ']');
+    private void checkClientsStatus(final List<String> aliveNodes) throws 
Exception {
+        assert locNode.isClient();
+        assert rtState.joined;
+        assert rtState.evtsData != null;
+
+        TreeMap<Long, String> aliveClients = new TreeMap<>();
+
+        String srvPath = null;
+        Long srvInternalOrder = null;
+
+        long locInternalOrder = rtState.internalOrder;
+
+        for (String aliveNodePath : aliveNodes) {
+            Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
+
+            if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
+                aliveClients.put(internalId, aliveNodePath);
+            else {
+                if (srvInternalOrder == null || internalId < srvInternalOrder) 
{
+                    srvPath = aliveNodePath;
+                    srvInternalOrder = internalId;
+                }
+            }
+        }
+
+        assert !aliveClients.isEmpty();
+
+        Map.Entry<Long, String> oldest = aliveClients.firstEntry();
+
+        boolean oldestClient = locInternalOrder == oldest.getKey();
+
+        if (srvPath == null) {
+            if (oldestClient) {
+                Stat stat = new Stat();
+
+                ZkDiscoveryEventsData prevEvts = rtState.evtsData;
+
+                ZkDiscoveryEventsData newEvts;
+
+                byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, 
stat);
+
+                if (evtsBytes.length == 0) {
+                    // Possible if a new cluster has already started and removed the old events;
+                    // we can still try to generate {@link ZkNoServersMessage} from the locally known events.
+                    newEvts = rtState.evtsData;
+                }
+                else
+                    newEvts = unmarshalZip(evtsBytes);
+
+                if (prevEvts.clusterId.equals(newEvts.clusterId)) {
+                    U.warn(log, "All server nodes failed, notify all 
clients.");
+
+                    generateNoServersEvent(newEvts, stat);
+                }
+            }
+        }
+        else {
+            String watchPath;
+
+            if (oldestClient) {
+                watchPath = srvPath;
+
+                if (log.isInfoEnabled()) {
+                    log.info("Servers exists, watch for server node [locId=" + 
locNode.id() +
+                        ", watchPath=" + watchPath + ']');
+                }
+            }
+            else {
+                assert aliveClients.size() > 1 : aliveClients;
+
+                Map.Entry<Long, String> prevE = 
aliveClients.floorEntry(locInternalOrder - 1);
+
+                assert prevE != null;
+
+                watchPath = prevE.getValue();
+
+                if (log.isInfoEnabled()) {
+                    log.info("Servers exists, watch for previous node [locId=" 
+ locNode.id() +
+                        ", watchPath=" + watchPath + ']');
+                }
+            }
+
+            PreviousNodeWatcher watcher = new 
ClientPreviousNodeWatcher(rtState);
+
+            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
watchPath, watcher, watcher);
+        }
+    }
+
+    /**
+     * @param evtsData Events data.
+     * @param evtsStat Events zookeeper state.
+     * @throws Exception If failed.
+     */
+    private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat 
evtsStat) throws Exception {
+        evtsData.evtIdGen++;
 
-        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback(rtState));
+        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+            evtsData.evtIdGen,
+            evtsData.topVer,
+            locNode.id(),
+            new ZkNoServersMessage(),
+            null,
+            false);
+
+        Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
+
+        evtsData.addEvent(nodesToAck, evtData);
+
+        byte[] newEvtsBytes = marshalZip(evtsData);
+
+        try {
+            rtState.zkClient.setData(zkPaths.evtsPath, newEvtsBytes, 
evtsStat.getVersion());
+        }
+        catch (KeeperException.BadVersionException e) {
+            // Version can change if a new cluster has started and saved new events.
+            if (log.isDebugEnabled())
+                log.debug("Failed to save no servers message");
+        }
     }
 
     /**
@@ -1395,8 +1501,15 @@ public class ZookeeperDiscoveryImpl {
         for (String child : aliveNodes) {
             Long internalId = ZkIgnitePaths.aliveInternalId(child);
 
-            if (internalId < rtState.evtsData.startInternalOrder)
+            if (internalId < rtState.evtsData.startInternalOrder) {
+                if (log.isInfoEnabled()) {
+                    LT.info(log, "Ignore node from previous cluster 
[startOrder=" + rtState.evtsData.startInternalOrder +
+                        ", nodeOrder=" + internalId +
+                        ", znode=" + child + ']');
+                }
+
                 continue;
+            }
 
             Object old = alives.put(internalId, child);
 
@@ -1924,23 +2037,20 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) 
throws Exception {
-        assert prevEvts == null || prevEvts.maxInternalOrder < 
locNode.internalId();
+        long locInternalId = rtState.internalOrder;
+
+        assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId;
 
         spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj);
 
         cleanupPreviousClusterData();
 
-        long locInternalId = rtState.internalOrder;
-
         rtState.joined = true;
+        rtState.gridStartTime = System.currentTimeMillis();
 
-        rtState.gridStartTime = U.currentTimeMillis();
-
-        rtState.evtsData = new ZkDiscoveryEventsData(
-            prevEvts != null ? prevEvts.maxInternalOrder + 1 : locInternalId,
-            rtState.gridStartTime,
-            1L,
-            new TreeMap<Long, ZkDiscoveryEventData>());
+        rtState.evtsData = ZkDiscoveryEventsData.createForNewCluster(
+            prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L,
+            rtState.gridStartTime);
 
         locNode.internalId(locInternalId);
         locNode.order(1);
@@ -2229,7 +2339,7 @@ public class ZookeeperDiscoveryImpl {
 
         ZkDiscoveryEventsData newEvts = unmarshalZip(data);
 
-        // Need keep processed custom events since they contains message 
object.
+        // Need to keep processed custom events since they contain the message object which is needed to create the ack.
         if (rtState.evtsData != null) {
             for (Map.Entry<Long, ZkDiscoveryEventData> e : 
rtState.evtsData.evts.entrySet()) {
                 ZkDiscoveryEventData evtData = e.getValue();
@@ -2246,7 +2356,8 @@ public class ZookeeperDiscoveryImpl {
 
         processNewEvents(newEvts);
 
-        rtState.evtsData = newEvts;
+        if (rtState.joined)
+            rtState.evtsData = newEvts;
 
         return newEvts;
     }
@@ -2257,6 +2368,13 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws 
Exception {
+        if (rtState.joined && rtState.evtsData != null && 
!rtState.evtsData.clusterId.equals(evtsData.clusterId)) {
+            assert locNode.isClient() : locNode;
+
+            throw localNodeFail("All server nodes failed, client node 
disconnected (received events from new custer) " +
+                "[locId=" + locNode.id() + ']', true);
+        }
+
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
         ZookeeperClient zkClient = rtState.zkClient;
@@ -2504,6 +2622,9 @@ public class ZookeeperDiscoveryImpl {
         joinFut.onDone();
 
         deleteDataForJoinedAsync(evtData);
+
+        if (locNode.isClient())
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckClientsStatusCallback(rtState));
     }
 
     /**
@@ -2523,6 +2644,19 @@ public class ZookeeperDiscoveryImpl {
             processCommunicationErrorResolveFinishMessage(
                 (ZkCommunicationErrorResolveFinishMessage)msg);
         }
+        else if (msg instanceof ZkNoServersMessage)
+            processNoServersMessage((ZkNoServersMessage)msg);
+    }
+
+    /**
+     * @param msg Message.
+     * @throws Exception If failed.
+     */
+    private void processNoServersMessage(ZkNoServersMessage msg) throws 
Exception {
+        assert locNode.isClient() : locNode;
+
+        throw localNodeFail("All server nodes failed, client node disconnected 
" +
+            "(received 'no-servers' message ) [locId=" + locNode.id() + ']', 
true);
     }
 
     /**
@@ -3654,7 +3788,7 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class PreviousNodeWatcher extends ZkAbstractWatcher implements 
AsyncCallback.StatCallback {
+    private abstract class PreviousNodeWatcher extends ZkAbstractWatcher 
implements AsyncCallback.StatCallback {
         /**
          * @param rtState Runtime state.
          */
@@ -3689,6 +3823,62 @@ public class ZookeeperDiscoveryImpl {
                 onProcessError(e);
             }
         }
+
+        /**
+         *
+         */
+        abstract void onPreviousNodeFail();
+    }
+
+    /**
+     *
+     */
+    private class ServerPreviousNodeWatcher extends PreviousNodeWatcher {
+        /**
+         * @param rtState Runtime state.
+         */
+        ServerPreviousNodeWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+
+            assert !locNode.isClient() : locNode;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onPreviousNodeFail() {
+            // TODO ZK:
+//        if (locInternalId == crdInternalId + 1) {
+//            if (log.isInfoEnabled())
+//                log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
+//
+//            onBecomeCoordinator(aliveNodes, locInternalId);
+//        }
+            if (log.isInfoEnabled())
+                log.info("Previous server node failed, check is node new 
coordinator [locId=" + locNode.id() + ']');
+
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback(rtState));
+        }
+    }
+
+    /**
+     *
+     */
+    private class ClientPreviousNodeWatcher extends PreviousNodeWatcher {
+        /**
+         * @param rtState Runtime state.
+         */
+        ClientPreviousNodeWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+
+            assert locNode.isClient() : locNode;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onPreviousNodeFail() {
+            if (log.isInfoEnabled())
+                log.info("Watched node failed, check if there are alive 
servers [locId=" + locNode.id() + ']');
+
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckClientsStatusCallback(rtState));
+        }
     }
 
     /**
@@ -3703,10 +3893,33 @@ public class ZookeeperDiscoveryImpl {
         }
 
         /** {@inheritDoc} */
-        @Override public void processResult0(int rc, String path, Object ctx, 
List<String> children, Stat stat) throws Exception {
+        @Override public void processResult0(int rc, String path, Object ctx, 
List<String> children, Stat stat)
+            throws Exception
+        {
+            assert rc == 0 : KeeperException.Code.get(rc);
+
+            checkIsCoordinator(children);
+        }
+    }
+
+    /**
+     *
+     */
+    class CheckClientsStatusCallback extends ZkAbstractChildrenCallback {
+        /**
+         * @param rtState Runtime state.
+         */
+        CheckClientsStatusCallback(ZkRuntimeState rtState) {
+            super(rtState, ZookeeperDiscoveryImpl.this);
+        }
+
+        /** {@inheritDoc} */
+        @Override void processResult0(int rc, String path, Object ctx, 
List<String> children, Stat stat)
+            throws Exception
+        {
             assert rc == 0 : KeeperException.Code.get(rc);
 
-            checkIsCoordinator(rc, children);
+            checkClientsStatus(children);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 2a4fa76..2dd690d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -256,7 +256,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                 try {
                     DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
 
-                    UUID locId = ignite.cluster().localNode().id();
+                    UUID locId = 
((IgniteKernal)ignite).context().localNodeId();
 
                     Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
 
@@ -281,7 +281,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                 catch (Throwable e) {
                     err = true;
 
-                    info("Unexpected error: " + e);
+                    error("Unexpected error [evt=" + evt + ", err=" + e + ']', 
e);
                 }
 
                 return true;
@@ -2572,8 +2572,12 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             }
         });
 
+        U.sleep(3000);
+
         waitSpi(getTestIgniteInstanceName(0));
 
+        client = false;
+
         startGrid(1);
 
         fut.get();
@@ -2584,6 +2588,99 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDisconnectOnServersLeft_1() throws Exception {
+        disconnectOnServersLeft(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectOnServersLeft_2() throws Exception {
+        disconnectOnServersLeft(5, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectOnServersLeft_3() throws Exception {
+        disconnectOnServersLeft(1, 10);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectOnServersLeft_4() throws Exception {
+        disconnectOnServersLeft(5, 10);
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param clients Number of clients.
+     * @throws Exception If failed.
+     */
+    private void disconnectOnServersLeft(int srvs, int clients) throws 
Exception {
+        startGridsMultiThreaded(srvs);
+
+        client = true;
+
+        startGridsMultiThreaded(srvs, clients);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(clients);
+        final CountDownLatch reconnectLatch = new CountDownLatch(clients);
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        for (int i = 0; i < clients; i++) {
+            Ignite client = ignite(srvs + i);
+
+            assertTrue(client.configuration().isClientMode());
+
+            client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, 
EVT_CLIENT_NODE_RECONNECTED);
+        }
+
+        log.info("Stop all servers.");
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer threadIdx) {
+                stopGrid(getTestIgniteInstanceName(threadIdx), true, false);
+            }
+        }, srvs, "stop-server");
+
+        waitReconnectEvent(log, disconnectLatch);
+
+        evts.clear();
+
+        client = false;
+
+        log.info("Restart servers.");
+
+        startGridsMultiThreaded(0, srvs);
+
+        waitReconnectEvent(log, reconnectLatch);
+
+        waitForTopology(srvs + clients);
+
+        log.info("Reconnect finished.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testStartNoZk() throws Exception {
         stopZkCluster();
 

Reply via email to