zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1842bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1842bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1842bb4c

Branch: refs/heads/ignite-zk
Commit: 1842bb4c48ce245a5b69b669087590351de686fa
Parents: 3736abe
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 14 15:07:55 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 14 15:10:16 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1842bb4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bae183d..80d563e 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -97,6 +97,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";

     /** */
+    private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts";
+
+    /** */
     private static final byte[] EMPTY_BYTES = new byte[0];

     /** */
@@ -127,7 +130,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private final JdkMarshaller marsh = new JdkMarshaller();

     /** */
-    private final NodesUpdateCallback nodesUpdateCallback;
+    private final ZKChildrenUpdateCallback zkChildrenUpdateCallback;

     /** */
     private final DataUpdateCallback dataUpdateCallback;
@@ -166,7 +169,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     public ZookeeperDiscoverySpi() {
         zkWatcher = new ZookeeperWatcher();

-        nodesUpdateCallback = new NodesUpdateCallback();
+        zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
         dataUpdateCallback = new DataUpdateCallback();
     }

@@ -307,7 +310,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
         // TODO ZK
-        //throw new UnsupportedOperationException();
+        try {
+            zkCurator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(CUSTOM_EVTS_PATH, marshal(msg));
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException(e);
+        }
     }

     /** {@inheritDoc} */
@@ -318,8 +326,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery

     /** {@inheritDoc} */
     @Override public boolean isClientMode() throws IllegalStateException {
-        // TODO ZK
-        return false;
+        return locNode.isClient();
     }

     /**
@@ -354,11 +361,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     }

     private boolean igniteClusterStarted() throws Exception {
-        boolean started = zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
+        return zkCurator.checkExists().forPath(IGNITE_PATH) != null &&
             zkCurator.checkExists().forPath(ALIVE_NODES_PATH) != null &&
             !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
-
-        return started;
     }

     /** {@inheritDoc} */
@@ -454,8 +459,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         gridStartTime = clusterData.gridStartTime;

         zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
-        zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null);
-        zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null);
+        zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null);
+        zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null);

         List<Op> joinOps = new ArrayList<>();

@@ -693,7 +698,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /**
      *
      */
-    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+    class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
         @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
             try {
                 if (children == null || children.isEmpty())
@@ -756,8 +761,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private final TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>();

     /**
-     * @param oldNodes
-     * @param newNodes
+     * @param oldNodes Previous processed state.
+     * @param newNodes Current state.
      */
     private void generateEvents(ZKAliveNodes oldNodes, ZKAliveNodes newNodes) {
         assert newNodes != null;
@@ -1177,7 +1182,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');

             if (event.getType() == Event.EventType.NodeChildrenChanged) {
-                zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
+                zk.getChildren(event.getPath(), this, zkChildrenUpdateCallback, null);
             } else if (event.getType() == Event.EventType.NodeDataChanged) {
                 zk.getData(event.getPath(), this, dataUpdateCallback, null);
             }