Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 54211bfae -> bedc4e99e


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bedc4e99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bedc4e99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bedc4e99

Branch: refs/heads/ignite-zk
Commit: bedc4e99e14bd597616b134d99ea75cb4d22ea08
Parents: 54211bf
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 14 22:37:24 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 14 23:25:47 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 136 ++++++++++++-------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |  17 ++-
 2 files changed, 103 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bedc4e99/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index cc0e6a4..a924b49 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -370,6 +370,30 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
     }
 
+    private void startConnect(DiscoveryDataBag discoDataBag) throws Exception {
+        ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, 
null));
+
+        gridStartTime = clusterData.gridStartTime;
+
+        zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
+        zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, 
null);
+        zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, 
null);
+
+        ZKJoiningNodeData joinData = new ZKJoiningNodeData(locNode, 
discoDataBag.joiningNodeData());
+
+        byte[] nodeData = marshal(joinData);
+
+        String zkNode = "/" + locNode.id().toString() + "-";
+
+        zkCurator.inTransaction().
+                
create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH
 + zkNode, nodeData).
+                and().
+                
create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH
 + zkNode).
+                and().commit();
+
+        connectStart.countDown();
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String igniteInstanceName) throws 
IgniteSpiException {
         try {
@@ -413,6 +437,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                 Thread.currentThread().setName(threadName);
             }
 
+            boolean startedConnect = false;
+
             for (;;) {
                 boolean started = igniteClusterStarted();
 
@@ -434,19 +460,27 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                             if (zkCurator.checkExists().forPath(IGNITE_PATH) 
== null) {
                                 log.info("Initialize Zookeeper nodes.");
 
-                                List<Op> initOps = new ArrayList<>();
-
                                 ZKClusterData clusterData = new 
ZKClusterData(U.currentTimeMillis());
 
-                                initOps.add(Op.create(IGNITE_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(CLUSTER_PATH, 
marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(JOIN_HIST_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(ALIVE_NODES_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(EVENTS_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(DISCO_EVTS_HIST_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                                initOps.add(Op.create(CUSTOM_EVTS_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-
-                                zk.multi(initOps);
+                                zkCurator.inTransaction().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(IGNITE_PATH,
 EMPTY_BYTES).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(CLUSTER_PATH,
 marshal(clusterData)).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH,
 EMPTY_BYTES).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH,
 EMPTY_BYTES).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(EVENTS_PATH,
 EMPTY_BYTES).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(DISCO_EVTS_HIST_PATH,
 EMPTY_BYTES).
+                                    and().
+                                    
create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(CUSTOM_EVTS_PATH,
 EMPTY_BYTES).
+                                    and().commit();
+
+                                startConnect(discoDataBag);
+
+                                startedConnect = true;
                             }
 
                             break;
@@ -460,33 +494,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                     break;
             }
 
-            ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, 
false, null));
-
-            gridStartTime = clusterData.gridStartTime;
-
-            zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
-            zk.getChildren(JOIN_HIST_PATH, zkWatcher, 
zkChildrenUpdateCallback, null);
-            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, 
zkChildrenUpdateCallback, null);
-
-            List<Op> joinOps = new ArrayList<>();
-
-            ZKJoiningNodeData joinData = new ZKJoiningNodeData(locNode, 
discoDataBag.joiningNodeData());
-
-            byte[] nodeData = marshal(joinData);
-
-            String zkNode = "/" + locNode.id().toString() + "-";
-
-//            joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL));
-//            joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
-//            List<OpResult> res = zk.multi(joinOps);
-
-            zkCurator.inTransaction().
-                
create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH
 + zkNode, nodeData).
-                and().
-                
create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH
 + zkNode).
-                and().commit();
-
-            connectStart.countDown();
+            if (!startedConnect)
+                startConnect(discoDataBag);
 
             log.info("Waiting for local join event [nodeId=" + locNode.id() + 
", name=" + igniteInstanceName + ']');
 
@@ -663,11 +672,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         data.joinData = joinData;
     }
 
-    private void processJoinedNodesHistory(List<String> children) {
+    private void processJoinedNodesHistory(List<String> children, long 
joinOrder) {
         for (String child : children) {
             ZKNodeData data = parseNodePath(child);
 
-            if (!joinHist.hist.containsKey(data.order)) {
+            if (data.order >= joinOrder && 
!joinHist.hist.containsKey(data.order)) {
                 try {
                     Object old = joinHist.hist.put(data.order, data);
 
@@ -712,11 +721,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                         ", path=" + path +
                         ", nodes=" + children +
                         ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
-
-                    if (stat != null)
-                        joinHist.stat = stat;
-
-                    processJoinedNodesHistory(children);
+//
+//                    if (stat != null)
+//                        joinHist.stat = stat;
+//
+//                    processJoinedNodesHistory(children);
                 }
                 else if (path.equals(ALIVE_NODES_PATH)) {
                     log.info("Alive nodes changed [rc=" + rc +
@@ -820,7 +829,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                     // TODO ZK: check version.
                     List<String> children = 
zkCurator.getChildren().forPath(JOIN_HIST_PATH);
 
-                    processJoinedNodesHistory(children);
+                    processJoinedNodesHistory(children, nextJoinOrder);
 
                     joined = joinHist.hist.get(nextJoinOrder);
                 }
@@ -877,6 +886,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                             v,
                             failedNode,
                             new ArrayList<>(curTop.values())));
+
+                        joinHist.hist.remove(joined.order);
                     }
 
                     nextJoinOrder++;
@@ -898,6 +909,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                 v,
                                 failedNode,
                                 new ArrayList<>(curTop.values())));
+
+                            joinHist.hist.remove(oldData.order);
                         }
 
                         break;
@@ -969,6 +982,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /** */
     private ZKDiscoveryEvent lastEvt;
 
+    /** */
     private int lastProcessed = -1;
 
     /**
@@ -1028,6 +1042,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                     if (fireEvt) {
                         assert lastEvt == null || lastEvt.topVer + 1 == 
e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e;
 
+                        ZookeeperClusterNode evtNode = e.node;
+
                         if (!crd) {
                             synchronized (curTop) {
                                 if (locJoin) {
@@ -1039,6 +1055,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                         assert old == null : node;
                                     }
 
+                                    evtNode = locNode;
+
                                     DiscoveryDataBag dataBag = new 
DiscoveryDataBag(e.node.id());
 
                                     dataBag.joiningNodeData(e.joiningNodeData);
@@ -1060,6 +1078,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                                             Object old = 
curTop.put(node.internalOrder(), node);
 
+                                            evtNode = node;
+
                                             assert old == null : node;
 
                                             break;
@@ -1068,9 +1088,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                         case EventType.EVT_NODE_FAILED: {
                                             ZookeeperClusterNode node = e.node;
 
-                                            Object failedNode = 
curTop.remove(node.internalOrder());
+                                            evtNode = 
curTop.remove(node.internalOrder());
 
-                                            assert failedNode != null : node;
+                                            assert evtNode != null : node;
 
                                             break;
                                         }
@@ -1081,17 +1101,30 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                                 }
                             }
                         }
+                        else
+                            evtNode = 
curTop.containsKey(e.node.internalOrder()) ? curTop.get(e.node.internalOrder()) 
: e.node;
 
                         log.info("Received discovery event, notify listener: " 
+ e);
 
                         List<ClusterNode> allNodes = 
allNodesForEvent(e.allNodes);
 
-                        lsnr.onDiscovery(e.evtType, e.topVer, e.node, 
allNodes, null, null);
+                        evtNode.local(locNode.id().equals(evtNode.id()));
+
+                        lsnr.onDiscovery(e.evtType, e.topVer, evtNode, 
allNodes, null, null);
 
                         if (locJoin) {
                             log.info("Local node joined: " + e);
 
                             joinLatch.countDown();
+
+                            try {
+                                String zkNode = JOIN_HIST_PATH + "/" + 
locNode.id().toString() + "-" + String.format("%010d", locNode.internalOrder() 
- 1);
+
+                                zkCurator.delete().forPath(zkNode);
+                            }
+                            catch (Exception err) {
+                                U.error(log, "Failed to delete join history 
data");
+                            }
                         }
 
                         lastEvt = e;
@@ -1109,6 +1142,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         for (int i = 0; i < allNodes.size(); i++) {
             ZookeeperClusterNode node = allNodes.get(i);
 
+            ZookeeperClusterNode node0 = curTop.get(node.internalOrder());
+
+            if (node0 != null)
+                node = node0;
+
             node.local(locNode.id().equals(node.id()));
 
             res.add(node);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bedc4e99/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index b4db065..4cd86db 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -486,7 +486,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @param nodeId Node ID.
      * @param expEvts Expected events.
-     * @throws Exception If fialed.
+     * @throws Exception If failed.
      */
     private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
         assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -521,6 +521,21 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testClusterRestart() throws Exception {
+        startGridsMultiThreaded(3, false);
+
+        stopAllGrids();
+
+        evts.clear();
+
+        startGridsMultiThreaded(3, false);
+
+        waitForTopology(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionRestore4() throws Exception {
         testSockNio = true;
 

Reply via email to