Repository: ignite Updated Branches: refs/heads/ignite-zk 930a51799 -> 879f99930
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/879f9993 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/879f9993 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/879f9993 Branch: refs/heads/ignite-zk Commit: 879f999308be3549250a3032f6a5d007d5415464 Parents: 930a517 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 25 15:04:27 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 25 15:53:46 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZkRuntimeState.java | 8 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 77 ++++++------ .../ZookeeperDiscoverySpiBasicTest.java | 119 ++++++++++++++----- .../testframework/junits/GridAbstractTest.java | 23 +++- 4 files changed, 154 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index 02e75ff..e61e2b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -98,6 +98,14 @@ class ZkRuntimeState { } /** + * @param internalOrder Node internal order. + * @return {@code True} if node belongs to previous cluster and should be ignored. + */ + boolean ignoreAliveNode(long internalOrder) { + return evtsData != null && internalOrder < evtsData.startInternalOrder; + } + + /** * @param err Error. */ void onCloseStart(Exception err) { http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 1d3ad01..d7e0a76 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -308,7 +308,7 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) { log.info("Created new communication error process future [errNode=" + node0.id() + - ", err= " + err + ']'); + ", err=" + err + ']'); } try { @@ -1225,6 +1225,9 @@ public class ZookeeperDiscoveryImpl { Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + if (rtState.ignoreAliveNode(internalId)) + continue; + aliveSrvs.put(internalId, aliveNodePath); } @@ -1258,7 +1261,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void checkClientsStatus(final List<String> aliveNodes) throws Exception { - assert locNode.isClient(); + assert locNode.isClient() : locNode; assert rtState.joined; assert rtState.evtsData != null; @@ -1272,6 +1275,9 @@ public class ZookeeperDiscoveryImpl { for (String aliveNodePath : aliveNodes) { Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + if (rtState.ignoreAliveNode(internalId)) + continue; + if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath)) aliveClients.put(internalId, aliveNodePath); else { @@ -1294,23 +1300,19 @@ public class ZookeeperDiscoveryImpl { ZkDiscoveryEventsData prevEvts = rtState.evtsData; - ZkDiscoveryEventsData newEvts; - byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, stat); - if (evtsBytes.length == 0) { - // Possible if new cluster already started and removed old events, - // still can try generate {@link ZkNoServersMessage}. - newEvts = rtState.evtsData; - } - else - newEvts = unmarshalZip(evtsBytes); + assert evtsBytes.length > 0; + + ZkDiscoveryEventsData newEvts = unmarshalZip(evtsBytes); if (prevEvts.clusterId.equals(newEvts.clusterId)) { - U.warn(log, "All server nodes failed, notify all clients."); + U.warn(log, "All server nodes failed, notify all clients [locId=" + locNode.id() + ']'); generateNoServersEvent(newEvts, stat); } + else + U.warn(log, "All server nodes failed (received events from new cluster)."); } } else { @@ -1445,13 +1447,15 @@ public class ZookeeperDiscoveryImpl { handleProcessedEvents("crd"); } else { - if (log.isInfoEnabled()) - log.info("Node is first server node in cluster [locId=" + locNode.id() + ']'); - DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); if (nodeAuth != null) { try { + if (log.isInfoEnabled()) { + log.info("Node is first server node in cluster, try authenticate local node " + + "[locId=" + locNode.id() + ']'); + } + localAuthentication(nodeAuth, unmarshalCredentials(locNode)); } catch (Exception e) { @@ -1501,7 +1505,7 @@ public class ZookeeperDiscoveryImpl { for (String child : aliveNodes) { Long internalId = ZkIgnitePaths.aliveInternalId(child); - if (internalId < rtState.evtsData.startInternalOrder) { + if (rtState.ignoreAliveNode(internalId)) { if (log.isInfoEnabled()) { LT.info(log, "Ignore node from previous cluster [startOrder=" + rtState.evtsData.startInternalOrder + ", nodeOrder=" + internalId + @@ -2037,6 +2041,8 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) throws Exception { + assert !locNode.isClient() : locNode; + long locInternalId = rtState.internalOrder; assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId; @@ -2052,6 +2058,12 @@ public class ZookeeperDiscoveryImpl { prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L, rtState.gridStartTime); + if (log.isInfoEnabled()) { + log.info("New cluster started [locId=" + locNode.id() + + ", clusterId=" + rtState.evtsData.clusterId + + ", startTime=" + rtState.evtsData.gridStartTime + ']'); + } + locNode.internalId(locInternalId); locNode.order(1); @@ -2074,16 +2086,7 @@ public class ZookeeperDiscoveryImpl { Collections.<Long, Collection<ClusterNode>>emptyMap(), null); - if (rtState.prevJoined) { - lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, - 1L, - locNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); - - U.quietAndWarn(log, "Client node was reconnected after it was already considered failed."); - } + rtState.zkClient.setData(zkPaths.evtsPath, marshalZip(rtState.evtsData), -1); joinFut.onDone(); } @@ -2097,8 +2100,6 @@ public class ZookeeperDiscoveryImpl { ZookeeperClient client = rtState.zkClient; // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. - client.setData(zkPaths.evtsPath, null, -1); - List<String> evtChildren = rtState.zkClient.getChildren(zkPaths.evtsPath); for (String evtPath : evtChildren) { @@ -2332,13 +2333,20 @@ public class ZookeeperDiscoveryImpl { * @return Events. */ @Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) throws Exception { - if (data.length == 0) + ZkDiscoveryEventsData newEvts = data.length > 0 ? (ZkDiscoveryEventsData)unmarshalZip(data) : null; + + if (rtState.joined && (newEvts == null || !rtState.evtsData.clusterId.equals(newEvts.clusterId))) { + assert locNode.isClient() : locNode; + + throw localNodeFail("All server nodes failed, client node disconnected (received events from new custer) " + + "[locId=" + locNode.id() + ']', true); + } + + if (newEvts == null) return null; assert !rtState.crd; - ZkDiscoveryEventsData newEvts = unmarshalZip(data); - // Need keep processed custom events since they contain message object which is needed to create ack. if (rtState.evtsData != null) { for (Map.Entry<Long, ZkDiscoveryEventData> e : rtState.evtsData.evts.entrySet()) { @@ -2368,13 +2376,6 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception { - if (rtState.joined && rtState.evtsData != null && !rtState.evtsData.clusterId.equals(evtsData.clusterId)) { - assert locNode.isClient() : locNode; - - throw localNodeFail("All server nodes failed, client node disconnected (received events from new custer) " + - "[locId=" + locNode.id() + ']', true); - } - TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts; ZookeeperClient zkClient = rtState.zkClient; http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 2dd690d..293eb87 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -279,9 +279,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } } catch (Throwable e) { - err = true; - error("Unexpected error [evt=" + evt + ", err=" + e + ']', e); + + err = true; } return true; @@ -2625,57 +2625,112 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGridsMultiThreaded(srvs, clients); - final CountDownLatch disconnectLatch = new CountDownLatch(clients); - final CountDownLatch reconnectLatch = new CountDownLatch(clients); + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); - IgnitePredicate<Event> p = new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - log.info("Disconnected: " + evt); + final CountDownLatch disconnectLatch = new CountDownLatch(clients); + final CountDownLatch reconnectLatch = new CountDownLatch(clients); - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - log.info("Reconnected: " + evt); + IgnitePredicate<Event> p = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected: " + evt); - reconnectLatch.countDown(); + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected: " + evt); + + reconnectLatch.countDown(); + + return false; + } + + return true; } + }; - return true; + for (int c = 0; c < clients; c++) { + Ignite client = ignite(srvs + c); + + assertTrue(client.configuration().isClientMode()); + + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); } - }; - for (int i = 0; i < clients; i++) { - Ignite client = ignite(srvs + i); + log.info("Stop all servers."); - assertTrue(client.configuration().isClientMode()); + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer threadIdx) { + stopGrid(getTestIgniteInstanceName(threadIdx), true, false); + } + }, srvs, "stop-server"); - client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + waitReconnectEvent(log, disconnectLatch); + + evts.clear(); + + client = false; + + log.info("Restart servers."); + + startGridsMultiThreaded(0, srvs); + + waitReconnectEvent(log, reconnectLatch); + + waitForTopology(srvs + clients); + + log.info("Reconnect finished."); } + } - log.info("Stop all servers."); + /** + * @throws Exception If failed. + */ + public void testReconnectServersRestart() throws Exception { + startGrid(0); - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer threadIdx) { - stopGrid(getTestIgniteInstanceName(threadIdx), true, false); - } - }, srvs, "stop-server"); + client = true; - waitReconnectEvent(log, disconnectLatch); + final int CLIENTS = 10; - evts.clear(); + startGridsMultiThreaded(1, CLIENTS); client = false; - log.info("Restart servers."); + long stopTime = System.currentTimeMillis() + 30_000; - startGridsMultiThreaded(0, srvs); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - waitReconnectEvent(log, reconnectLatch); + final int NODES = 1 + CLIENTS; - waitForTopology(srvs + clients); + int iter = 0; - log.info("Reconnect finished."); + while (System.currentTimeMillis() < stopTime) { + int restarts = rnd.nextInt(10) + 1; + + info("Test iteration [iter=" + iter++ + ", restarts=" + restarts + ']'); + + for (int i = 0; i < restarts; i++) { + stopGrid(getTestIgniteInstanceName(0), true, false); + + startGrid(0); + } + + final Ignite srv = ignite(0); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv.cluster().nodes().size() == NODES; + } + }, 30_000)); + + waitForTopology(NODES); + + awaitPartitionMapExchange(); + } + + evts.clear(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 1084ce1..3f0b3e3 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -46,6 +46,7 @@ import junit.framework.TestCase; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -82,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerContextTestImpl; @@ -2179,10 +2181,25 @@ public abstract class GridAbstractTest extends TestCase { } for (Ignite node: nodes) { - int sizeOnNode = node.cluster().nodes().size(); + try { + IgniteFuture<?> reconnectFut = node.cluster().clientReconnectFuture(); - if (sizeOnNode != expSize) { - info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); + if (reconnectFut != null && !reconnectFut.isDone()) { + info("Wait for size on node, reconnect is in progress [node=" + node.name() + ']'); + + return false; + } + + int sizeOnNode = node.cluster().nodes().size(); + + if (sizeOnNode != expSize) { + info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); + + return false; + } + } + catch (IgniteClientDisconnectedException e) { + info("Wait for size on node, node disconnected [node=" + node.name() + ']'); return false; }