This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d97857  IGNITE-13577 Graceful node shutdown for Zookeeper Discovery SPI - Fixes #8371.
6d97857 is described below

commit 6d9785706e4a7ca0edeccc32dc6fdf34f9143956
Author: Ivan Daschinskiy <ivanda...@gmail.com>
AuthorDate: Mon Nov 2 17:55:13 2020 +0300

    IGNITE-13577 Graceful node shutdown for Zookeeper Discovery SPI - Fixes #8371.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java    |   2 +-
 ...stractCallabck.java => ZkAbstractCallback.java} |   6 +-
 .../zk/internal/ZkAbstractChildrenCallback.java    |   2 +-
 .../discovery/zk/internal/ZkAbstractWatcher.java   |   2 +-
 .../zk/internal/ZkDiscoveryEventData.java          |   4 +-
 ...ata.java => ZkDiscoveryNodeLeaveEventData.java} |  45 +++-
 .../spi/discovery/zk/internal/ZkIgnitePaths.java   |  57 ++++-
 .../spi/discovery/zk/internal/ZkRunnable.java      |   2 +-
 .../spi/discovery/zk/internal/ZookeeperClient.java |  26 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java        | 124 +++++++---
 .../zk/internal/ZookeeperDiscoveryStatistics.java  |  32 ++-
 .../zk/ZookeeperDiscoverySpiTestSuite1.java        |   2 +
 ...erDiscoveryConcurrentStartAndStartStopTest.java |   6 +-
 .../zk/internal/ZookeeperDiscoveryMiscTest.java    |   2 +
 ...perDiscoveryRandomStopOrFailConcurrentTest.java | 264 +++++++++++++++++++++
 ...coverySegmentationAndConnectionRestoreTest.java |   4 +-
 .../zk/internal/ZookeeperDiscoverySpiTestBase.java |  90 +++++++
 .../internal/ZookeeperDiscoverySpiTestHelper.java  |   6 +-
 ...perDiscoveryTopologyChangeAndReconnectTest.java |  90 -------
 .../zookeeper/ZkTestClientCnxnSocketNIO.java       |   3 +-
 20 files changed, 595 insertions(+), 174 deletions(-)

diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 5cdfa58..3de8df6 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -595,7 +595,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDis
 
         /** {@inheritDoc} */
         @Override public long getNodesLeft() {
-            return 0;
+            return stats.leftNodesCnt();
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java
similarity index 92%
rename from 
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
rename to 
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java
index b80a9dd..427a81c 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java
@@ -22,12 +22,12 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 /**
  *
  */
-abstract class ZkAbstractCallabck {
+abstract class ZkAbstractCallback {
     /** */
     final ZkRuntimeState rtState;
 
     /** */
-    private final ZookeeperDiscoveryImpl impl;
+    final ZookeeperDiscoveryImpl impl;
 
     /** */
     private final GridSpinBusyLock busyLock;
@@ -36,7 +36,7 @@ abstract class ZkAbstractCallabck {
      * @param rtState Runtime state.
      * @param impl Discovery impl.
      */
-    ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+    ZkAbstractCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
         this.rtState = rtState;
         this.impl = impl;
 
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
index 2292e35..dc680f3 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
@@ -24,7 +24,7 @@ import org.apache.zookeeper.data.Stat;
 /**
  *
  */
-abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck 
implements AsyncCallback.Children2Callback {
+abstract class ZkAbstractChildrenCallback extends ZkAbstractCallback 
implements AsyncCallback.Children2Callback {
     /**
      * @param rtState Runtime state.
      * @param impl Discovery impl.
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
index 9098d05..37e65e5 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
@@ -23,7 +23,7 @@ import org.apache.zookeeper.Watcher;
 /**
  *
  */
-abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher 
{
+abstract class ZkAbstractWatcher extends ZkAbstractCallback implements Watcher 
{
     /**
      * @param rtState Runtime state.
      * @param impl Discovery impl.
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index d667a17..2bc49e5 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -30,7 +30,7 @@ abstract class ZkDiscoveryEventData implements Serializable {
     static final byte ZK_EVT_NODE_JOIN = 1;
 
     /** */
-    static final byte ZK_EVT_NODE_FAILED = 2;
+    static final byte ZK_EVT_NODE_LEFT = 2;
 
     /** */
     static final byte ZK_EVT_CUSTOM_EVT = 3;
@@ -59,7 +59,7 @@ abstract class ZkDiscoveryEventData implements Serializable {
      * @param topVer Topology version.
      */
     ZkDiscoveryEventData(long evtId, byte evtType, long topVer) {
-        assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || 
evtType == ZK_EVT_CUSTOM_EVT : evtType;
+        assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_LEFT || 
evtType == ZK_EVT_CUSTOM_EVT : evtType;
 
         this.evtId = evtId;
         this.evtType = evtType;
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java
similarity index 53%
rename from 
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
rename to 
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java
index c76158f..77d1157 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java
@@ -20,36 +20,59 @@ package org.apache.ignite.spi.discovery.zk.internal;
 /**
  *
  */
-class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
+class ZkDiscoveryNodeLeaveEventData extends ZkDiscoveryEventData {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    private long failedNodeInternalId;
+    private final long leftNodeInternalId;
+
+    /** */
+    private final boolean failed;
 
     /**
      * @param evtId Event ID.
      * @param topVer Topology version.
-     * @param failedNodeInternalId Failed node ID.
+     * @param leftNodeInternalId Failed node ID.
      */
-    ZkDiscoveryNodeFailEventData(long evtId, long topVer, long 
failedNodeInternalId) {
-        super(evtId, ZK_EVT_NODE_FAILED, topVer);
+    ZkDiscoveryNodeLeaveEventData(long evtId, long topVer, long 
leftNodeInternalId) {
+       this(evtId, topVer, leftNodeInternalId, false);
+    }
 
-        this.failedNodeInternalId = failedNodeInternalId;
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param leftNodeInternalId Left node ID.
+     */
+    ZkDiscoveryNodeLeaveEventData(long evtId, long topVer, long 
leftNodeInternalId, boolean failed) {
+        super(evtId, ZK_EVT_NODE_LEFT, topVer);
+
+        this.leftNodeInternalId = leftNodeInternalId;
+
+        this.failed = failed;
+    }
+
+    /**
+     * @return Left node ID.
+     */
+    long leftNodeInternalId() {
+        return leftNodeInternalId;
     }
 
     /**
-     * @return Failed node ID.
+     *
+     * @return {@code true} if failed.
      */
-    long failedNodeInternalId() {
-        return failedNodeInternalId;
+    boolean failed() {
+        return failed;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ZkDiscoveryNodeFailEventData [" +
+        return "ZkDiscoveryNodeLeaveEventData [" +
             "evtId=" + eventId() +
             ", topVer=" + topologyVersion() +
-            ", nodeId=" + failedNodeInternalId + ']';
+            ", nodeId=" + leftNodeInternalId +
+            ", failed=" + failed + ']';
     }
 }
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 4e54254..02e9d36 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -44,6 +44,9 @@ public class ZkIgnitePaths {
     /** Directory to store acknowledge messages for custom events. */
     private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
 
+    /** Directory to store node's stopped flags. */
+    private static final String STOPPED_NODES_FLAGS_DIR = "sf";
+
     /** Directory to store EPHEMERAL znodes for alive cluster nodes. */
     static final String ALIVE_NODES_DIR = "n";
 
@@ -71,6 +74,9 @@ public class ZkIgnitePaths {
     /** */
     final String customEvtsAcksDir;
 
+    /** */
+    final String stoppedNodesFlagsDir;
+
     /**
      * @param zkRootPath Base Zookeeper directory for all Ignite nodes.
      */
@@ -83,6 +89,7 @@ public class ZkIgnitePaths {
         customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
         customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR);
         customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
+        stoppedNodesFlagsDir = zkPath(STOPPED_NODES_FLAGS_DIR);
     }
 
     /**
@@ -90,7 +97,7 @@ public class ZkIgnitePaths {
      * @return Full path.
      */
     private String zkPath(String path) {
-        return clusterDir + "/" + path;
+        return join(clusterDir, path);
     }
 
     /**
@@ -99,7 +106,7 @@ public class ZkIgnitePaths {
      * @return Path.
      */
     String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
-        return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
+        return join(joinDataDir, prefixId + ":" + nodeId.toString());
     }
 
     /**
@@ -109,7 +116,7 @@ public class ZkIgnitePaths {
     static long aliveInternalId(String path) {
         int idx = path.lastIndexOf('|');
 
-        return Integer.parseInt(path.substring(idx + 1));
+        return Long.parseLong(path.substring(idx + 1));
     }
 
     /**
@@ -123,7 +130,7 @@ public class ZkIgnitePaths {
         if (node.isClient())
             flags |= CLIENT_NODE_FLAG_MASK;
 
-        return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + 
encodeFlags(flags) + "|";
+        return join(aliveNodesDir, prefix + ":" + node.id() + ":" + 
encodeFlags(flags) + "|");
     }
 
     /**
@@ -156,6 +163,26 @@ public class ZkIgnitePaths {
     }
 
     /**
+     * @param node Leaving node.
+     * @return Stopped node path.
+     */
+    String nodeStoppedFlag(ZookeeperClusterNode node) {
+        String path = node.id().toString() + '|' + node.internalId();
+
+        return join(stoppedNodesFlagsDir, path);
+    }
+
+    /**
+     * @param path Leaving flag path.
+     * @return Stopped node internal id.
+     */
+    static long stoppedFlagNodeInternalId(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Long.parseLong(path.substring(idx + 1));
+    }
+
+    /**
      * @param path Event zk path.
      * @return Event sequence number.
      */
@@ -212,7 +239,7 @@ public class ZkIgnitePaths {
      * @return Path.
      */
     String createCustomEventPath(String prefix, UUID nodeId, int partCnt) {
-        return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + 
String.format("%04d", partCnt) + '|';
+        return join(customEvtsDir, prefix + ":" + nodeId + ":" + 
String.format("%04d", partCnt) + '|');
     }
 
     /**
@@ -221,7 +248,7 @@ public class ZkIgnitePaths {
      * @return Path.
      */
     String customEventPartsBasePath(String prefix, UUID nodeId) {
-        return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":";
+        return join(customEvtsPartsDir, prefix + ":" + nodeId + ":");
     }
 
     /**
@@ -239,7 +266,7 @@ public class ZkIgnitePaths {
      * @return Event zk path.
      */
     String joinEventDataPathForJoined(long evtId) {
-        return evtsPath + "/fj-" + evtId;
+        return join(evtsPath,"fj-" + evtId);
     }
 
     /**
@@ -247,7 +274,7 @@ public class ZkIgnitePaths {
      * @return Event zk path.
      */
     String joinEventSecuritySubjectPath(long topVer) {
-        return evtsPath + "/s-" + topVer;
+        return join(evtsPath, "s-" + topVer);
     }
 
     /**
@@ -257,7 +284,7 @@ public class ZkIgnitePaths {
     String ackEventDataPath(long origEvtId) {
         assert origEvtId != 0;
 
-        return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
+        return join(customEvtsAcksDir, String.valueOf(origEvtId));
     }
 
     /**
@@ -265,7 +292,7 @@ public class ZkIgnitePaths {
      * @return Future path.
      */
     String distributedFutureBasePath(UUID id) {
-        return evtsPath + "/f-" + id;
+        return join(evtsPath, "f-" + id);
     }
 
     /**
@@ -273,7 +300,7 @@ public class ZkIgnitePaths {
      * @return Future path.
      */
     String distributedFutureResultPath(UUID id) {
-        return evtsPath + "/fr-" + id;
+        return join(evtsPath, "fr-" + id);
     }
 
     /**
@@ -306,6 +333,14 @@ public class ZkIgnitePaths {
     }
 
     /**
+     * @param paths Paths to join.
+     * @return Paths joined with separator.
+     */
+    public static String join(String... paths) {
+        return String.join(PATH_SEPARATOR, paths);
+    }
+
+    /**
      * Validate the provided znode path string.
      *
      * @param path znode path string.
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
index 965bdc0..1be63e0 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
@@ -20,7 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 /**
  * Zk Runnable.
  */
-public abstract class ZkRunnable extends ZkAbstractCallabck implements 
Runnable {
+public abstract class ZkRunnable extends ZkAbstractCallback implements 
Runnable {
     /**
      * @param rtState Runtime state.
      * @param impl Discovery impl.
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 7e1bb9a..e98bc01 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -442,6 +442,32 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
+     * @param path Path.
+     * @param data Data.
+     * @param createMode Create mode.
+     * @return Created path.
+     * @throws KeeperException In case of zookeeper error.
+     * @throws InterruptedException If interrupted.
+     */
+    String createIfNeededNoRetry(String path, byte[] data, CreateMode 
createMode)
+        throws KeeperException, InterruptedException {
+        assert !createMode.isSequential() : createMode;
+
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        try {
+            return zk.create(path, data, ZK_ACL, createMode);
+        }
+        catch (KeeperException.NodeExistsException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node already exists: " + path);
+
+            return path;
+        }
+    }
+
+    /**
      * @param checkPrefix Unique prefix to check in case of retry.
      * @param parentPath Parent node path.
      * @param path Node to create.
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index e9196f2..d9d56ae 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -101,6 +101,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
@@ -854,7 +855,8 @@ public class ZookeeperDiscoveryImpl {
                 zkPaths.customEvtsDir,
                 zkPaths.customEvtsPartsDir,
                 zkPaths.customEvtsAcksDir,
-                zkPaths.aliveNodesDir};
+                zkPaths.aliveNodesDir,
+                zkPaths.stoppedNodesFlagsDir};
 
             List<String> dirs = new ArrayList<>();
 
@@ -1009,7 +1011,7 @@ public class ZookeeperDiscoveryImpl {
             final int OVERHEAD = 5;
 
             // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193
-            String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + 
locNode.id();
+            String joinDataPath = ZkIgnitePaths.join(zkPaths.joinDataDir, 
prefix + ":" + locNode.id());
 
             if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, 
OVERHEAD)) {
                 List<byte[]> parts = zkClient.splitNodeData(joinDataPath, 
joinDataBytes, OVERHEAD);
@@ -1379,7 +1381,7 @@ public class ZookeeperDiscoveryImpl {
 
             PreviousNodeWatcher watcher = new 
ServerPreviousNodeWatcher(rtState);
 
-            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
+            
rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, 
prevE.getValue()), watcher, watcher);
         }
     }
 
@@ -1478,7 +1480,7 @@ public class ZookeeperDiscoveryImpl {
 
             PreviousNodeWatcher watcher = new 
ClientPreviousNodeWatcher(rtState);
 
-            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
watchPath, watcher, watcher);
+            
rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, 
watchPath), watcher, watcher);
         }
     }
 
@@ -1512,6 +1514,16 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) 
throws Exception {
+        for (String stoppedFlagPath : 
rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir)) {
+            long leftIntId = 
ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath);
+
+            if (!rtState.top.nodesByInternalId.containsKey(leftIntId)) {
+                rtState.zkClient.deleteIfExistsAsync(
+                    ZkIgnitePaths.join(zkPaths.stoppedNodesFlagsDir, 
stoppedFlagPath)
+                );
+            }
+        }
+
         for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) {
             if (evtData instanceof ZkDiscoveryCustomEventData) {
                 ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
@@ -1620,7 +1632,7 @@ public class ZookeeperDiscoveryImpl {
     private void watchAliveNodeData(String alivePath) {
         assert rtState.locNodeZkPath != null;
 
-        String path = zkPaths.aliveNodesDir + "/" + alivePath;
+        String path = ZkIgnitePaths.join(zkPaths.aliveNodesDir, alivePath);
 
         if (!path.equals(rtState.locNodeZkPath))
             rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, 
rtState.aliveNodeDataWatcher);
@@ -1642,6 +1654,11 @@ public class ZookeeperDiscoveryImpl {
             rtState.updateAlives = false;
         }
 
+        Set<Long> stoppedNodes = new HashSet<>();
+
+        for (String stoppedFlagPath : 
rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir))
+            
stoppedNodes.add(ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath));
+
         TreeMap<Long, String> alives = new TreeMap<>();
 
         for (String child : aliveNodes) {
@@ -1670,7 +1687,7 @@ public class ZookeeperDiscoveryImpl {
 
                 failedNodes.add(failedNode);
 
-                generateNodeFail(curTop, failedNode);
+                generateNodeLeave(curTop, failedNode, 
!stoppedNodes.contains(failedNode.internalId()));
 
                 newEvts++;
 
@@ -2031,15 +2048,11 @@ public class ZookeeperDiscoveryImpl {
             String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, 
prefixId);
 
             client.setData(joinDataPath, marshalZip(joinErr), -1);
-
-            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
         }
-        else {
-            if (log.isInfoEnabled())
+        else if (log.isInfoEnabled())
                 log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
 
-            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
-        }
+        client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, 
aliveNodePath), -1);
     }
 
     /**
@@ -2180,25 +2193,35 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param curTop Current topology.
-     * @param failedNode Failed node.
+     * @param leftNode Failed node.
+     * @param failed Whether node failed or stopped.
      */
-    private void generateNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, 
ZookeeperClusterNode failedNode) {
-        Object rmvd = curTop.remove(failedNode.order());
+    private void generateNodeLeave(
+        TreeMap<Long, ZookeeperClusterNode> curTop,
+        ZookeeperClusterNode leftNode,
+        boolean failed
+    ) {
+        Object rmvd = curTop.remove(leftNode.order());
 
         assert rmvd != null;
 
         rtState.evtsData.topVer++;
         rtState.evtsData.evtIdGen++;
 
-        ZkDiscoveryNodeFailEventData evtData = new 
ZkDiscoveryNodeFailEventData(
+        ZkDiscoveryNodeLeaveEventData evtData = new 
ZkDiscoveryNodeLeaveEventData(
             rtState.evtsData.evtIdGen,
             rtState.evtsData.topVer,
-            failedNode.internalId());
+            leftNode.internalId(),
+            failed
+        );
 
         rtState.evtsData.addEvent(curTop.values(), evtData);
 
-        if (log.isInfoEnabled())
-            log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
+        if (log.isInfoEnabled()) {
+            String evtName = failed ? "NODE_FAILED" : "NODE_LEFT";
+
+            log.info("Generated " + evtName + " event [evt=" + evtData + ']');
+        }
     }
 
     /**
@@ -2389,12 +2412,14 @@ public class ZookeeperDiscoveryImpl {
 
         batch.addAll(client.getChildrenPaths(zkPaths.customEvtsAcksDir));
 
+        batch.addAll(client.getChildrenPaths(zkPaths.stoppedNodesFlagsDir));
+
         client.deleteAll(batch, -1);
 
         if (startInternalOrder > 0) {
             for (String alive : client.getChildren(zkPaths.aliveNodesDir)) {
                 if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder)
-                    client.deleteIfExists(zkPaths.aliveNodesDir + "/" + alive, 
-1);
+                    
client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, alive), -1);
             }
         }
 
@@ -2423,7 +2448,7 @@ public class ZookeeperDiscoveryImpl {
             return readMultipleParts(zkClient, partsBasePath, partCnt);
         }
         else
-            return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath);
+            return zkClient.getData(ZkIgnitePaths.join(zkPaths.customEvtsDir, 
evtPath));
     }
 
     /**
@@ -2594,7 +2619,7 @@ public class ZookeeperDiscoveryImpl {
         for (String child : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
             if (ZkIgnitePaths.aliveInternalId(child) == internalId) {
                 // Need use sync delete to do not process again join of this 
node again.
-                rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
child, -1);
+                
rtState.zkClient.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, 
child), -1);
 
                 return;
             }
@@ -2623,7 +2648,7 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
-        zkClient.deleteIfExistsAsync(zkPaths.customEvtsDir + "/" + evtPath);
+        zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.customEvtsDir, 
evtPath));
     }
 
     /**
@@ -2690,13 +2715,13 @@ public class ZookeeperDiscoveryImpl {
                         break;
                     }
 
-                    case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
+                    case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: {
                         if (!rtState.joined)
                             break;
 
                         evtProcessed = true;
 
-                        notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
+                        
notifyNodeLeave((ZkDiscoveryNodeLeaveEventData)evtData);
 
                         break;
                     }
@@ -3204,7 +3229,7 @@ public class ZookeeperDiscoveryImpl {
             String alive = alives.get(i);
 
             if (internalIds.contains(ZkIgnitePaths.aliveInternalId(alive)))
-                rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + 
"/" + alive);
+                
rtState.zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, 
alive));
         }
     }
 
@@ -3532,8 +3557,8 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param evtData Event data.
      */
-    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) {
-        notifyNodeFail(evtData.failedNodeInternalId(), 
evtData.topologyVersion());
+    private void notifyNodeLeave(final ZkDiscoveryNodeLeaveEventData evtData) {
+        notifyNodeLeave(evtData.leftNodeInternalId(), 
evtData.topologyVersion(), evtData.failed());
     }
 
     /**
@@ -3541,11 +3566,23 @@ public class ZookeeperDiscoveryImpl {
      * @param topVer Topology version.
      */
     private void notifyNodeFail(long nodeInternalOrder, long topVer) {
-        final ZookeeperClusterNode failedNode = 
rtState.top.removeNode(nodeInternalOrder);
+        notifyNodeLeave(nodeInternalOrder, topVer, true);
+    }
 
-        assert failedNode != null && !failedNode.isLocal() : failedNode;
+    /**
+     * @param nodeInternalOrder Node order.
+     * @param topVer Topology version.
+     * @param failed {@code true} if node failed, {@code false} otherwise.
+     */
+    private void notifyNodeLeave(long nodeInternalOrder, long topVer, boolean 
failed) {
+        final ZookeeperClusterNode leftNode = 
rtState.top.removeNode(nodeInternalOrder);
 
-        PingFuture pingFut = pingFuts.get(failedNode.order());
+        assert leftNode != null && !leftNode.isLocal() : leftNode;
+
+        if (!failed && rtState.crd)
+            
rtState.zkClient.deleteIfExistsAsync(zkPaths.nodeStoppedFlag(leftNode));
+
+        PingFuture pingFut = pingFuts.get(leftNode.order());
 
         if (pingFut != null)
             pingFut.onDone(false);
@@ -3554,9 +3591,9 @@ public class ZookeeperDiscoveryImpl {
 
         lsnr.onDiscovery(
             new DiscoveryNotification(
-                EVT_NODE_FAILED,
+                failed ? EVT_NODE_FAILED : EVT_NODE_LEFT,
                 topVer,
-                failedNode,
+                leftNode,
                 topSnapshot,
                 Collections.emptyMap(),
                 null,
@@ -3564,7 +3601,10 @@ public class ZookeeperDiscoveryImpl {
             )
         ).get();
 
-        stats.onNodeFailed();
+        if (failed)
+            stats.onNodeFailed();
+        else
+            stats.onNodeLeft();
     }
 
     /**
@@ -3680,11 +3720,11 @@ public class ZookeeperDiscoveryImpl {
                         break;
                     }
 
-                    case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
+                    case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: {
                         if (log.isDebugEnabled())
-                            log.debug("All nodes processed node fail 
[evtData=" + evtData + ']');
+                            log.debug("All nodes processed node left 
[evtData=" + evtData + ']');
 
-                        break; // Do not need addition cleanup.
+                        break;
                     }
                 }
 
@@ -3899,7 +3939,7 @@ public class ZookeeperDiscoveryImpl {
      *
      */
     public void stop() {
-        stop0(new IgniteSpiException("Node stopped"));
+        stop0(null);
     }
 
     /**
@@ -3913,6 +3953,14 @@ public class ZookeeperDiscoveryImpl {
 
         if (rtState.zkClient != null && rtState.locNodeZkPath != null && 
rtState.zkClient.connected()) {
             try {
+                if (e == null && rtState.joined) {
+                    rtState.zkClient.createIfNeededNoRetry(
+                        zkPaths.nodeStoppedFlag(locNode),
+                        null,
+                        PERSISTENT
+                    );
+                }
+
                 rtState.zkClient.deleteIfExistsNoRetry(rtState.locNodeZkPath, 
-1);
             }
             catch (Exception err) {
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
index cc95dd3..21b62c4 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -23,42 +24,55 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public class ZookeeperDiscoveryStatistics {
     /** */
-    private long joinedNodesCnt;
+    private final LongAdder joinedNodesCnt = new LongAdder();
 
     /** */
-    private long failedNodesCnt;
+    private final LongAdder failedNodesCnt = new LongAdder();
+
+    /** */
+    private final LongAdder leftNodesCnt = new LongAdder();
 
     /** Communication error count. */
-    private long commErrCnt;
+    private final LongAdder commErrCnt = new LongAdder();
 
     /** */
     public long joinedNodesCnt() {
-        return joinedNodesCnt;
+        return joinedNodesCnt.longValue();
     }
 
     /** */
     public long failedNodesCnt() {
-        return failedNodesCnt;
+        return failedNodesCnt.longValue();
+    }
+
+    /** */
+    public long leftNodesCnt() {
+        return leftNodesCnt.longValue();
     }
 
     /** */
     public long commErrorCount() {
-        return commErrCnt;
+        return commErrCnt.longValue();
     }
 
     /** */
     public void onNodeJoined() {
-        joinedNodesCnt++;
+        joinedNodesCnt.increment();
     }
 
     /** */
     public void onNodeFailed() {
-        failedNodesCnt++;
+        failedNodesCnt.increment();
+    }
+
+    /** */
+    public void onNodeLeft() {
+        leftNodesCnt.increment();
     }
 
     /** */
     public void onCommunicationError() {
-        commErrCnt++;
+        commErrCnt.increment();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java
index d5be881..03d6a43 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java
@@ -24,6 +24,7 @@ import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryCommunicati
 import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryConcurrentStartAndStartStopTest;
 import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryCustomEventsTest;
 import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryMiscTest;
+import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryRandomStopOrFailConcurrentTest;
 import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySegmentationAndConnectionRestoreTest;
 import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslFailedAuthTest;
 import 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslSuccessfulAuthTest;
@@ -43,6 +44,7 @@ import org.junit.runners.Suite;
     ZookeeperValidatePathsTest.class,
     ZookeeperDiscoverySegmentationAndConnectionRestoreTest.class,
     ZookeeperDiscoveryConcurrentStartAndStartStopTest.class,
+    ZookeeperDiscoveryRandomStopOrFailConcurrentTest.class,
     ZookeeperDiscoveryTopologyChangeAndReconnectTest.class,
     ZookeeperDiscoveryCommunicationFailureTest.class,
     ZookeeperDiscoveryClientDisconnectTest.class,
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java
index 1572af5..cea5975 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java
@@ -180,7 +180,9 @@ public class 
ZookeeperDiscoveryConcurrentStartAndStartStopTest extends Zookeeper
             }, NODES, "stop-node");
 
             for (int j = 0; j < NODES; j++)
-                expEvts[j] = 
ZookeeperDiscoverySpiTestHelper.failEvent(++topVer);
+                expEvts[j] = 
ZookeeperDiscoverySpiTestHelper.leftEvent(++topVer, false);
+
+            helper.checkEvents(ignite(0), evts, expEvts);
 
             checkEventsConsistency();
         }
@@ -199,6 +201,8 @@ public class 
ZookeeperDiscoveryConcurrentStartAndStartStopTest extends Zookeeper
 
         startGridsMultiThreaded(3, false);
 
+        checkZkNodesCleanup();
+
         waitForTopology(3);
     }
 
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java
index c644a4b..f271bad 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java
@@ -255,6 +255,8 @@ public class ZookeeperDiscoveryMiscTest extends 
ZookeeperDiscoverySpiTestBase {
 
         stopGrid(0);
 
+        waitForTopology(2);
+
         assertEquals(mbean.getCoordinator(), srv2.localNode().id());
         assertEquals(mbean.getCoordinatorNodeFormatted(), 
String.valueOf(srv2.localNode()));
     }
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java
new file mode 100644
index 0000000..0f9935b
--- /dev/null
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.DiscoverySpiMBean;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiMBean;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class ZookeeperDiscoveryRandomStopOrFailConcurrentTest extends ZookeeperDiscoverySpiTestBase {
+    /** Number of client nodes started by the test (client with index 0 is used as the observer). */
+    private static final int NUM_CLIENTS = 10;
+
+    /** Number of server nodes started by the test (indices starting from 1). */
+    private static final int NUM_SERVERS = 10;
+
+    /** ZooKeeper session timeout, in milliseconds. */
+    private static final int ZK_SESSION_TIMEOUT = 5_000;
+
+    /**
+     * @return Test parameters: every {@link StopMode} combined with stopping and not stopping the coordinator.
+     */
+    @Parameterized.Parameters(name = "stop mode = {0}, with crd = {1}")
+    public static Collection<Object[]> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (StopMode stopMode: StopMode.values()) {
+            params.add(new Object[] {stopMode, true});
+            params.add(new Object[] {stopMode, false});
+        }
+
+        return params;
+    }
+
+    /** How the selected nodes are taken out of the cluster. */
+    @Parameterized.Parameter(0)
+    public StopMode stopMode;
+
+    /** Whether the coordinator is among the stopped nodes. */
+    @Parameterized.Parameter(1)
+    public boolean killCrd;
+
+    /** Expected value of the "nodes left" MBean counter. */
+    private final AtomicLong nodesLeft = new AtomicLong(0);
+
+    /** Expected value of the "nodes failed" MBean counter. */
+    private final AtomicLong nodesFailed = new AtomicLong(0);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        sesTimeout = ZK_SESSION_TIMEOUT;
+
+        testSockNio = true;
+
+        clientReconnectDisabled = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Unblock any ZooKeeper connections that are still blocked, so nodes can be stopped cleanly.
+        for (Ignite g: G.allGrids()) {
+            ZkTestClientCnxnSocketNIO cnxn = ZkTestClientCnxnSocketNIO.forNode(g);
+
+            if (cnxn != null)
+                cnxn.allowConnect();
+        }
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void waitForTopology(int expSize) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(() -> grid(0).cluster().nodes().size() == expSize, 30_000));
+    }
+
+    /**
+     * Concurrently stops gracefully or kills (by closing the ZooKeeper socket) {@code NUM_SERVERS / 2} servers
+     * and {@code NUM_CLIENTS / 2} clients. It then checks three things:
+     * <ul>
+     *     <li>the topology converges;</li>
+     *     <li>the stopped nodes flags are removed from ZooKeeper;</li>
+     *     <li>the discovery MBean counters match the number of left and failed nodes.</li>
+     * </ul>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStopOrFailConcurrently() throws Exception {
+        IgniteEx client = startServersAndClients(NUM_SERVERS, NUM_CLIENTS);
+
+        int crd = getCoordinatorIndex();
+
+        List<Integer> srvToStop = IntStream.range(1, NUM_SERVERS + 1)
+            .filter(j -> j != crd)
+            .boxed()
+            .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+                Collections.shuffle(list);
+
+                return list.subList(0, NUM_SERVERS / 2);
+            }));
+
+        // Replace one randomly chosen server with the coordinator.
+        if (killCrd)
+            srvToStop.set(0, crd);
+
+        // Client 0 is the observer node used for all checks, so it is never stopped.
+        List<Integer> cliToStop = IntStream.range(NUM_SERVERS + 1, NUM_CLIENTS + NUM_SERVERS)
+            .boxed()
+            .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+                Collections.shuffle(list);
+
+                return list.subList(0, NUM_CLIENTS / 2);
+            }));
+
+        srvToStop.addAll(cliToStop);
+
+        stopOrKillMultithreaded(srvToStop);
+
+        waitForTopology(NUM_CLIENTS + NUM_SERVERS - srvToStop.size());
+
+        checkStopFlagsDeleted(10_000);
+
+        DiscoverySpiMBean mBean = getMbean(client);
+
+        // waitForCondition() only reports the outcome, so its result must be asserted explicitly.
+        boolean leftMatch = GridTestUtils.waitForCondition(() -> nodesLeft.get() == mBean.getNodesLeft(), 10_000);
+
+        assertTrue("Unexpected number of left nodes [expected=" + nodesLeft.get() +
+            ", actual=" + mBean.getNodesLeft() + ']', leftMatch);
+
+        boolean failedMatch = GridTestUtils.waitForCondition(() -> nodesFailed.get() == mBean.getNodesFailed(), 10_000);
+
+        assertTrue("Unexpected number of failed nodes [expected=" + nodesFailed.get() +
+            ", actual=" + mBean.getNodesFailed() + ']', failedMatch);
+    }
+
+    /**
+     * Waits until the stopped nodes flags directory in ZooKeeper is empty and fails the test otherwise.
+     *
+     * @param timeout Wait timeout, in milliseconds.
+     * @throws Exception If failed.
+     */
+    private void checkStopFlagsDeleted(long timeout) throws Exception {
+        ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(),
+            zkCluster.getConnectString(),
+            30_000,
+            null);
+
+        try {
+            ZkIgnitePaths paths = new ZkIgnitePaths(ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);
+
+            boolean flagsDeleted = GridTestUtils.waitForCondition(() -> {
+                try {
+                    return zkClient.getChildren(paths.stoppedNodesFlagsDir).isEmpty();
+                }
+                catch (Exception e) {
+                    if (e instanceof InterruptedException)
+                        Thread.currentThread().interrupt();
+
+                    throw new RuntimeException("Failed to wait for stopped nodes flags", e);
+                }
+            }, timeout);
+
+            assertTrue("Stopped nodes flags were not deleted in " + timeout + " ms", flagsDeleted);
+        }
+        finally {
+            // Release the ZooKeeper session opened only for this check.
+            zkClient.close();
+        }
+    }
+
+    /**
+     * Stops or kills the given nodes concurrently, one thread per node, according to {@link #stopMode}.
+     * It increments {@link #nodesLeft} or {@link #nodesFailed} for every processed node.
+     *
+     * @param stopIndices Indices of the nodes to stop or kill.
+     * @throws Exception If failed.
+     */
+    private void stopOrKillMultithreaded(final List<Integer> stopIndices) throws Exception {
+        log.info("Stopping or killing nodes by idx: " + stopIndices.toString());
+
+        final StopMode mode = stopMode;
+
+        GridTestUtils.runMultiThreaded((idx) -> {
+            int nodeIdx = stopIndices.get(idx);
+
+            try {
+                Random rnd = ThreadLocalRandom.current();
+
+                if (mode == StopMode.FAIL_ONLY || (mode == StopMode.RANDOM && rnd.nextBoolean())) {
+                    ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(grid(nodeIdx));
+
+                    assertNotNull("No test ZooKeeper connection for node: " + nodeIdx, c0);
+
+                    // Simulate a node failure by dropping its ZooKeeper connection.
+                    c0.closeSocket(true);
+
+                    nodesFailed.incrementAndGet();
+                }
+                else {
+                    stopGrid(nodeIdx);
+
+                    nodesLeft.incrementAndGet();
+                }
+            }
+            catch (Exception e) {
+                log.error("Failed to stop or kill node [idx=" + nodeIdx + ']', e);
+
+                fail(e.getMessage());
+            }
+        }, stopIndices.size(), "stop-node");
+    }
+
+    /**
+     * @return Test instance index of the current coordinator, as reported by the MBean of node 0.
+     */
+    private int getCoordinatorIndex() {
+        UUID crdId = getMbean(grid(0)).getCoordinator();
+
+        Optional<Integer> crdIdx = grid(0).cluster().nodes().stream().filter(n -> n.id().equals(crdId))
+            .map(n -> getTestIgniteInstanceIndex((String)n.consistentId())).findAny();
+
+        assertTrue(crdIdx.isPresent());
+
+        return crdIdx.get();
+    }
+
+    /**
+     * @param grid Node.
+     * @return Zookeeper discovery SPI MBean of the given node.
+     */
+    private DiscoverySpiMBean getMbean(IgniteEx grid) {
+        ZookeeperDiscoverySpiMBean bean = getMxBean(grid.context().igniteInstanceName(), "SPIs",
+            ZookeeperDiscoverySpi.class, ZookeeperDiscoverySpiMBean.class);
+
+        assertNotNull(bean);
+
+        return bean;
+    }
+
+    /**
+     * Starts servers with indices {@code 1..numServers}, clients with indices starting from {@code numServers + 1}
+     * and finally client 0. It then initializes the expected counters from the MBean of client 0.
+     *
+     * @param numServers Number of servers.
+     * @param numClients Number of clients, including client 0.
+     * @return Client node with index 0.
+     * @throws Exception If failed.
+     */
+    private IgniteEx startServersAndClients(int numServers, int numClients) throws Exception {
+        startGridsMultiThreaded(1, numServers);
+        startClientGridsMultiThreaded(numServers + 1, numClients - 1);
+
+        IgniteEx res = startClientGrid(0);
+
+        waitForTopology(numClients + numServers);
+
+        // Set initial value of counters from MBean.
+        nodesLeft.addAndGet(getMbean(res).getNodesLeft());
+        nodesFailed.addAndGet(getMbean(res).getNodesFailed());
+
+        return res;
+    }
+
+    /** How the nodes selected for stopping are taken out of the cluster. */
+    enum StopMode {
+        /** Every node is stopped gracefully via {@code stopGrid()}. */
+        STOP_ONLY,
+
+        /** Every node is killed by closing its ZooKeeper client socket. */
+        FAIL_ONLY,
+
+        /** Each node is randomly either stopped gracefully or killed. */
+        RANDOM
+    }
+}
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java
index 49e39a8..d33932b 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java
@@ -391,7 +391,7 @@ public class 
ZookeeperDiscoverySegmentationAndConnectionRestoreTest extends Zook
 
             closeZkClient(spi);
 
-            helper.checkEvents(node0, evts, 
ZookeeperDiscoverySpiTestHelper.failEvent(4));
+            helper.checkEvents(node0, evts, 
ZookeeperDiscoverySpiTestHelper.leftEvent(4, true));
         }
 
         c1.allowConnect();
@@ -399,7 +399,7 @@ public class 
ZookeeperDiscoverySegmentationAndConnectionRestoreTest extends Zook
         helper.checkEvents(ignite(1), evts, 
ZookeeperDiscoverySpiTestHelper.joinEvent(3));
 
         if (failWhenDisconnected) {
-            helper.checkEvents(ignite(1), evts, 
ZookeeperDiscoverySpiTestHelper.failEvent(4));
+            helper.checkEvents(ignite(1), evts, 
ZookeeperDiscoverySpiTestHelper.leftEvent(4, true));
 
             IgnitionEx.stop(getTestIgniteInstanceName(2), true, 
ShutdownPolicy.IMMEDIATE, true);
         }
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
index d23aa97..bed11bd 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
@@ -80,7 +80,10 @@ import 
org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -566,6 +569,93 @@ class ZookeeperDiscoverySpiTestBase extends 
GridCommonAbstractTest {
         }
     }
 
+    /**
+     * Checks that no znodes are left under the test ZooKeeper root except the alive nodes directory content
+     * and top-level directories. Znodes under {@code jd/} are ignored, see IGNITE-8193.
+     * First asserts that at least one alive node znode exists, then waits up to 10 seconds for the cleanup.
+     *
+     * @throws Exception If failed.
+     */
+    protected void checkZkNodesCleanup() throws Exception {
+        final ZookeeperClient zkClient = new 
ZookeeperClient(getTestResources().getLogger(),
+            zkCluster.getConnectString(),
+            30_000,
+            null);
+
+        final String basePath = ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT 
+ "/";
+
+        final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/";
+
+        try {
+            List<String> znodes = listSubTree(zkClient.zk(), 
ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);
+
+            boolean foundAlive = false;
+
+            for (String znode : znodes) {
+                if (znode.startsWith(aliveDir)) {
+                    foundAlive = true;
+
+                    break;
+                }
+            }
+
+            assertTrue(foundAlive); // Sanity check to make sure we check 
correct directory.
+
+            assertTrue("Failed to wait for unused znodes cleanup", 
GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        List<String> znodes = listSubTree(zkClient.zk(), 
ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);
+
+                        for (String znode : znodes) {
+                            if (znode.startsWith(aliveDir) || znode.length() < 
basePath.length())
+                                continue;
+
+                            znode = znode.substring(basePath.length());
+
+                            if (!znode.contains("/")) // Ignore roots.
+                                continue;
+
+                            // TODO ZK: 
https://issues.apache.org/jira/browse/IGNITE-8193
+                            if (znode.startsWith("jd/"))
+                                continue;
+
+                            log.info("Found unexpected znode: " + znode);
+
+                            return false;
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        error("Unexpected error: " + e, e);
+
+                        fail("Unexpected error: " + e);
+                    }
+
+                    return false;
+                }
+            }, 10_000));
+        }
+        finally {
+            zkClient.close();
+        }
+    }
+
+    /**
+     * Lists all znodes under the given root in BFS order.
+     * Retries up to 30 times, without delay, when {@link KeeperException.NoNodeException} is thrown.
+     *
+     * @param zk ZooKeeper client.
+     * @param root Root path.
+     * @return All children znodes for given path.
+     * @throws Exception If failed.
+     */
+    private List<String> listSubTree(ZooKeeper zk, String root) throws 
Exception {
+        for (int i = 0; i < 30; i++) {
+            try {
+                return ZKUtil.listSubTreeBFS(zk, root);
+            }
+            catch (KeeperException.NoNodeException e) {
+                info("NoNodeException when get znodes, will retry: " + e);
+            }
+        }
+
+        throw new Exception("Failed to get znodes: " + root);
+    }
+
     /** */
     private CacheConfiguration getCacheConfiguration() {
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java
index 32e3855..be5f2e6 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java
@@ -135,8 +135,10 @@ class ZookeeperDiscoverySpiTestHelper {
      * @param topVer Topology version.
      * @return Expected event instance.
      */
-    static DiscoveryEvent failEvent(long topVer) {
-        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
+    static DiscoveryEvent leftEvent(long topVer, boolean fail) {
+        int eventType = fail ? EventType.EVT_NODE_FAILED : 
EventType.EVT_NODE_LEFT;
+
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, eventType, 
null);
 
         expEvt.topologySnapshot(topVer, null);
 
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
index ba17a2f..f38baa7 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
 import org.apache.ignite.internal.processors.query.DummyQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -49,8 +48,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Ignore;
@@ -240,74 +237,6 @@ public class 
ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD
     /**
      * @throws Exception If failed.
      */
-    private void checkZkNodesCleanup() throws Exception {
-        final ZookeeperClient zkClient = new 
ZookeeperClient(getTestResources().getLogger(),
-            zkCluster.getConnectString(),
-            30_000,
-            null);
-
-        final String basePath = ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT 
+ "/";
-
-        final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/";
-
-        try {
-            List<String> znodes = listSubTree(zkClient.zk(), 
ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);
-
-            boolean foundAlive = false;
-
-            for (String znode : znodes) {
-                if (znode.startsWith(aliveDir)) {
-                    foundAlive = true;
-
-                    break;
-                }
-            }
-
-            assertTrue(foundAlive); // Sanity check to make sure we check 
correct directory.
-
-            assertTrue("Failed to wait for unused znodes cleanup", 
GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    try {
-                        List<String> znodes = listSubTree(zkClient.zk(), 
ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);
-
-                        for (String znode : znodes) {
-                            if (znode.startsWith(aliveDir) || znode.length() < 
basePath.length())
-                                continue;
-
-                            znode = znode.substring(basePath.length());
-
-                            if (!znode.contains("/")) // Ignore roots.
-                                continue;
-
-                            // TODO ZK: 
https://issues.apache.org/jira/browse/IGNITE-8193
-                            if (znode.startsWith("jd/"))
-                                continue;
-
-                            log.info("Found unexpected znode: " + znode);
-
-                            return false;
-                        }
-
-                        return true;
-                    }
-                    catch (Exception e) {
-                        error("Unexpected error: " + e, e);
-
-                        fail("Unexpected error: " + e);
-                    }
-
-                    return false;
-                }
-            }, 10_000));
-        }
-        finally {
-            zkClient.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138";)
     @Test
     public void testRandomTopologyChanges_RestartZk() throws Exception {
@@ -767,25 +696,6 @@ public class 
ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD
     }
 
     /**
-     * @param zk ZooKeeper client.
-     * @param root Root path.
-     * @return All children znodes for given path.
-     * @throws Exception If failed/
-     */
-    private List<String> listSubTree(ZooKeeper zk, String root) throws 
Exception {
-        for (int i = 0; i < 30; i++) {
-            try {
-                return ZKUtil.listSubTreeBFS(zk, root);
-            }
-            catch (KeeperException.NoNodeException e) {
-                info("NoNodeException when get znodes, will retry: " + e);
-            }
-        }
-
-        throw new Exception("Failed to get znodes: " + root);
-    }
-
-    /**
      * @param cacheName Cache name.
      * @return Configuration.
      */
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
index 2b741a1..47fe0ac 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
@@ -114,7 +114,8 @@ public class ZkTestClientCnxnSocketNIO extends 
ClientCnxnSocketNIO {
      *
      */
     public void allowConnect() {
-        assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 
: blockConnectLatch;
+        if (blockConnectLatch == null || blockConnectLatch.getCount() == 0)
+            return;
 
         log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + 
']');
 

Reply via email to