Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 5d8feab45 -> 9ffd603d2


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ffd603d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ffd603d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ffd603d

Branch: refs/heads/ignite-zk
Commit: 9ffd603d217034247497b6c2734933872c8a78ed
Parents: 5d8feab
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Nov 23 12:04:42 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Nov 23 13:44:06 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  15 -
 .../org/apache/ignite/internal/IgnitionEx.java  |  28 +-
 .../zk/internal/ZkDiscoveryEventData.java       |  49 +++-
 .../zk/internal/ZkDiscoveryEventsData.java      |   9 +-
 .../internal/ZkDiscoveryNodeFailEventData.java  |   4 +-
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   4 +-
 .../discovery/zk/internal/ZkEventAckFuture.java |   1 -
 .../discovery/zk/internal/ZkIgnitePaths.java    |  50 +++-
 .../zk/internal/ZkJoinEventDataForJoined.java   |   6 +
 .../discovery/zk/internal/ZookeeperClient.java  |  50 ++++
 .../ZookeeperClientFailedException.java         |   2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 287 ++++++++++---------
 .../ZookeeperDiscoverySpiBasicTest.java         | 201 +++++++++++--
 13 files changed, 525 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index b58819c..19e1f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import javax.management.JMException;
 import javax.management.ObjectName;
-import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.DataRegionMetrics;
 import org.apache.ignite.DataRegionMetricsAdapter;
 import org.apache.ignite.DataStorageMetrics;
@@ -265,20 +264,6 @@ import static 
org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_START;
  */
 public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /** */
-    public static TestingCluster zkCluster;
-
-    static {
-        zkCluster = new TestingCluster(1);
-
-        try {
-            zkCluster.start();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /** */
     private static final long serialVersionUID = 0L;
 
     /** Ignite site that is shown in log messages. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index e6f5442..cc7e266 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -44,6 +44,7 @@ import java.util.logging.Handler;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -153,6 +154,25 @@ import static 
org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_J
  * GridConfiguration cfg = new GridConfiguration();
  */
 public class IgnitionEx {
+    /**
+     * Test switch: when {@code true}, a test ZooKeeper cluster is started in the
+     * static initializer below and ZooKeeper discovery SPI is set in configuration.
+     */
+    public static final boolean TEST_ZK = true;
+
+    /** Test ZooKeeper cluster, assigned by the static initializer only when {@code TEST_ZK} is {@code true}. */
+    public static TestingCluster zkCluster;
+
+    static {
+        if (TEST_ZK) {
+            zkCluster = new TestingCluster(1);
+
+            try {
+                zkCluster.start();
+            }
+            catch (Exception e) {
+                // Start failure is only printed: class initialization continues with a cluster that did not start.
+                e.printStackTrace();
+            }
+        }
+    }
+
     /** Default configuration path relative to Ignite home. */
     public static final String DFLT_CFG = "config/default-config.xml";
 
@@ -2222,11 +2242,13 @@ public class IgnitionEx {
 
             initializeDataStorageConfiguration(myCfg);
 
-            ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+            if (TEST_ZK) {
+                ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
-            
zkSpi.setZkConnectionString(IgniteKernal.zkCluster.getConnectString());
+                zkSpi.setZkConnectionString(zkCluster.getConnectString());
 
-            myCfg.setDiscoverySpi(zkSpi);
+                myCfg.setDiscoverySpi(zkSpi);
+            }
 
             return myCfg;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index 00330e4..e7e8d31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -49,6 +49,7 @@ abstract class ZkDiscoveryEventData implements Serializable {
     int flags;
 
     /**
+     * @param evtId Event ID.
      * @param evtType Event type.
      * @param topVer Topology version.
      */
@@ -60,21 +61,46 @@ abstract class ZkDiscoveryEventData implements Serializable 
{
         this.topVer = topVer;
     }
 
-    void remainingAcks(Collection<ZookeeperClusterNode> nodes) {
+    /**
+     * @param nodes Current nodes in topology.
+     */
+    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
         assert remainingAcks == null : this;
 
         remainingAcks = U.newHashSet(nodes.size());
 
         for (ZookeeperClusterNode node : nodes) {
-            if (!node.isLocal() && node.order() <= topVer)
-                remainingAcks.add(node.internalId());
+            if (!node.isLocal() && node.order() <= topVer) {
+                boolean add = remainingAcks.add(node.internalId());
+
+                assert add : node;
+            }
         }
     }
 
+    /**
+     * Adds a node to the set of nodes which should ack this event.
+     * Asserts that the node order does not exceed the event topology version
+     * and that the node was not in the set before.
+     *
+     * @param node Node.
+     */
+    void addRemainingAck(ZookeeperClusterNode node) {
+        assert node.order() <= topVer : node;
+
+        // NOTE(review): 'remainingAcks' is not null-checked here - presumably initRemainingAcks() is always called first.
+        boolean add = remainingAcks.add(node.internalId());
+
+        assert add : node;
+    }
+
+    /**
+     * @return {@code True} if all nodes processed event.
+     */
     boolean allAcksReceived() {
         return remainingAcks.isEmpty();
     }
 
+    /**
+     * @param nodeInternalId Node ID.
+     * @param ackEvtId Last event ID processed on node.
+     * @return {@code True} if all nodes processed event.
+     */
     boolean onAckReceived(Integer nodeInternalId, long ackEvtId) {
         assert remainingAcks != null;
 
@@ -84,6 +110,10 @@ abstract class ZkDiscoveryEventData implements Serializable 
{
         return remainingAcks.isEmpty();
     }
 
+    /**
+     * @param node Failed node.
+     * @return {@code True} if all nodes processed event.
+     */
     boolean onNodeFail(ZookeeperClusterNode node) {
         assert remainingAcks != null : this;
 
@@ -92,18 +122,31 @@ abstract class ZkDiscoveryEventData implements 
Serializable {
         return remainingAcks.isEmpty();
     }
 
+    /**
+     * @param flag Flag mask.
+     * @return {@code True} if flag set.
+     */
     boolean flagSet(int flag) {
         return (flags & flag) == flag;
     }
 
+    /**
+     * @return Event ID.
+     */
     long eventId() {
         return evtId;
     }
 
+    /**
+     * @return Event type.
+     */
     int eventType() {
         return evtType;
     }
 
+    /**
+     * @return Event topology version.
+     */
     long topologyVersion() {
         return topVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index ce21a06..6625ec0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -44,9 +44,9 @@ class ZkDiscoveryEventsData implements Serializable {
     TreeMap<Long, ZkDiscoveryEventData> evts;
 
     /**
-     * @param topVer
-     * @param gridStartTime
-     * @param evts
+     * @param topVer Current topology version.
+     * @param gridStartTime Cluster start time.
+     * @param evts Events history.
      */
     ZkDiscoveryEventsData(long gridStartTime, long topVer, TreeMap<Long, 
ZkDiscoveryEventData> evts) {
         this.gridStartTime = gridStartTime;
@@ -55,6 +55,7 @@ class ZkDiscoveryEventsData implements Serializable {
     }
 
     /**
+     * @param nodes Current nodes in topology (these nodes should ack that the event is processed).
      * @param evt Event.
      */
     void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData 
evt) {
@@ -62,6 +63,6 @@ class ZkDiscoveryEventsData implements Serializable {
 
         assert old == null : old;
 
-        evt.remainingAcks(nodes);
+        evt.initRemainingAcks(nodes);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
index e6ba4bd..b25f39c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -49,6 +49,8 @@ class ZkDiscoveryNodeFailEventData extends 
ZkDiscoveryEventData {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "NodeFailEventData [topVer=" + topologyVersion() + ", nodeId=" 
+ failedNodeInternalId + ']';
+        return "ZkDiscoveryNodeFailEventData [topVer=" + topologyVersion() +
+            ", evtId=" + eventId() +
+            ", nodeId=" + failedNodeInternalId + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index 4482916..e96f386 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -51,6 +51,8 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "NodeJoinEventData [topVer=" + topologyVersion() + ", node=" + 
nodeId + ']';
+        return "ZkDiscoveryNodeJoinEventData [topVer=" + topologyVersion() +
+            ", evtId=" + eventId() +
+            ", node=" + nodeId + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
index c89b586..ab0dad9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteLogger;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 1f6315c..9f1b859 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -171,49 +171,93 @@ class ZkIgnitePaths {
      * @param path Relative path.
      * @return Full path.
      */
-    String zkPath(String path) {
+    private String zkPath(String path) {
         return basePath + "/" + clusterName + "/" + path;
     }
 
+    /**
+     * @param path Alive node zk path.
+     * @return Node internal ID.
+     */
     static int aliveInternalId(String path) {
         int idx = path.lastIndexOf('|');
 
         return Integer.parseInt(path.substring(idx + 1));
     }
 
+    /**
+     * @param path Alive node zk path.
+     * @return Unique prefix: first {@code UUID_LEN} characters of the path.
+     */
+    static String aliveNodePrefixId(String path) {
+        return path.substring(0, ZkIgnitePaths.UUID_LEN);
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Node ID.
+     */
     static UUID aliveNodeId(String path) {
-        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+        // <uuid prefix>:<node id>|<join data seq>|<alive seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + 
ZkIgnitePaths.UUID_LEN);
 
         return UUID.fromString(idStr);
     }
 
-    static int aliveJoinSequence(String path) {
+    /**
+     * @param path Alive node zk path.
+     * @return Join data sequence number.
+     */
+    static int aliveJoinDataSequence(String path) {
         int idx1 = path.indexOf('|');
         int idx2 = path.lastIndexOf('|');
 
         return Integer.parseInt(path.substring(idx1 + 1, idx2));
     }
 
+    /**
+     * @param path Event zk path.
+     * @return Event sequence number.
+     */
     static int customEventSequence(String path) {
         int idx = path.lastIndexOf('|');
 
         return Integer.parseInt(path.substring(idx + 1));
     }
 
+    /**
+     * @param path Custom event zk path.
+     * @return Event node ID.
+     */
     static UUID customEventSendNodeId(String path) {
         String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
 
         return UUID.fromString(idStr);
     }
 
+    /**
+     * @param evtId Event ID.
+     * @return Event zk path.
+     */
     String joinEventDataPath(long evtId) {
         return evtsPath + "/" + evtId;
     }
 
+    /**
+     * @param evtId Event ID.
+     * @return Event zk path.
+     */
     String joinEventDataPathForJoined(long evtId) {
         return evtsPath + "/joined-" + evtId;
     }
 
+    /**
+     * @param ack Ack event flag.
+     * @param child Event child path.
+     * @return Full event data path.
+     */
     String customEventDataPath(boolean ack, String child) {
         String baseDir = ack ? customEvtsAcksDir : customEvtsDir;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
index cdbfdc0..eb24f27 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -43,10 +43,16 @@ class ZkJoinEventDataForJoined implements Serializable {
         this.discoData = discoData;
     }
 
+    /**
+     * @return Current topology.
+     */
     List<ZookeeperClusterNode> topology() {
         return top;
     }
 
+    /**
+     * @return Discovery data.
+     */
     Map<Integer, Serializable> discoveryData() {
         return discoData;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 626b235..d4d23ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -266,6 +266,55 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * Creates a sequential node, retrying on ZooKeeper errors.
+     * <p>
+     * Sequential create is not idempotent: if an attempt fails on the client
+     * after the server already created the node, a blind retry creates a
+     * duplicate. Therefore before every retry the children of {@code dir} are
+     * checked for a node starting with {@code checkPrefix}; if one is found its
+     * path is returned instead of creating a new node.
+     *
+     * @param checkPrefix Unique prefix used to find a node created by a previous attempt.
+     * @param dir Parent directory path.
+     * @param childPath Child node name (sequence suffix is appended by ZooKeeper).
+     * @param data Node data, {@code null} is replaced with an empty array.
+     * @param createMode Create mode, must be sequential.
+     * @return Path of the created node or of the node found by prefix.
+     * @throws ZookeeperClientFailedException If ZooKeeper client failed.
+     * @throws InterruptedException If interrupted.
+     */
+    String createSequential(String checkPrefix, String dir, String childPath, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert createMode.isSequential() : createMode;
+
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        boolean first = true;
+
+        String path = dir + "/" + childPath;
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                // Only on retry: previous attempt could have created the node before the error was reported.
+                if (!first) {
+                    List<String> children = zk.getChildren(dir, false);
+
+                    for (String child : children) {
+                        if (child.startsWith(checkPrefix)) {
+                            String resPath = dir + "/" + child;
+
+                            log.info("Check before retry, node already created: " + resPath);
+
+                            return resPath;
+                        }
+                    }
+                }
+
+                return zk.create(path, data, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                // Not expected for sequential nodes, the assertion documents this.
+                assert !createMode.isSequential() : createMode;
+
+                log.info("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+
+            first = false;
+        }
+    }
+
     List<String> getChildren(String path)
         throws ZookeeperClientFailedException, InterruptedException
     {
@@ -469,6 +518,7 @@ public class ZookeeperClient implements Watcher {
                     U.warn(log, "Zookeeper operation failed, will retry [err=" 
+ e +
                         ", retryTimeout=" + RETRY_TIMEOUT +
                         ", connLossTimeout=" + connLossTimeout +
+                        ", path=" + ((KeeperException)e).getPath() +
                         ", remainingWaitTime=" + remainingTime + ']');
 
                     stateMux.wait(RETRY_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
index b222d58..99f2a6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
@@ -27,7 +27,7 @@ public class ZookeeperClientFailedException extends Exception 
{
     /**
      * @param cause Cause.
      */
-    public ZookeeperClientFailedException(Throwable cause) {
+    ZookeeperClientFailedException(Throwable cause) {
         super(cause);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 00a0974..a04314d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -122,10 +123,13 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private final int evtsAckThreshold;
 
+    /** */
+    private int procEvtCnt;
+
     /**
      * @param log Logger.
      * @param basePath Zookeeper base path node all nodes.
-     * @param clusterName Cluster name (
+     * @param clusterName Cluster name.
      * @param locNode Local node instance.
      * @param lsnr Discovery events listener.
      * @param exchange Discovery data exchange.
@@ -351,18 +355,29 @@ public class ZookeeperDiscoveryImpl {
      */
     private void startJoin(byte[] joinDataBytes) throws InterruptedException {
         try {
+            String prefix = UUID.randomUUID().toString();
+
             // TODO ZK: handle max size.
-            // TODO ZK: handle retries.
-            String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + 
locNode.id() + "|",
+
+            String path = zkClient.createSequential(prefix,
+                zkPaths.joinDataDir,
+                prefix + ":" + locNode.id() + "|",
                 joinDataBytes,
                 EPHEMERAL_SEQUENTIAL);
 
             int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') 
+ 1));
 
-            locNodeZkPath = zkClient.createIfNeeded(zkPaths.aliveNodesDir + 
"/" + locNode.id() + "|" + seqNum + "|",
+            locNodeZkPath = zkClient.createSequential(
+                prefix,
+                zkPaths.aliveNodesDir,
+                prefix + ":" + locNode.id() + "|" + seqNum + "|",
                 null,
                 EPHEMERAL_SEQUENTIAL);
 
+            log.info("Node started join [nodeId=" + locNode.id() +
+                ", instanceName=" + 
locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
+                ", nodePath=" + locNodeZkPath + ']');
+
             zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
                 @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
                     onConnected(rc, children);
@@ -439,38 +454,13 @@ public class ZookeeperDiscoveryImpl {
 
                 assert prevE != null;
 
-                final int crdInternalId = crdE.getKey();
-                final int locInternalId0 = locInternalId;
-
                 log.info("Discovery coordinator already exists, watch for 
previous node [" +
                     "locId=" + locNode.id() +
                     ", prevPath=" + prevE.getValue() + ']');
 
-                zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), new Watcher() {
-                    @Override public void process(WatchedEvent evt) {
-                        if (evt.getType() == Event.EventType.NodeDeleted) {
-                            try {
-                                onPreviousNodeFail(aliveNodes, crdInternalId, 
locInternalId0);
-                            }
-                            catch (Throwable e) {
-                                onFatalError(e);
-                            }
-                        }
-                    }
-                }, new AsyncCallback.StatCallback() {
-                    @Override public void processResult(int rc, String path, 
Object ctx, Stat stat) {
-                        assert rc == 0 : rc;
-
-                        if (stat == null) {
-                            try {
-                                onPreviousNodeFail(aliveNodes, crdInternalId, 
locInternalId0);
-                            }
-                            catch (Throwable e) {
-                                onFatalError(e);
-                            }
-                        }
-                    }
-                });
+                PreviousNodeWatcher watcher = new PreviousNodeWatcher();
+
+                zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
             }
         }
         catch (Throwable e) {
@@ -478,7 +468,44 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void onPreviousNodeFail(List<String> aliveNodes, int 
crdInternalId, int locInternalId) throws Exception {
+    /**
+     * Watcher and stat callback passed to {@code existsAsync} for the alive-node
+     * path of the previous node. Calls {@code onPreviousNodeFail()} when that
+     * node is deleted or reported as absent.
+     */
+    private class PreviousNodeWatcher implements Watcher, 
AsyncCallback.StatCallback {
+        /** {@inheritDoc} */
+        @Override public void process(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDeleted) {
+                try {
+                    onPreviousNodeFail();
+                }
+                catch (Throwable e) {
+                    onFatalError(e);
+                }
+            }
+            else {
+                if (log.isInfoEnabled())
+                    log.info("Previous node watch event: " + evt);
+
+                // Any other event: set the watch again on the path from the event.
+                // NOTE(review): connection state events (type None) carry a null path - confirm existsAsync() handles it.
+                zkClient.existsAsync(evt.getPath(), this, this);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+            log.info("Previous node stat callback [rc=" + rc + ", path=" + 
path + ", stat=" + stat + ']');
+
+            // NOTE(review): ZooKeeper reports a missing node as rc=NONODE with null stat, so this assert
+            // presumably fires before the 'stat == null' branch below can run - confirm.
+            assert rc == 0 : rc;
+
+            if (stat == null) {
+                try {
+                    onPreviousNodeFail();
+                }
+                catch (Throwable e) {
+                    onFatalError(e);
+                }
+            }
+        }
+    }
+
+    private void onPreviousNodeFail() throws Exception {
         // TODO ZK:
 //        if (locInternalId == crdInternalId + 1) {
 //            if (log.isInfoEnabled())
@@ -517,7 +544,7 @@ public class ZookeeperDiscoveryImpl {
             assert this.evtsData != null;
 
             for (ZkDiscoveryEventData evtData : evtsData.evts.values())
-                evtData.remainingAcks(top.nodesByOrder.values());
+                evtData.initRemainingAcks(top.nodesByOrder.values());
 
             handleProcessedEvents();
         }
@@ -643,7 +670,7 @@ public class ZookeeperDiscoveryImpl {
             if (!alives.containsKey(e.getKey())) {
                 ZookeeperClusterNode failedNode = e.getValue();
 
-                processEventAcksOnNodeFail(failedNode);
+                handleProcessedEventsOnNodeFail(failedNode);
 
                 generateNodeFail(curTop, failedNode);
 
@@ -690,10 +717,8 @@ public class ZookeeperDiscoveryImpl {
 
         evtsData.addEvent(curTop.values(), evtData);
 
-        if (log.isInfoEnabled()) {
-            log.info("Generated NODE_FAILED event [topVer=" + 
evtData.topologyVersion() +
-                ", nodeId=" + failedNode.id() + ']');
-        }
+        if (log.isInfoEnabled())
+            log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
     }
 
     /**
@@ -708,9 +733,13 @@ public class ZookeeperDiscoveryImpl {
         throws Exception
     {
         UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
-        int joinSeq = ZkIgnitePaths.aliveJoinSequence(aliveNodePath);
+        int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath);
 
-        String joinDataPath = zkPaths.joinDataDir + '/' + nodeId.toString() + 
"|" + String.format("%010d", joinSeq);
+        String joinDataPath = zkPaths.joinDataDir + '/' +
+            ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" +
+            nodeId.toString() +
+            "|" +
+            String.format("%010d", joinSeq);
 
         byte[] joinData;
 
@@ -766,6 +795,8 @@ public class ZookeeperDiscoveryImpl {
 
         evtsData.addEvent(dataForJoined.topology(), evtData);
 
+        evtData.addRemainingAck(joinedNode); // Topology for joined node does 
not contain joined node.
+
         long start = System.currentTimeMillis();
 
         zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), 
joinData, PERSISTENT);
@@ -773,11 +804,8 @@ public class ZookeeperDiscoveryImpl {
 
         long time = System.currentTimeMillis() - start;
 
-        if (log.isInfoEnabled()) {
-            log.info("Generated NODE_JOINED event [topVer=" + 
evtData.topologyVersion() +
-                ", nodeId=" + joinedNode.id() +
-                ", addDataTime=" + time + ']');
-        }
+        if (log.isInfoEnabled())
+            log.info("Generated NODE_JOINED event [evt=" + evtData + ", 
addDataTime=" + time + ']');
     }
 
     /**
@@ -905,7 +933,7 @@ public class ZookeeperDiscoveryImpl {
                         evtsData.addEvent(top.nodesByOrder.values(), evtData);
 
                         if (log.isInfoEnabled())
-                            log.info("Generated CUSTOM event [topVer=" + 
evtData.topologyVersion() + ", evt=" + msg + ']');
+                            log.info("Generated CUSTOM event [evt=" + evtData 
+ ", msg=" + msg + ']');
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
@@ -945,9 +973,6 @@ public class ZookeeperDiscoveryImpl {
         this.evtsData = newEvtsData;
     }
 
-    /** */
-    private int procEvtCnt;
-
     /**
      * @param evtsData Events.
      * @throws Exception If failed.
@@ -1067,83 +1092,6 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    private void handleProcessedEvents() throws Exception {
-        Iterator<ZkDiscoveryEventData> it = 
this.evtsData.evts.values().iterator();
-
-        List<ZkDiscoveryCustomEventData> newEvts = null;
-
-        while (it.hasNext()) {
-            ZkDiscoveryEventData evtData = it.next();
-
-            if (evtData.allAcksReceived()) {
-                switch (evtData.eventType()) {
-                    case EventType.EVT_NODE_JOINED: {
-                        
processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
-
-                        break;
-                    }
-
-                    case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                        DiscoverySpiCustomMessage ack = 
handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
-
-                        if (ack != null) {
-                            evtsData.evtIdGen++;
-
-                            long evtId = evtsData.evtIdGen;
-
-                            byte[] ackBytes = marshal(ack);
-
-                            String evtChildPath = String.valueOf(evtId);
-
-                            zkClient.createIfNeeded(
-                                zkPaths.customEventDataPath(true, 
evtChildPath),
-                                ackBytes,
-                                CreateMode.PERSISTENT);
-
-                            ZkDiscoveryCustomEventData ackEvtData = new 
ZkDiscoveryCustomEventData(
-                                evtId,
-                                evtData.topologyVersion(), // Use topology 
version from original event.
-                                locNode.id(),
-                                evtChildPath,
-                                true);
-
-                            ackEvtData.msg = ack;
-
-                            if (newEvts == null)
-                                newEvts = new ArrayList<>();
-
-                            newEvts.add(ackEvtData);
-
-                            if (log.isInfoEnabled())
-                                log.info("Generated CUSTOM event [topVer=" + 
evtData.topologyVersion() + ", evt=" + ack + ']');
-                        }
-
-                        break;
-                    }
-
-                    case EventType.EVT_NODE_FAILED: {
-                        log.info("All nodes processed node fail [evtId=" + 
evtData.eventId() + ']');
-
-                        // Do not need cleanup.
-                        break;
-                    }
-                }
-
-                it.remove();
-            }
-        }
-
-        if (newEvts != null) {
-            for (int i = 0; i < newEvts.size(); i++)
-                evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i));
-
-            saveAndProcessNewEvents();
-        }
-    }
-
-    /**
      * @param evtsData Events data.
      * @param evtData Local join event data.
      * @throws Exception If failed.
@@ -1260,10 +1208,87 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    private void handleProcessedEvents() throws Exception {
+        Iterator<ZkDiscoveryEventData> it = 
this.evtsData.evts.values().iterator();
+
+        List<ZkDiscoveryCustomEventData> newEvts = null;
+
+        while (it.hasNext()) {
+            ZkDiscoveryEventData evtData = it.next();
+
+            if (evtData.allAcksReceived()) {
+                switch (evtData.eventType()) {
+                    case EventType.EVT_NODE_JOINED: {
+                        
handleProcessedJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
+
+                        break;
+                    }
+
+                    case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
+                        DiscoverySpiCustomMessage ack = 
handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
+
+                        if (ack != null) {
+                            evtsData.evtIdGen++;
+
+                            long evtId = evtsData.evtIdGen;
+
+                            byte[] ackBytes = marshal(ack);
+
+                            String evtChildPath = String.valueOf(evtId);
+
+                            zkClient.createIfNeeded(
+                                zkPaths.customEventDataPath(true, 
evtChildPath),
+                                ackBytes,
+                                CreateMode.PERSISTENT);
+
+                            ZkDiscoveryCustomEventData ackEvtData = new 
ZkDiscoveryCustomEventData(
+                                evtId,
+                                evtData.topologyVersion(), // Use topology 
version from original event.
+                                locNode.id(),
+                                evtChildPath,
+                                true);
+
+                            ackEvtData.msg = ack;
+
+                            if (newEvts == null)
+                                newEvts = new ArrayList<>();
+
+                            newEvts.add(ackEvtData);
+
+                            if (log.isInfoEnabled())
+                                log.info("Generated CUSTOM event ack [evt=" + 
evtData + ", msg=" + ack + ']');
+                        }
+
+                        break;
+                    }
+
+                    case EventType.EVT_NODE_FAILED: {
+                        if (log.isInfoEnabled())
+                            log.info("All nodes processed node fail [evtData=" 
+ evtData + ']');
+
+                        break; // Do not need additional cleanup.
+                    }
+                }
+
+                it.remove(); // All acks received: stop tracking this event.
+            }
+        }
+
+        if (newEvts != null) { // Ack events were generated above: add them only after iteration is finished.
+            for (int i = 0; i < newEvts.size(); i++)
+                evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i));
+
+            saveAndProcessNewEvents();
+        }
+    }
+
+    /**
      * @param failedNode Failed node.
      * @throws Exception If failed.
      */
-    private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) 
throws Exception {
+    private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode 
failedNode) throws Exception {
         boolean processed = false;
 
         for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = 
evtsData.evts.entrySet().iterator(); it.hasNext();) {
@@ -1283,8 +1308,8 @@ public class ZookeeperDiscoveryImpl {
      * @param evtData Event data.
      * @throws Exception If failed.
      */
-    private void processNodesAckJoinEvent(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
-        log.info("All nodes processed node join [evtId=" + evtData.eventId() + 
']');
+    private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
+        log.info("All nodes processed node join [evtData=" + evtData + ']');
 
         zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), 
-1);
         
zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), 
-1);
@@ -1298,7 +1323,7 @@ public class ZookeeperDiscoveryImpl {
     @Nullable private DiscoverySpiCustomMessage 
handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData)
         throws Exception
     {
-        log.info("All nodes processed custom event [evtId=" + 
evtData.eventId() + ']');
+        log.info("All nodes processed custom event [evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
             zkClient.deleteIfExists(zkPaths.customEventDataPath(false, 
evtData.evtPath), -1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 8b3a117..39f9fbf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.test.TestingCluster;
@@ -183,14 +185,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             zkCluster = new TestingCluster(3);
             zkCluster.start();
         }
-
-        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
-        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
-
         if (zkCluster != null) {
             try {
                 zkCluster.close();
@@ -205,6 +203,20 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         super.afterTestsStopped();
     }
 
+    /**
+     * Sets the ack threshold system property to "1" (judging by the method name, to ack every event).
+     */
+    private static void ackEveryEventSystemProperty() {
+        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
+    }
+
+    /**
+     * Removes the ack threshold system property so that a value set by one test does not leak into the next one.
+     */
+    private void clearAckEveryEventSystemProperty() {
+        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -226,6 +238,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
             stopAllGrids();
         }
+
+        clearAckEveryEventSystemProperty();
     }
 
     /**
@@ -301,6 +315,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCustomEventsSimple1_SingleNode() throws Exception {
+        ackEveryEventSystemProperty();
+
         Ignite srv0 = startGrid(0);
 
         srv0.createCache(new CacheConfiguration<>("c1"));
@@ -312,6 +328,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCustomEventsSimple1_5_Nodes() throws Exception {
+        ackEveryEventSystemProperty();
+
         Ignite srv0 = startGrids(5);
 
         srv0.createCache(new CacheConfiguration<>("c1"));
@@ -662,7 +680,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         startGrids(2);
 
         for (Ignite node : G.allGrids()) {
-            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+            IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
 
             assertNotNull(cache);
 
@@ -680,6 +698,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStop_2_Nodes() throws Exception {
+        ackEveryEventSystemProperty();
+
         startGrid(0);
 
         waitForTopology(1);
@@ -700,6 +720,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStop1() throws Exception {
+        ackEveryEventSystemProperty();
+
         startGridsMultiThreaded(5, false);
 
         waitForTopology(5);
@@ -725,24 +747,46 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * @param node Node.
      * @throws Exception If failed.
      */
-    private void waitForEventsAcks(final Ignite node) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                Map<Object, Object> evts = 
GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
-                    "impl", "evtsData", "evts");
+    public void testStartStop3() throws Exception {
+        startGrids(4);
 
-                if (!evts.isEmpty()) {
-                    info("Unacked events: " + evts);
+        awaitPartitionMapExchange();
 
-                    return false;
-                }
+        stopGrid(0);
 
-                return true;
-            }
-        }, 10_000));
+        startGrid(5);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop4() throws Exception {
+        startGrids(6);
+
+        awaitPartitionMapExchange();
+
+        stopGrid(2);
+
+        if (ThreadLocalRandom.current().nextBoolean()) // Randomly wait for exchange or stop next node right away.
+            awaitPartitionMapExchange();
+
+        stopGrid(1);
+
+        if (ThreadLocalRandom.current().nextBoolean())
+            awaitPartitionMapExchange();
+
+        stopGrid(0);
+
+        if (ThreadLocalRandom.current().nextBoolean())
+            awaitPartitionMapExchange();
+
+        startGrid(7); // New node joins after nodes 2, 1 and 0 were stopped.
+
+        awaitPartitionMapExchange();
     }
 
     /**
@@ -796,6 +840,106 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChangeAndZkRestart() throws Exception {
+        // TODO: not implemented yet, the body is empty and the test checks nothing.
+    }
+
+    /**
+     * @param restartZk Not used by the current body. NOTE(review): presumably whether to restart ZooKeeper - confirm.
+     * @throws Exception If failed.
+     */
+    private void topologyChangeWithRestarts(boolean restartZk) throws 
Exception {
+        startGrid(0);
+
+        long stopTime = System.currentTimeMillis(); // Not used yet.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChanges() throws Exception {
+        List<Integer> startedNodes = new ArrayList<>();
+        List<String> startedCaches = new ArrayList<>();
+
+        int nextNodeIdx = 0;
+        int nextCacheIdx = 0;
+
+        long stopTime = System.currentTimeMillis() + 60_000; // Keep changing topology for one minute.
+
+        int MAX_NODES = 20;
+        int MAX_CACHES = 10;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        while (System.currentTimeMillis() < stopTime) {
+            if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) { // 1 of 10 iterations: cache start/destroy.
+                boolean startCache = startedCaches.size() < 2 ||
+                    (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0);
+
+                int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
+
+                if (startCache) {
+                    String cacheName = "cache-" + nextCacheIdx++;
+
+                    log.info("Next, start new cache [cacheName=" + cacheName +
+                        ", node=" + nodeIdx +
+                        ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                        ", curCaches=" + startedCaches.size() + ']');
+
+                    ignite(nodeIdx).createCache(new 
CacheConfiguration<>(cacheName));
+
+                    startedCaches.add(cacheName);
+                }
+                else {
+                    if (startedCaches.size() > 1) {
+                        String cacheName = 
startedCaches.get(rnd.nextInt(startedCaches.size()));
+
+                        log.info("Next, stop cache [cacheName=" + cacheName +
+                            ", node=" + nodeIdx +
+                            ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                            ", curCaches=" + startedCaches.size() + ']');
+
+                        ignite(nodeIdx).destroyCache(cacheName);
+
+                        assertTrue(startedCaches.remove(cacheName));
+                    }
+                }
+            }
+            else { // Otherwise: node start/stop.
+                boolean startNode = startedNodes.size() < 2 ||
+                    (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0);
+
+                if (startNode) {
+                    int nodeIdx = nextNodeIdx++;
+
+                    log.info("Next, start new node [nodeIdx=" + nodeIdx +
+                        ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                        ", curNodes=" + startedNodes.size() + ']');
+
+                    startGrid(nodeIdx);
+
+                    assertTrue(startedNodes.add(nodeIdx));
+                }
+                else {
+                    if (startedNodes.size() > 1) {
+                        int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
+
+                        log.info("Next, stop [nodeIdx=" + nodeIdx +
+                            ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                            ", curNodes=" + startedNodes.size() + ']');
+
+                        stopGrid(nodeIdx);
+
+                        assertTrue(startedNodes.remove((Integer)nodeIdx));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      *
      */
     private void reset() {
@@ -811,6 +955,27 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param node Node whose discovery SPI 'evts' map (read via reflection) is expected to become empty.
+     * @throws Exception If failed.
+     */
+    private void waitForEventsAcks(final Ignite node) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Object, Object> evts = 
GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
+                    "impl", "evtsData", "evts");
+
+                if (!evts.isEmpty()) {
+                    info("Unacked events: " + evts);
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, 10_000));
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void checkEventsConsistency() throws Exception {

Reply via email to