zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1842bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1842bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1842bb4c

Branch: refs/heads/ignite-zk
Commit: 1842bb4c48ce245a5b69b669087590351de686fa
Parents: 3736abe
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 14 15:07:55 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 14 15:10:16 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1842bb4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bae183d..80d563e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -97,6 +97,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
 
     /** */
+    private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + 
"/customEvts";
+
+    /** */
     private static final byte[] EMPTY_BYTES = new byte[0];
 
     /** */
@@ -127,7 +130,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     private final JdkMarshaller marsh = new JdkMarshaller();
 
     /** */
-    private final NodesUpdateCallback nodesUpdateCallback;
+    private final ZKChildrenUpdateCallback zkChildrenUpdateCallback;
 
     /** */
     private final DataUpdateCallback dataUpdateCallback;
@@ -166,7 +169,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     public ZookeeperDiscoverySpi() {
         zkWatcher = new ZookeeperWatcher();
 
-        nodesUpdateCallback = new NodesUpdateCallback();
+        zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
         dataUpdateCallback = new DataUpdateCallback();
     }
 
@@ -307,7 +310,12 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) 
throws IgniteException {
         // TODO ZK
-        //throw new UnsupportedOperationException();
+        try {
+            
zkCurator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(CUSTOM_EVTS_PATH,
 marshal(msg));
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -318,8 +326,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
 
     /** {@inheritDoc} */
     @Override public boolean isClientMode() throws IllegalStateException {
-        // TODO ZK
-        return false;
+        return locNode.isClient();
     }
 
     /**
@@ -354,11 +361,9 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     }
 
     private boolean igniteClusterStarted() throws Exception {
-        boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null 
&&
+        return zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
             zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
             !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
-
-        return started;
     }
 
     /** {@inheritDoc} */
@@ -454,8 +459,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
             gridStartTime = clusterData.gridStartTime;
 
             zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
-            zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, 
null);
-            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, 
null);
+            zk.getChildren(JOIN_HIST_PATH, zkWatcher, 
zkChildrenUpdateCallback, null);
+            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, 
zkChildrenUpdateCallback, null);
 
             List<Op> joinOps = new ArrayList<>();
 
@@ -693,7 +698,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     /**
      *
      */
-    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+    class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
         @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
             try {
                 if (children == null || children.isEmpty())
@@ -756,8 +761,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
     private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();
 
     /**
-     * @param oldNodes
-     * @param newNodes
+     * @param oldNodes Previous processed state.
+     * @param newNodes Current state.
      */
     private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
         assert newNodes != null;
@@ -1177,7 +1182,7 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
                 log.info("Process event [type=" + event.getType() + ", state=" 
+ event.getState() + ", path=" + event.getPath() + ']');
 
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
-                zk.getChildren(event.getPath(), this, nodesUpdateCallback, 
null);
+                zk.getChildren(event.getPath(), this, 
zkChildrenUpdateCallback, null);
             } else if (event.getType() == Event.EventType.NodeDataChanged) {
                 zk.getData(event.getPath(), this, dataUpdateCallback, null);
             }

Reply via email to