zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/930a5179 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/930a5179 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/930a5179 Branch: refs/heads/ignite-zk Commit: 930a5179955eff26d7292af8a7b8f87c8ecee3b3 Parents: c145262 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 25 14:38:05 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 25 14:46:11 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkDiscoveryEventsData.java | 26 +- .../discovery/zk/internal/ZkIgnitePaths.java | 2 +- .../zk/internal/ZkNoServersMessage.java | 45 +++ .../discovery/zk/internal/ZookeeperClient.java | 26 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 281 ++++++++++++++++--- .../ZookeeperDiscoverySpiBasicTest.java | 101 ++++++- 6 files changed, 437 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index f8727e3..cb2d0be 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -31,6 +31,9 @@ class ZkDiscoveryEventsData implements Serializable { private static final long serialVersionUID = 0L; /** */ + final UUID clusterId; + + /** */ int procCustEvt = -1; /** */ @@ -55,17 +58,36 @@ class ZkDiscoveryEventsData implements Serializable { private UUID commErrFutId; /** - * @param startInternalOrder First + * @param startInternalOrder Starting internal order for cluster (znodes having lower order belong + * to previous cluster and should be ignored). + * @param clusterStartTime Start time of first node in cluster. + * @return Events. + */ + static ZkDiscoveryEventsData createForNewCluster(long startInternalOrder, long clusterStartTime) { + return new ZkDiscoveryEventsData( + UUID.randomUUID(), + startInternalOrder, + clusterStartTime, + 1L, + new TreeMap<Long, ZkDiscoveryEventData>() + ); + } + + /** + * @param clusterId Cluster ID. + * @param startInternalOrder Starting internal order for cluster. * @param topVer Current topology version. * @param gridStartTime Cluster start time. * @param evts Events history. */ - ZkDiscoveryEventsData( + private ZkDiscoveryEventsData( + UUID clusterId, long startInternalOrder, long gridStartTime, long topVer, TreeMap<Long, ZkDiscoveryEventData> evts) { + this.clusterId = clusterId; this.startInternalOrder = startInternalOrder; this.gridStartTime = gridStartTime; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 8f35a8e..1c8706e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -201,7 +201,7 @@ class ZkIgnitePaths { * @param path Alive node zk path. * @return {@code True} if node is client. */ - static boolean aliveClientNode(String path) { + static boolean aliveNodeClientFlag(String path) { return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java new file mode 100644 index 0000000..dcbd205 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkNoServersMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 9cd55d4..df0bb43 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -534,7 +534,7 @@ public class ZookeeperClient implements Watcher { * @throws InterruptedException If interrupted. */ void setData(String path, byte[] data, int ver) - throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException, KeeperException.BadVersionException { if (data == null) data = EMPTY_BYTES; @@ -550,6 +550,9 @@ public class ZookeeperClient implements Watcher { catch (KeeperException.NoNodeException e) { throw e; } + catch (KeeperException.BadVersionException e) { + throw e; + } catch (Exception e) { onZookeeperError(connStartTime, e); } @@ -558,19 +561,19 @@ public class ZookeeperClient implements Watcher { /** * @param path Path. + * @param stat Optional {@link Stat} instance to return znode state. * @return Data. * @throws KeeperException.NoNodeException If target node does not exist. * @throws ZookeeperClientFailedException If connection to zk was lost. * @throws InterruptedException If interrupted. */ - byte[] getData(String path) - throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException - { + byte[] getData(String path, @Nullable Stat stat) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { for (;;) { long connStartTime = this.connStartTime; try { - return zk.getData(path, false, null); + return zk.getData(path, false, stat); } catch (KeeperException.NoNodeException e) { throw e; @@ -583,6 +586,19 @@ public class ZookeeperClient implements Watcher { /** * @param path Path. + * @return Data. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + byte[] getData(String path) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException + { + return getData(path, null); + } + + /** + * @param path Path. */ void deleteIfExistsAsync(String path) { new DeleteIfExistsOperation(path).execute(); http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 30bd750..1d3ad01 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -1209,19 +1209,18 @@ public class ZookeeperDiscoveryImpl { } /** - * @param rc Callback result code. * @param aliveNodes Alive nodes. * @throws Exception If failed. */ - private void checkIsCoordinator(int rc, final List<String> aliveNodes) throws Exception { - assert rc == 0 : KeeperException.Code.get(rc); + private void checkIsCoordinator(final List<String> aliveNodes) throws Exception { + assert !locNode.isClient(); TreeMap<Long, String> aliveSrvs = new TreeMap<>(); long locInternalOrder = rtState.internalOrder; for (String aliveNodePath : aliveNodes) { - if (ZkIgnitePaths.aliveClientNode(aliveNodePath)) + if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath)) continue; Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); @@ -1245,30 +1244,137 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) { log.info("Discovery coordinator already exists, watch for previous server node [" + "locId=" + locNode.id() + - ", prevPath=" + prevE.getValue() + ']'); + ", watchPath=" + prevE.getValue() + ']'); } - PreviousNodeWatcher watcher = new PreviousNodeWatcher(rtState); + PreviousNodeWatcher watcher = new ServerPreviousNodeWatcher(rtState); rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); } } /** - * + * @param aliveNodes Alive nodes. + * @throws Exception If failed. */ - private void onPreviousNodeFail() { - // TODO ZK: -// if (locInternalId == crdInternalId + 1) { -// if (log.isInfoEnabled()) -// log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); -// -// onBecomeCoordinator(aliveNodes, locInternalId); -// } - if (log.isInfoEnabled()) - log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); + private void checkClientsStatus(final List<String> aliveNodes) throws Exception { + assert locNode.isClient(); + assert rtState.joined; + assert rtState.evtsData != null; + + TreeMap<Long, String> aliveClients = new TreeMap<>(); + + String srvPath = null; + Long srvInternalOrder = null; + + long locInternalOrder = rtState.internalOrder; + + for (String aliveNodePath : aliveNodes) { + Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + + if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath)) + aliveClients.put(internalId, aliveNodePath); + else { + if (srvInternalOrder == null || internalId < srvInternalOrder) { + srvPath = aliveNodePath; + srvInternalOrder = internalId; + } + } + } + + assert !aliveClients.isEmpty(); + + Map.Entry<Long, String> oldest = aliveClients.firstEntry(); + + boolean oldestClient = locInternalOrder == oldest.getKey(); + + if (srvPath == null) { + if (oldestClient) { + Stat stat = new Stat(); + + ZkDiscoveryEventsData prevEvts = rtState.evtsData; + + ZkDiscoveryEventsData newEvts; + + byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, stat); + + if (evtsBytes.length == 0) { + // Possible if new cluster already started and removed old events, + // still can try generate {@link ZkNoServersMessage}. + newEvts = rtState.evtsData; + } + else + newEvts = unmarshalZip(evtsBytes); + + if (prevEvts.clusterId.equals(newEvts.clusterId)) { + U.warn(log, "All server nodes failed, notify all clients."); + + generateNoServersEvent(newEvts, stat); + } + } + } + else { + String watchPath; + + if (oldestClient) { + watchPath = srvPath; + + if (log.isInfoEnabled()) { + log.info("Servers exists, watch for server node [locId=" + locNode.id() + + ", watchPath=" + watchPath + ']'); + } + } + else { + assert aliveClients.size() > 1 : aliveClients; + + Map.Entry<Long, String> prevE = aliveClients.floorEntry(locInternalOrder - 1); + + assert prevE != null; + + watchPath = prevE.getValue(); + + if (log.isInfoEnabled()) { + log.info("Servers exists, watch for previous node [locId=" + locNode.id() + + ", watchPath=" + watchPath + ']'); + } + } + + PreviousNodeWatcher watcher = new ClientPreviousNodeWatcher(rtState); + + rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + watchPath, watcher, watcher); + } + } + + /** + * @param evtsData Events data. + * @param evtsStat Events zookeeper state. + * @throws Exception If failed. + */ + private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat evtsStat) throws Exception { + evtsData.evtIdGen++; - rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( + evtsData.evtIdGen, + evtsData.topVer, + locNode.id(), + new ZkNoServersMessage(), + null, + false); + + Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList(); + + evtsData.addEvent(nodesToAck, evtData); + + byte[] newEvtsBytes = marshalZip(evtsData); + + try { + rtState.zkClient.setData(zkPaths.evtsPath, newEvtsBytes, evtsStat.getVersion()); + } + catch (KeeperException.BadVersionException e) { + // Version can change if new cluster started and saved new events. + if (log.isDebugEnabled()) + log.debug("Failed to save no servers message"); + } } /** @@ -1395,8 +1501,15 @@ public class ZookeeperDiscoveryImpl { for (String child : aliveNodes) { Long internalId = ZkIgnitePaths.aliveInternalId(child); - if (internalId < rtState.evtsData.startInternalOrder) + if (internalId < rtState.evtsData.startInternalOrder) { + if (log.isInfoEnabled()) { + LT.info(log, "Ignore node from previous cluster [startOrder=" + rtState.evtsData.startInternalOrder + + ", nodeOrder=" + internalId + + ", znode=" + child + ']'); + } + continue; + } Object old = alives.put(internalId, child); @@ -1924,23 +2037,20 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) throws Exception { - assert prevEvts == null || prevEvts.maxInternalOrder < locNode.internalId(); + long locInternalId = rtState.internalOrder; + + assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId; spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); cleanupPreviousClusterData(); - long locInternalId = rtState.internalOrder; - rtState.joined = true; + rtState.gridStartTime = System.currentTimeMillis(); - rtState.gridStartTime = U.currentTimeMillis(); - - rtState.evtsData = new ZkDiscoveryEventsData( - prevEvts != null ? prevEvts.maxInternalOrder + 1 : locInternalId, - rtState.gridStartTime, - 1L, - new TreeMap<Long, ZkDiscoveryEventData>()); + rtState.evtsData = ZkDiscoveryEventsData.createForNewCluster( + prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L, + rtState.gridStartTime); locNode.internalId(locInternalId); locNode.order(1); @@ -2229,7 +2339,7 @@ public class ZookeeperDiscoveryImpl { ZkDiscoveryEventsData newEvts = unmarshalZip(data); - // Need keep processed custom events since they contains message object. + // Need keep processed custom events since they contain message object which is needed to create ack. if (rtState.evtsData != null) { for (Map.Entry<Long, ZkDiscoveryEventData> e : rtState.evtsData.evts.entrySet()) { ZkDiscoveryEventData evtData = e.getValue(); @@ -2246,7 +2356,8 @@ public class ZookeeperDiscoveryImpl { processNewEvents(newEvts); - rtState.evtsData = newEvts; + if (rtState.joined) + rtState.evtsData = newEvts; return newEvts; } @@ -2257,6 +2368,13 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception { + if (rtState.joined && rtState.evtsData != null && !rtState.evtsData.clusterId.equals(evtsData.clusterId)) { + assert locNode.isClient() : locNode; + + throw localNodeFail("All server nodes failed, client node disconnected (received events from new custer) " + + "[locId=" + locNode.id() + ']', true); + } + TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts; ZookeeperClient zkClient = rtState.zkClient; @@ -2504,6 +2622,9 @@ public class ZookeeperDiscoveryImpl { joinFut.onDone(); deleteDataForJoinedAsync(evtData); + + if (locNode.isClient()) + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckClientsStatusCallback(rtState)); } /** @@ -2523,6 +2644,19 @@ public class ZookeeperDiscoveryImpl { processCommunicationErrorResolveFinishMessage( (ZkCommunicationErrorResolveFinishMessage)msg); } + else if (msg instanceof ZkNoServersMessage) + processNoServersMessage((ZkNoServersMessage)msg); + } + + /** + * @param msg Message. + * @throws Exception If failed. + */ + private void processNoServersMessage(ZkNoServersMessage msg) throws Exception { + assert locNode.isClient() : locNode; + + throw localNodeFail("All server nodes failed, client node disconnected " + + "(received 'no-servers' message ) [locId=" + locNode.id() + ']', true); } /** @@ -3654,7 +3788,7 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class PreviousNodeWatcher extends ZkAbstractWatcher implements AsyncCallback.StatCallback { + private abstract class PreviousNodeWatcher extends ZkAbstractWatcher implements AsyncCallback.StatCallback { /** * @param rtState Runtime state. */ @@ -3689,6 +3823,62 @@ public class ZookeeperDiscoveryImpl { onProcessError(e); } } + + /** + * + */ + abstract void onPreviousNodeFail(); + } + + /** + * + */ + private class ServerPreviousNodeWatcher extends PreviousNodeWatcher { + /** + * @param rtState Runtime state. + */ + ServerPreviousNodeWatcher(ZkRuntimeState rtState) { + super(rtState); + + assert !locNode.isClient() : locNode; + } + + /** {@inheritDoc} */ + @Override void onPreviousNodeFail() { + // TODO ZK: +// if (locInternalId == crdInternalId + 1) { +// if (log.isInfoEnabled()) +// log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); +// +// onBecomeCoordinator(aliveNodes, locInternalId); +// } + if (log.isInfoEnabled()) + log.info("Previous server node failed, check is node new coordinator [locId=" + locNode.id() + ']'); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); + } + } + + /** + * + */ + private class ClientPreviousNodeWatcher extends PreviousNodeWatcher { + /** + * @param rtState Runtime state. + */ + ClientPreviousNodeWatcher(ZkRuntimeState rtState) { + super(rtState); + + assert locNode.isClient() : locNode; + } + + /** {@inheritDoc} */ + @Override void onPreviousNodeFail() { + if (log.isInfoEnabled()) + log.info("Watched node failed, check if there are alive servers [locId=" + locNode.id() + ']'); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckClientsStatusCallback(rtState)); + } } /** @@ -3703,10 +3893,33 @@ public class ZookeeperDiscoveryImpl { } /** {@inheritDoc} */ - @Override public void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) throws Exception { + @Override public void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) + throws Exception + { + assert rc == 0 : KeeperException.Code.get(rc); + + checkIsCoordinator(children); + } + } + + /** + * + */ + class CheckClientsStatusCallback extends ZkAbstractChildrenCallback { + /** + * @param rtState Runtime state. + */ + CheckClientsStatusCallback(ZkRuntimeState rtState) { + super(rtState, ZookeeperDiscoveryImpl.this); + } + + /** {@inheritDoc} */ + @Override void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) + throws Exception + { assert rc == 0 : KeeperException.Code.get(rc); - checkIsCoordinator(rc, children); + checkClientsStatus(children); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/930a5179/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 2a4fa76..2dd690d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -256,7 +256,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { try { DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; - UUID locId = ignite.cluster().localNode().id(); + UUID locId = ((IgniteKernal)ignite).context().localNodeId(); Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); @@ -281,7 +281,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { catch (Throwable e) { err = true; - info("Unexpected error: " + e); + error("Unexpected error [evt=" + evt + ", err=" + e + ']', e); } return true; @@ -2572,8 +2572,12 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } }); + U.sleep(3000); + waitSpi(getTestIgniteInstanceName(0)); + client = false; + startGrid(1); fut.get(); @@ -2584,6 +2588,99 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDisconnectOnServersLeft_1() throws Exception { + disconnectOnServersLeft(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testDisconnectOnServersLeft_2() throws Exception { + disconnectOnServersLeft(5, 1); + } + + /** + * @throws Exception If failed. + */ + public void testDisconnectOnServersLeft_3() throws Exception { + disconnectOnServersLeft(1, 10); + } + + /** + * @throws Exception If failed. + */ + public void testDisconnectOnServersLeft_4() throws Exception { + disconnectOnServersLeft(5, 10); + } + + /** + * @param srvs Number of servers. + * @param clients Number of clients. + * @throws Exception If failed. + */ + private void disconnectOnServersLeft(int srvs, int clients) throws Exception { + startGridsMultiThreaded(srvs); + + client = true; + + startGridsMultiThreaded(srvs, clients); + + final CountDownLatch disconnectLatch = new CountDownLatch(clients); + final CountDownLatch reconnectLatch = new CountDownLatch(clients); + + IgnitePredicate<Event> p = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }; + + for (int i = 0; i < clients; i++) { + Ignite client = ignite(srvs + i); + + assertTrue(client.configuration().isClientMode()); + + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + } + + log.info("Stop all servers."); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer threadIdx) { + stopGrid(getTestIgniteInstanceName(threadIdx), true, false); + } + }, srvs, "stop-server"); + + waitReconnectEvent(log, disconnectLatch); + + evts.clear(); + + client = false; + + log.info("Restart servers."); + + startGridsMultiThreaded(0, srvs); + + waitReconnectEvent(log, reconnectLatch); + + waitForTopology(srvs + clients); + + log.info("Reconnect finished."); + } + + /** + * @throws Exception If failed. + */ public void testStartNoZk() throws Exception { stopZkCluster();