Repository: ignite Updated Branches: refs/heads/ignite-zk 11e2567ff -> 98a171c68
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a171c6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a171c6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a171c6 Branch: refs/heads/ignite-zk Commit: 98a171c68a1f5610e5f5830144306ee73df866d6 Parents: 11e2567 Author: sboikov <sboi...@gridgain.com> Authored: Thu Nov 16 17:42:05 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Nov 16 18:02:19 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/ZookeeperDiscoverySpi2.java | 249 ++----------------- .../discovery/zk/internal/ZkDiscoveryImpl.java | 59 ++++- .../discovery/zk/internal/ZookeeperClient.java | 28 ++- 3 files changed, 91 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java index 52945f5..99810d0 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi2.java @@ -49,6 +49,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.zk.internal.ZkDiscoveryImpl; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode; import org.apache.zookeeper.AsyncCallback; @@ -63,33 +64,6 @@ import org.jetbrains.annotations.Nullable; @DiscoverySpiHistorySupport(true) public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements DiscoverySpi, JoiningNodesAware { /** */ - private static final String IGNITE_PATH = "/ignite"; - - /** */ - private static final String IGNITE_INIT_LOCK_PATH = "/igniteLock"; - - /** */ - private static final String CLUSTER_PATH = IGNITE_PATH + "/cluster"; - - /** */ - private static final String EVENTS_PATH = CLUSTER_PATH + "/events"; - - /** */ - private static final String JOIN_HIST_PATH = CLUSTER_PATH + "/joinHist"; - - /** */ - private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive"; - - /** */ - private static final String CUSTOM_EVTS_PATH = CLUSTER_PATH + "/customEvts"; - - /** */ - private static final String DISCO_EVTS_HIST_PATH = CLUSTER_PATH + "/evtsHist"; - - /** */ - private static final byte[] EMPTY_BYTES = new byte[0]; - - /** */ private String connectString; /** */ @@ -102,25 +76,10 @@ public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements Discover private DiscoveryMetricsProvider metricsProvider; /** */ - private ZookeeperClient zkClient; - - /** */ private int sesTimeout = 5000; /** */ - //private final ZookeeperWatcher zkWatcher; - - /** */ - private final JdkMarshaller marsh = new JdkMarshaller(); - - /** */ - private ZKChildrenUpdateCallback zkChildrenUpdateCallback; - - /** */ - //private final DataUpdateCallback dataUpdateCallback; - - /** */ - private final JoinedNodes joinHist = new JoinedNodes(); + private ZkDiscoveryImpl impl; /** */ private ZookeeperClusterNode locNode; @@ -138,25 +97,6 @@ public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements Discover @LoggerResource private IgniteLogger log; - /** */ - private CountDownLatch joinLatch = new CountDownLatch(1); - - /** */ - private Exception joinErr; - - /** For testing only. */ - private CountDownLatch connectStart = new CountDownLatch(1); - - /** - * - */ - public ZookeeperDiscoverySpi2() { -// zkWatcher = new ZookeeperWatcher(); -// -// zkChildrenUpdateCallback = new ZKChildrenUpdateCallback(); -// dataUpdateCallback = new DataUpdateCallback(); - } - public int getSessionTimeout() { return sesTimeout; } @@ -376,46 +316,21 @@ public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements Discover /** {@inheritDoc} */ @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { - try { - initLocalNode(); - - DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); - - exchange.collect(discoDataBag); + initLocalNode(); - String threadName = Thread.currentThread().getName(); + DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); - // ZK generates internal threads' names using current thread name. - Thread.currentThread().setName("zk-" + igniteInstanceName); + exchange.collect(discoDataBag); - try { - } - finally { - Thread.currentThread().setName(threadName); - } - - boolean startedConnect = false; - - if (!startedConnect) - startConnect(discoDataBag); - - log.info("Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); - - for(;;) { - if (!joinLatch.await(10, TimeUnit.SECONDS)) { - U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); - } - else - break; - } + impl = new ZkDiscoveryImpl(log, lsnr); - if (joinErr != null) - throw new IgniteSpiException(joinErr); + try { + impl.joinTopology(igniteInstanceName, connectString, sesTimeout); } - catch (Exception e) { - connectStart.countDown(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IgniteSpiException(e); + throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e); } } @@ -425,37 +340,13 @@ public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements Discover * @throws Exception If failed. */ public void waitConnectStart() throws Exception { - connectStart.await(); + //connectStart.await(); } /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { - closeZkClient(); - } - - private void closeZkClient() { - } - - private <T> T unmarshal(byte[] data) { - try { - return marsh.unmarshal(data, null); - } - catch (Exception e) { - U.error(log, "Unmarshal error: " + e); - - throw new IgniteException(e); - } - } - - private byte[] marshal(Object obj) { - try { - return marsh.marshal(obj); - } - catch (Exception e) { - U.error(log, "Marshal error: " + e); - - throw new IgniteException(e); - } + if (impl != null) + impl.stop(); } /** */ @@ -541,122 +432,10 @@ public class ZookeeperDiscoverySpi2 extends IgniteSpiAdapter implements Discover return new ZKNodeData(nodeOrder, nodeId); } - /** */ - private boolean crd; - - /** */ - private ZKAliveNodes curAlive; - - private void readJoinNodeData(ZKNodeData data, String path) throws Exception { - //byte[] bytes = zk.getData(path, null, null); -// byte[] bytes = zkCurator.getData().forPath(path); -// -// assert bytes.length > 0; -// -// ZKJoiningNodeData joinData = unmarshal(bytes); -// -// assert joinData != null && joinData.node != null && joinData.joiningNodeData != null : joinData; -// -// joinData.node.internalOrder(data.order); -// -// data.joinData = joinData; - } - - private void processJoinedNodesHistory(List<String> children, long joinOrder) { -// for (String child : children) { -// ZKNodeData data = parseNodePath(child); -// -// if (data.order >= joinOrder && !joinHist.hist.containsKey(data.order)) { -// try { -// Object old = joinHist.hist.put(data.order, data); -// -// assert old == null : old; -// -// readJoinNodeData(data, JOIN_HIST_PATH + "/" + child); -// -// assert data.joinData != null && joinHist.hist.get(data.order) == data : data; -// -// log.info("New joined node data: " + data); -// } -// catch (Exception e) { -// // TODO ZK -// U.error(log, "Failed to get node data: " + e, e); -// } -// } -// } - } - - /** - * - */ - private static class JoinedNodes { - /** */ - private Stat stat; - - /** */ - private final Map<Long, ZKNodeData> hist = new HashMap<>(); - } - - /** - * - */ - class ZKChildrenUpdateCallback implements AsyncCallback.Children2Callback { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - try { - if (children == null || children.isEmpty()) - return; - - if (path.equals(JOIN_HIST_PATH)) { - log.info("Join nodes changed [rc=" + rc + - ", path=" + path + - ", nodes=" + children + - ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); -// -// if (stat != null) -// joinHist.stat = stat; -// -// processJoinedNodesHistory(children); - } - else if (path.equals(ALIVE_NODES_PATH)) { - log.info("Alive nodes changed [rc=" + rc + - ", path=" + path + - ", nodes=" + children + - ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); - - assert stat != null; - - TreeMap<Long, ZKNodeData> nodes = new TreeMap<>(); - - for (String child : children) { - ZKNodeData data = parseNodePath(child); - - nodes.put(data.order, data); - } - - ZKAliveNodes newAlive = new ZKAliveNodes(stat.getCversion(), nodes); - - //generateEvents(curAlive, newAlive); - - curAlive = newAlive; - } - } - catch (Throwable e) { - log.info("Uncaught error: " + e); - - throw e; - } - } - } - /** * For testing only. */ public void closeClient() { - closeZkClient(); - - joinErr = new Exception("Start error"); - - joinLatch.countDown(); } /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java index cfc9839..4c7a7bf 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryImpl.java @@ -18,10 +18,18 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** @@ -29,6 +37,9 @@ import org.apache.zookeeper.data.Stat; */ public class ZkDiscoveryImpl { /** */ + private static final String IGNITE_PATH = "/apache-ignite"; + + /** */ private final JdkMarshaller marsh = new JdkMarshaller(); /** */ @@ -40,14 +51,58 @@ public class ZkDiscoveryImpl { /** */ private ZookeeperClient zkClient; + /** */ + private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>(); + public ZkDiscoveryImpl(IgniteLogger log, DiscoverySpiListener lsnr) { this.log = log.getLogger(getClass()); this.lsnr = lsnr; } - public void joinTopology(String igniteInstanceName) { + public void joinTopology(String igniteInstanceName, String connectString, int sesTimeout) + throws InterruptedException { + try { + zkClient = new ZookeeperClient(igniteInstanceName, + log, + connectString, + sesTimeout, + new ConnectionLossListener()); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to create Zookeeper client", e); + } + + try { + zkClient.createIfNeeded(IGNITE_PATH, null, CreateMode.PERSISTENT); + } + catch (ZookeeperClientFailedException e) { + throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); + } + } + + /** + * + */ + public void stop() { + if (zkClient != null) + zkClient.close(); + } + + private <T> T unmarshal(byte[] data) throws IgniteCheckedException { + return marsh.unmarshal(data, null); + } + + private byte[] marshal(Object obj) throws IgniteCheckedException { + return marsh.marshal(obj); + } + + /** + * + */ + private class ConnectionLossListener implements IgniteRunnable { + @Override public void run() { - //zkClient. + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/98a171c6/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index f80201c..8f3f074 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -88,20 +88,23 @@ public class ZookeeperClient implements Watcher { connectStartTime = System.currentTimeMillis(); - zk = new ZooKeeper(connectString, sesTimeout, this); + String threadName = Thread.currentThread().getName(); + + // ZK generates internal threads' names using current thread name. + Thread.currentThread().setName("zk-" + igniteInstanceName); + + try { + zk = new ZooKeeper(connectString, sesTimeout, this); + } + finally { + Thread.currentThread().setName(threadName); + } connTimer = new Timer("zk-timer-" + igniteInstanceName); scheduleConnectionCheck(); } - /** - * - */ - private void scheduleConnectionCheck() { - connTimer.schedule(new ConnectionTimeoutTask(connectStartTime), connLossTimeout); - } - /** {@inheritDoc} */ @Override public void process(WatchedEvent evt) { if (evt.getType() == Event.EventType.None) { @@ -299,6 +302,15 @@ public class ZookeeperClient implements Watcher { /** * */ + private void scheduleConnectionCheck() { + assert state == ConnectionState.Disconnected : state; + + connTimer.schedule(new ConnectionTimeoutTask(connectStartTime), connLossTimeout); + } + + /** + * + */ private class ConnectionTimeoutTask extends TimerTask { /** */ private final long connectStartTime;