Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 11e2567ff -> 98a171c68


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a171c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a171c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a171c6

Branch: refs/heads/ignite-zk
Commit: 98a171c68a1f5610e5f5830144306ee73df866d6
Parents: 11e2567
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Nov 16 17:42:05 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Nov 16 18:02:19 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/ZookeeperDiscoverySpi2.java    | 249 ++-----------------
 .../discovery/zk/internal/ZkDiscoveryImpl.java  |  59 ++++-
 .../discovery/zk/internal/ZookeeperClient.java  |  28 ++-
 3 files changed, 91 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java
index 52945f5..99810d0 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.zk.internal.ZkDiscoveryImpl;
 import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient;
 import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
 import org.apache.zookeeper.AsyncCallback;
@@ -63,33 +64,6 @@ import org.jetbrains.annotations.Nullable;
 @DiscoverySpiHistorySupport(true)
 public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements 
DiscoverySpi, JoiningNodesAware {
     /** */
-    private static final String IGNITE_PATH = "/ignite";
-
-    /** */
-    private static final String IGNITE_INIT_LOCK_PATH = "/igniteLock";
-
-    /** */
-    private static final String CLUSTER_PATH = IGNITE_PATH + "/cluster";
-
-    /** */
-    private static final String EVENTS_PATH = CLUSTER_PATH + "/events";
-
-    /** */
-    private static final String JOIN_HIST_PATH = CLUSTER_PATH + "/joinHist";
-
-    /** */
-    private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
-
-    /** */
-    private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + 
"/customEvts";
-
-    /** */
-    private static final String DISCO_EVTS_HIST_PATH = CLUSTER_PATH + 
"/evtsHist";
-
-    /** */
-    private static final byte[] EMPTY_BYTES = new byte[0];
-
-    /** */
     private String connectString;
 
     /** */
@@ -102,25 +76,10 @@ public class ZookeeperDiscoverySpi2 extends 
IgniteSpiAdapter implements Discover
     private DiscoveryMetricsProvider metricsProvider;
 
     /** */
-    private ZookeeperClient zkClient;
-
-    /** */
     private int sesTimeout = 5000;
 
     /** */
-    //private final ZookeeperWatcher zkWatcher;
-
-    /** */
-    private final JdkMarshaller marsh = new JdkMarshaller();
-
-    /** */
-    private ZKChildrenUpdateCallback zkChildrenUpdateCallback;
-
-    /** */
-    //private final DataUpdateCallback dataUpdateCallback;
-
-    /** */
-    private final JoinedNodes joinHist = new JoinedNodes();
+    private ZkDiscoveryImpl impl;
 
     /** */
     private ZookeeperClusterNode locNode;
@@ -138,25 +97,6 @@ public class ZookeeperDiscoverySpi2 extends 
IgniteSpiAdapter implements Discover
     @LoggerResource
     private IgniteLogger log;
 
-    /** */
-    private CountDownLatch joinLatch = new CountDownLatch(1);
-
-    /** */
-    private Exception joinErr;
-
-    /** For testing only. */
-    private CountDownLatch connectStart = new CountDownLatch(1);
-
-    /**
-     *
-     */
-    public ZookeeperDiscoverySpi2() {
-//        zkWatcher = new ZookeeperWatcher();
-//
-//        zkChildrenUpdateCallback = new ZKChildrenUpdateCallback();
-//        dataUpdateCallback = new DataUpdateCallback();
-    }
-
     public int getSessionTimeout() {
         return sesTimeout;
     }
@@ -376,46 +316,21 @@ public class ZookeeperDiscoverySpi2 extends 
IgniteSpiAdapter implements Discover
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String igniteInstanceName) throws 
IgniteSpiException {
-        try {
-            initLocalNode();
-
-            DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
-
-            exchange.collect(discoDataBag);
+        initLocalNode();
 
-            String threadName = Thread.currentThread().getName();
+        DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
 
-            // ZK generates internal threads' names using current thread name.
-            Thread.currentThread().setName("zk-" + igniteInstanceName);
+        exchange.collect(discoDataBag);
 
-            try {
-            }
-            finally {
-                Thread.currentThread().setName(threadName);
-            }
-
-            boolean startedConnect = false;
-
-            if (!startedConnect)
-                startConnect(discoDataBag);
-
-            log.info("Waiting for local join event [nodeId=" + locNode.id() + 
", name=" + igniteInstanceName + ']');
-
-            for(;;) {
-                if (!joinLatch.await(10, TimeUnit.SECONDS)) {
-                    U.warn(log, "Waiting for local join event [nodeId=" + 
locNode.id() + ", name=" + igniteInstanceName + ']');
-                }
-                else
-                    break;
-            }
+        impl = new ZkDiscoveryImpl(log, lsnr);
 
-            if (joinErr != null)
-                throw new IgniteSpiException(joinErr);
+        try {
+            impl.joinTopology(igniteInstanceName, connectString, sesTimeout);
         }
-        catch (Exception e) {
-            connectStart.countDown();
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
 
-            throw new IgniteSpiException(e);
+            throw new IgniteSpiException("Failed to join cluster, thread was 
interrupted", e);
         }
     }
 
@@ -425,37 +340,13 @@ public class ZookeeperDiscoverySpi2 extends 
IgniteSpiAdapter implements Discover
      * @throws Exception If failed.
      */
     public void waitConnectStart() throws Exception {
-        connectStart.await();
+        //connectStart.await();
     }
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
-        closeZkClient();
-    }
-
-    private void closeZkClient() {
-    }
-
-    private <T> T unmarshal(byte[] data) {
-        try {
-            return marsh.unmarshal(data, null);
-        }
-        catch (Exception e) {
-            U.error(log, "Unmarshal error: " + e);
-
-            throw new IgniteException(e);
-        }
-    }
-
-    private byte[] marshal(Object obj) {
-        try {
-            return marsh.marshal(obj);
-        }
-        catch (Exception e) {
-            U.error(log, "Marshal error: " + e);
-
-            throw new IgniteException(e);
-        }
+        if (impl != null)
+            impl.stop();
     }
 
     /** */
@@ -541,122 +432,10 @@ public class ZookeeperDiscoverySpi2 extends 
IgniteSpiAdapter implements Discover
         return new ZKNodeData(nodeOrder, nodeId);
     }
 
-    /** */
-    private boolean crd;
-
-    /** */
-    private ZKAliveNodes curAlive;
-
-    private void readJoinNodeData(ZKNodeData data, String path) throws 
Exception {
-        //byte[] bytes = zk.getData(path, null, null);
-//        byte[] bytes = zkCurator.getData().forPath(path);
-//
-//        assert bytes.length > 0;
-//
-//        ZKJoiningNodeData joinData = unmarshal(bytes);
-//
-//        assert joinData != null && joinData.node != null && 
joinData.joiningNodeData != null : joinData;
-//
-//        joinData.node.internalOrder(data.order);
-//
-//        data.joinData = joinData;
-    }
-
-    private void processJoinedNodesHistory(List<String> children, long 
joinOrder) {
-//        for (String child : children) {
-//            ZKNodeData data = parseNodePath(child);
-//
-//            if (data.order >= joinOrder && 
!joinHist.hist.containsKey(data.order)) {
-//                try {
-//                    Object old = joinHist.hist.put(data.order, data);
-//
-//                    assert old == null : old;
-//
-//                    readJoinNodeData(data, JOIN_HIST_PATH + "/" + child);
-//
-//                    assert data.joinData != null && 
joinHist.hist.get(data.order) == data : data;
-//
-//                    log.info("New joined node data: " + data);
-//                }
-//                catch (Exception e) {
-//                    // TODO ZK
-//                    U.error(log, "Failed to get node data: " + e, e);
-//                }
-//            }
-//        }
-    }
-
-    /**
-     *
-     */
-    private static class JoinedNodes {
-        /** */
-        private Stat stat;
-
-        /** */
-        private final Map<Long, ZKNodeData> hist = new HashMap<>();
-    }
-
-    /**
-     *
-     */
-    class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback {
-        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-            try {
-                if (children == null || children.isEmpty())
-                    return;
-
-                if (path.equals(JOIN_HIST_PATH)) {
-                    log.info("Join nodes changed [rc=" + rc +
-                        ", path=" + path +
-                        ", nodes=" + children +
-                        ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
-//
-//                    if (stat != null)
-//                        joinHist.stat = stat;
-//
-//                    processJoinedNodesHistory(children);
-                }
-                else if (path.equals(ALIVE_NODES_PATH)) {
-                    log.info("Alive nodes changed [rc=" + rc +
-                        ", path=" + path +
-                        ", nodes=" + children +
-                        ", ver=" + (stat != null ? stat.getCversion() : null) 
+ ']');
-
-                    assert stat != null;
-
-                    TreeMap<Long, ZKNodeData> nodes = new TreeMap<>();
-
-                    for (String child : children) {
-                        ZKNodeData data = parseNodePath(child);
-
-                        nodes.put(data.order, data);
-                    }
-
-                    ZKAliveNodes newAlive = new 
ZKAliveNodes(stat.getCversion(), nodes);
-
-                    //generateEvents(curAlive, newAlive);
-
-                    curAlive = newAlive;
-                }
-            }
-            catch (Throwable e) {
-                log.info("Uncaught error: " + e);
-
-                throw e;
-            }
-        }
-    }
-
     /**
      * For testing only.
      */
     public void closeClient() {
-        closeZkClient();
-
-        joinErr = new Exception("Start error");
-
-        joinLatch.countDown();
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java
index cfc9839..4c7a7bf 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java
@@ -18,10 +18,18 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 
 /**
@@ -29,6 +37,9 @@ import org.apache.zookeeper.data.Stat;
  */
 public class ZkDiscoveryImpl {
     /** */
+    private static final String IGNITE_PATH = "/apache-ignite";
+
+    /** */
     private final JdkMarshaller marsh = new JdkMarshaller();
 
     /** */
@@ -40,14 +51,58 @@ public class ZkDiscoveryImpl {
     /** */
     private ZookeeperClient zkClient;
 
+    /** */
+    private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
+
     public ZkDiscoveryImpl(IgniteLogger log, DiscoverySpiListener lsnr) {
         this.log = log.getLogger(getClass());
         this.lsnr = lsnr;
     }
 
-    public void joinTopology(String igniteInstanceName) {
+    public void joinTopology(String igniteInstanceName, String connectString, 
int sesTimeout)
+        throws InterruptedException {
+        try {
+            zkClient = new ZookeeperClient(igniteInstanceName,
+                log,
+                connectString,
+                sesTimeout,
+                new ConnectionLossListener());
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException("Failed to create Zookeeper client", 
e);
+        }
+
+        try {
+            zkClient.createIfNeeded(IGNITE_PATH, null, CreateMode.PERSISTENT);
+        }
+        catch (ZookeeperClientFailedException e) {
+            throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
+        }
+    }
+
+    /**
+     *
+     */
+    public void stop() {
+        if (zkClient != null)
+            zkClient.close();
+    }
+
+    private <T> T unmarshal(byte[] data) throws IgniteCheckedException {
+        return marsh.unmarshal(data, null);
+    }
+
+    private byte[] marshal(Object obj) throws IgniteCheckedException {
+        return marsh.marshal(obj);
+    }
+
+    /**
+     *
+     */
+    private class ConnectionLossListener implements IgniteRunnable {
+        @Override public void run() {
 
-        //zkClient.
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index f80201c..8f3f074 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -88,20 +88,23 @@ public class ZookeeperClient implements Watcher {
 
         connectStartTime = System.currentTimeMillis();
 
-        zk = new ZooKeeper(connectString, sesTimeout, this);
+        String threadName = Thread.currentThread().getName();
+
+        // ZK generates internal threads' names using current thread name.
+        Thread.currentThread().setName("zk-" + igniteInstanceName);
+
+        try {
+            zk = new ZooKeeper(connectString, sesTimeout, this);
+        }
+        finally {
+            Thread.currentThread().setName(threadName);
+        }
 
         connTimer = new Timer("zk-timer-" + igniteInstanceName);
 
         scheduleConnectionCheck();
     }
 
-    /**
-     *
-     */
-    private void scheduleConnectionCheck() {
-        connTimer.schedule(new ConnectionTimeoutTask(connectStartTime), 
connLossTimeout);
-    }
-
     /** {@inheritDoc} */
     @Override public void process(WatchedEvent evt) {
         if (evt.getType() == Event.EventType.None) {
@@ -299,6 +302,15 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    private void scheduleConnectionCheck() {
+        assert state == ConnectionState.Disconnected : state;
+
+        connTimer.schedule(new ConnectionTimeoutTask(connectStartTime), 
connLossTimeout);
+    }
+
+    /**
+     *
+     */
     private class ConnectionTimeoutTask extends TimerTask {
         /** */
         private final long connectStartTime;

Reply via email to