ignite-1758 Discovery fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80147128 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80147128 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80147128 Branch: refs/heads/ignite-638 Commit: 80147128a3b07f927dec65f0a6934f6782efab5c Parents: 5a116cb Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 17 09:48:58 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 17 09:48:58 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 360 +++++++++++---- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +- .../tcp/internal/TcpDiscoveryNodesRing.java | 95 ++-- .../messages/TcpDiscoveryAbstractMessage.java | 37 ++ .../TcpDiscoveryStatusCheckMessage.java | 11 + .../tcp/TcpDiscoveryMultiThreadedTest.java | 158 ++++--- .../discovery/tcp/TcpDiscoveryRestartTest.java | 10 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 459 ++++++++++++++++++- .../TcpDiscoveryMulticastIpFinderSelfTest.java | 28 +- .../testframework/junits/GridAbstractTest.java | 29 +- 10 files changed, 942 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0fe2881..ae23d0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -159,6 +159,10 @@ class ServerImpl extends TcpDiscoveryImpl { private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10); /** */ + private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE = + IgniteProductVersion.fromString("1.5.0"); + + /** */ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); @@ -191,10 +195,10 @@ class ServerImpl extends TcpDiscoveryImpl { private StatisticsPrinter statsPrinter; /** Failed nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); + private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); /** Leaving nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); + private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ private boolean ipFinderHasLocAddr; @@ -1080,13 +1084,34 @@ class ServerImpl extends TcpDiscoveryImpl { openSock = true; + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( - spi.getSocketTimeout())); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + boolean ignore = false; + + synchronized (failedNodes) { + for (TcpDiscoveryNode failedNode : failedNodes) { + if (failedNode.id().equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Ignore response from node from failed list: " + res); + + ignore = true; + + break; + } + } + } + + if (ignore) + break; + } + if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) log.debug("Handshake response from local node: " + res); @@ -1104,7 +1129,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); if (debugMode) - debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + res.creatorNodeId() + ']'); if (log.isDebugEnabled()) @@ -1754,6 +1779,32 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Adds failed nodes specified in the received message to the local failed nodes list. + * + * @param msg Message. + */ + private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { + if (msg.failedNodes() != null) { + for (UUID nodeId : msg.failedNodes()) { + TcpDiscoveryNode failedNode = ring.node(nodeId); + + if (failedNode != null) { + if (!failedNode.isLocal()) { + boolean added; + + synchronized (mux) { + added = failedNodes.add(failedNode); + } + + if (added && log.isDebugEnabled()) + log.debug("Added node to failed nodes list [node=" + failedNode + ", msg=" + msg + ']'); + } + } + } + } + } + + /** * Discovery messages history used for client reconnect. */ private class EnsuredMessageHistory { @@ -2131,10 +2182,28 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); if (debugMode) - debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + + if (locNode.internalOrder() == 0) { + boolean process = false; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) + process = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); + + if (!process) { + if (log.isDebugEnabled()) { + log.debug("Ignore message, local node order is not initialized [msg=" + msg + + ", locNode=" + locNode + ']'); + } + + return; + } + } spi.stats.onMessageProcessingStarted(msg); + processMessageFailedNodes(msg); + if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -2200,6 +2269,8 @@ class ServerImpl extends TcpDiscoveryImpl { checkHeartbeatsReceiving(); checkPendingCustomMessages(); + + checkFailedNodesList(); } /** @@ -2262,50 +2333,50 @@ class ServerImpl extends TcpDiscoveryImpl { boolean sent = false; - boolean searchNext = true; + boolean newNextNode = false; UUID locNodeId = getLocalNodeId(); while (true) { - if (searchNext) { - TcpDiscoveryNode newNext = ring.nextNode(failedNodes); - - if (newNext == null) { - if (log.isDebugEnabled()) - log.debug("No next node in topology."); + TcpDiscoveryNode newNext = ring.nextNode(failedNodes); - if (debugMode) - debugLog("No next node in topology."); + if (newNext == null) { + if (log.isDebugEnabled()) + log.debug("No next node in topology."); - if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && - !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) { - msg.senderNodeId(locNodeId); + if (debugMode) + debugLog(msg, "No next node in topology."); - addMessage(msg); - } + if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) { + msg.senderNodeId(locNodeId); - break; + addMessage(msg); } - if (!newNext.equals(next)) { - if (log.isDebugEnabled()) - log.debug("New next node [newNext=" + newNext + ", formerNext=" + next + - ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + break; + } - if (debugMode) - debugLog("New next node [newNext=" + newNext + ", formerNext=" + next + - ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + if (!newNext.equals(next)) { + if (log.isDebugEnabled()) + log.debug("New next node [newNext=" + newNext + ", formerNext=" + next + + ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); - U.closeQuiet(sock); + if (debugMode) + debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next + + ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); - sock = null; + U.closeQuiet(sock); - next = newNext; - } - else if (log.isDebugEnabled()) - log.debug("Next node remains the same [nextId=" + next.id() + - ", nextOrder=" + next.internalOrder() + ']'); + sock = null; + + next = newNext; + + newNextNode = true; } + else if (log.isDebugEnabled()) + log.debug("Next node remains the same [nextId=" + next.id() + + ", nextOrder=" + next.internalOrder() + ']'); // Flag that shows whether next node exists and accepts incoming connections. boolean nextNodeExists = sock != null; @@ -2379,8 +2450,8 @@ class ServerImpl extends TcpDiscoveryImpl { "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); if (debugMode) - debugLog("Failed to restore ring because next node ID received is not as " + - "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); + debugLog(msg, "Failed to restore ring because next node ID received is not " + + "as expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); break; } @@ -2401,8 +2472,8 @@ class ServerImpl extends TcpDiscoveryImpl { ", rcvd=" + nextOrder + ", id=" + next.id() + ']'); if (debugMode) - debugLog("Failed to restore ring because next node order received " + - "is not as expected [expected=" + next.internalOrder() + + debugLog(msg, "Failed to restore ring because next node order " + + "received is not as expected [expected=" + next.internalOrder() + ", rcvd=" + nextOrder + ", id=" + next.id() + ']'); break; @@ -2413,7 +2484,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Initialized connection with next node: " + next.id()); if (debugMode) - debugLog("Initialized connection with next node: " + next.id()); + debugLog(msg, "Initialized connection with next node: " + next.id()); errs = null; @@ -2477,13 +2548,20 @@ class ServerImpl extends TcpDiscoveryImpl { assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; - if (failure || forceSndPending) { + boolean sndPending= + (newNextNode && ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) || + failure || + forceSndPending; + + if (sndPending) { if (log.isDebugEnabled()) log.debug("Pending messages will be sent [failure=" + failure + + ", newNextNode=" + newNextNode + ", forceSndPending=" + forceSndPending + ']'); if (debugMode) - debugLog("Pending messages will be sent [failure=" + failure + + debugLog(msg, "Pending messages will be sent [failure=" + failure + + ", newNextNode=" + newNextNode + ", forceSndPending=" + forceSndPending + ']'); for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { @@ -2513,7 +2591,7 @@ class ServerImpl extends TcpDiscoveryImpl { ", res=" + res + ']'); if (debugMode) - debugLog("Pending message has been sent to next node [msgId=" + msg.id() + + debugLog(msg, "Pending message has been sent to next node [msgId=" + msg.id() + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); @@ -2540,6 +2618,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + if (!failedNodes.isEmpty()) { + for (TcpDiscoveryNode failedNode : failedNodes) { + assert !failedNode.equals(next) : failedNode; + + msg.addFailedNode(failedNode.id()); + } + } + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -2548,15 +2634,17 @@ class ServerImpl extends TcpDiscoveryImpl { onMessageExchanged(); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + ", res=" + res + ']'); + } - if (debugMode) - debugLog("Message has been sent to next node [msg=" + msg + + if (debugMode) { + debugLog(msg, "Message has been sent to next node [msg=" + msg + ", next=" + next.id() + ", res=" + res + ']'); + } } finally { clearNodeAddedMessage(msg); @@ -2635,8 +2723,6 @@ class ServerImpl extends TcpDiscoveryImpl { next = null; - searchNext = true; - errs = null; } else @@ -2665,25 +2751,30 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder())); if (!sent) { + assert next == null : next; + if (log.isDebugEnabled()) log.debug("Pending messages will be resent to local node"); if (debugMode) - log.debug("Pending messages will be resent to local node"); + debugLog(msg, "Pending messages will be resent to local node"); for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId, pendingMsgs.customDiscardId); + pendingMsg.senderNodeId(locNodeId); + msgWorker.addMessage(pendingMsg); if (log.isDebugEnabled()) log.debug("Pending message has been sent to local node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + ", pendingMsgId=" + pendingMsg + ']'); - if (debugMode) - debugLog("Pending message has been sent to local node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + if (debugMode) { + debugLog(msg, "Pending message has been sent to local node [msg=" + msg.id() + + ", pendingMsgId=" + pendingMsg + ']'); + } } } @@ -3317,15 +3408,17 @@ class ServerImpl extends TcpDiscoveryImpl { if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + - "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" - + locNode + ", msg=" + msg + ']'); + "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" + + locNode + ", msg=" + msg + ']'); + } - if (debugMode) - debugLog("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + - "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" - + locNode + ", msg=" + msg + ']'); + if (debugMode) { + debugLog(msg, "Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + + "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" + + locNode + ", msg=" + msg + ']'); + } return; } @@ -3338,7 +3431,7 @@ class ServerImpl extends TcpDiscoveryImpl { ", msg=" + msg + ']'); if (debugMode) - debugLog("Discarding node added message since new node's order is less than " + + debugLog(msg, "Discarding node added message since new node's order is less than " + "max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode + ", msg=" + msg + ']'); @@ -3427,6 +3520,8 @@ class ServerImpl extends TcpDiscoveryImpl { spi.onExchange(node.id(), node.id(), data, U.gridClassLoader()); msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); + + processMessageFailedNodes(msg); } if (log.isDebugEnabled()) @@ -3447,6 +3542,9 @@ class ServerImpl extends TcpDiscoveryImpl { spi.gridStartTime = msg.gridStartTime(); for (TcpDiscoveryNode n : top) { + assert n.internalOrder() < node.internalOrder() : + "Invalid node [topNode=" + n + ", added=" + node + ']'; + // Make all preceding nodes and local node visible. n.visible(true); } @@ -3500,6 +3598,8 @@ class ServerImpl extends TcpDiscoveryImpl { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); } + + processMessageFailedNodes(msg); } if (sendMessageToRemotes(msg)) @@ -3733,7 +3833,7 @@ class ServerImpl extends TcpDiscoveryImpl { interruptPing(leavingNode); - assert leftNode != null; + assert leftNode != null : msg; if (log.isDebugEnabled()) log.debug("Removed node from topology: " + leftNode); @@ -3887,6 +3987,8 @@ class ServerImpl extends TcpDiscoveryImpl { } if (node != null) { + assert !node.isLocal() : msg; + synchronized (mux) { failedNodes.add(node); } @@ -4036,32 +4138,46 @@ class ServerImpl extends TcpDiscoveryImpl { return; } + TcpDiscoveryStatusCheckMessage msg0 = msg; + + if (F.contains(msg.failedNodes(), msg.creatorNodeId())) { + msg0 = new TcpDiscoveryStatusCheckMessage(msg); + + msg0.failedNodes(null); + + for (UUID failedNodeId : msg.failedNodes()) { + if (!failedNodeId.equals(msg.creatorNodeId())) + msg0.addFailedNode(failedNodeId); + } + } + try { - trySendMessageDirectly(msg.creatorNode(), msg); + trySendMessageDirectly(msg0.creatorNode(), msg0); if (log.isDebugEnabled()) log.debug("Responded to status check message " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']'); + "[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']'); } catch (IgniteSpiException e) { if (e.hasCause(SocketException.class)) { if (log.isDebugEnabled()) log.debug("Failed to respond to status check message (connection " + - "refused) [recipient=" + msg.creatorNodeId() + ", status=" + - msg.status() + ']'); + "refused) [recipient=" + msg0.creatorNodeId() + ", status=" + + msg0.status() + ']'); onException("Failed to respond to status check message (connection refused) " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); + "[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e); } - else { - if (pingNode(msg.creatorNode())) + else if (!spi.isNodeStopping0()) { + if (pingNode(msg0.creatorNode())) // Node exists and accepts incoming connections. U.error(log, "Failed to respond to status check message [recipient=" + - msg.creatorNodeId() + ", status=" + msg.status() + ']', e); - else if (log.isDebugEnabled()) - log.debug("Failed to respond to status check message (did the node " + - "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status() - + ']'); + msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e); + else if (log.isDebugEnabled()) { + log.debug("Failed to respond to status check message (did the node stop?)" + + "[recipient=" + msg0.creatorNodeId() + + ", status=" + msg0.status() + ']'); + } } } } @@ -4364,27 +4480,42 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (isLocalNodeCoordinator()) { - if (!joiningNodes.isEmpty()) { + boolean delayMsg; + + assert ring.minimumNodeVersion() != null : ring; + + if (ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) + delayMsg = msg.topologyVersion() == 0L && !joiningNodes.isEmpty(); + else + delayMsg = !joiningNodes.isEmpty(); + + if (delayMsg) { + if (log.isDebugEnabled()) { + log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + + ", joiningNodes=" + joiningNodes + ']'); + } + pendingCustomMsgs.add(msg); return; } - boolean sndNext = !msg.verified(); - - if (sndNext) { + if (!msg.verified()) { msg.verify(getLocalNodeId()); msg.topologyVersion(ring.topologyVersion()); - if (pendingMsgs.procCustomMsgs.add(msg.id())) + if (pendingMsgs.procCustomMsgs.add(msg.id())) { notifyDiscoveryListener(msg); - else - sndNext = false; - } - if (sndNext && ring.hasRemoteNodes()) - sendMessageAcrossRing(msg); + if (sendMessageToRemotes(msg)) + sendMessageAcrossRing(msg); + else + processCustomMessage(msg); + } + } else { + addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); + spi.stats.onRingMessageReceived(msg); DiscoverySpiCustomMessage msgObj = null; @@ -4401,16 +4532,21 @@ class ServerImpl extends TcpDiscoveryImpl { if (nextMsg != null) { try { - addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, - spi.marsh.marshal(nextMsg))); + TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage( + getLocalNodeId(), nextMsg, spi.marsh.marshal(nextMsg)); + + ackMsg.topologyVersion(msg.topologyVersion()); + + processCustomMessage(ackMsg); + + if (ackMsg.verified()) + msgHist.add(ackMsg); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); } } } - - addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); } } else { @@ -4428,9 +4564,8 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) { - assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() + - ", topver=" + ring.topologyVersion(); - assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion(); + assert msg.topologyVersion() == ring.topologyVersion() : + "msg: " + msg + ", topVer=" + ring.topologyVersion(); notifyDiscoveryListener(msg); } @@ -4441,6 +4576,38 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node + * is still in the ring. + */ + private void checkFailedNodesList() { + List<TcpDiscoveryNodeFailedMessage> msgs = null; + + synchronized (mux) { + for (Iterator<TcpDiscoveryNode> it = failedNodes.iterator(); it.hasNext();) { + TcpDiscoveryNode node = it.next(); + + if (ring.node(node.id()) != null) { + if (msgs == null) + msgs = new ArrayList<>(failedNodes.size()); + + msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + } + else + it.remove(); + } + } + + if (msgs != null) { + for (TcpDiscoveryNodeFailedMessage msg : msgs) { + if (log.isDebugEnabled()) + log.debug("Add node failed message for node from failed nodes list: " + msg); + + addMessage(msg); + } + } + } + + /** * Checks and flushes custom event messages if no nodes are attempting to join the grid. */ private void checkPendingCustomMessages() { @@ -4640,10 +4807,10 @@ class ServerImpl extends TcpDiscoveryImpl { synchronized (mux) { readers.add(reader); - - reader.start(); } + reader.start(); + spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); } } @@ -4861,9 +5028,10 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Initialized connection with remote node [nodeId=" + nodeId + ", client=" + req.client() + ']'); - if (debugMode) - debugLog("Initialized connection with remote node [nodeId=" + nodeId + + if (debugMode) { + debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId + ", client=" + req.client() + ']'); + } } catch (IOException e) { if (log.isDebugEnabled()) @@ -4932,7 +5100,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageReceived(msg); if (debugMode && recordable(msg)) - debugLog("Message has been received: " + msg); + debugLog(msg, "Message has been received: " + msg); if (msg instanceof TcpDiscoveryConnectionCheckMessage) { spi.writeToSocket(msg, sock, RES_OK, socketTimeout); http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 2786d0b..1aef728 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.jetbrains.annotations.Nullable; /** @@ -99,9 +100,10 @@ abstract class TcpDiscoveryImpl { } /** + * @param discoMsg Discovery message. * @param msg Message. */ - protected void debugLog(String msg) { + protected void debugLog(@Nullable TcpDiscoveryAbstractMessage discoMsg, String msg) { assert debugMode; String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index 7ca092c..eb0f74a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.PN; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.Nullable; import java.util.Collection; @@ -88,6 +89,23 @@ public class TcpDiscoveryNodesRing { @GridToStringExclude private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + /** */ + private IgniteProductVersion minNodeVer; + + /** + * @return Minimum node version. + */ + public IgniteProductVersion minimumNodeVersion() { + rwLock.readLock().lock(); + + try { + return minNodeVer; + } + finally { + rwLock.readLock().unlock(); + } + } + /** * Sets local node. * @@ -225,6 +243,8 @@ public class TcpDiscoveryNodesRing { nodeOrder = node.internalOrder(); maxInternalOrder = node.internalOrder(); + + initializeMinimumVersion(); } finally { rwLock.writeLock().unlock(); @@ -295,6 +315,8 @@ public class TcpDiscoveryNodesRing { } nodeOrder = topVer; + + initializeMinimumVersion(); } finally { rwLock.writeLock().unlock(); @@ -341,6 +363,8 @@ public class TcpDiscoveryNodesRing { nodes.remove(rmv); } + initializeMinimumVersion(); + return rmv; } finally { @@ -372,6 +396,8 @@ public class TcpDiscoveryNodesRing { maxInternalOrder = 0; topVer = 0; + + minNodeVer = locNode.version(); } finally { rwLock.writeLock().unlock(); @@ -451,61 +477,8 @@ public class TcpDiscoveryNodesRing { * topology contains less than two nodes. */ @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); - - rwLock.readLock().lock(); - - try { - Collection<TcpDiscoveryNode> filtered = serverNodes(excluded); - - if (filtered.size() < 2) - return null; - - Iterator<TcpDiscoveryNode> iter = filtered.iterator(); - - while (iter.hasNext()) { - TcpDiscoveryNode node = iter.next(); - - if (locNode.equals(node)) - break; - } - - return iter.hasNext() ? iter.next() : F.first(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology. - * - * @return Previous node. - */ - @Nullable public TcpDiscoveryNode previousNode() { - rwLock.readLock().lock(); - - try { - if (nodes.size() < 2) - return null; - - return previousNode(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology filtering excluded nodes from search. - * - * @param excluded Nodes to exclude from the search (optional). If provided, - * cannot contain local node. - * @return Previous node or {@code null} if all nodes were filtered out or - * topology contains less than two nodes. - */ - @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); + assert locNode.internalOrder() > 0 : locNode; + assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded; rwLock.readLock().lock(); @@ -638,6 +611,18 @@ public class TcpDiscoveryNodesRing { }); } + /** + * + */ + private void initializeMinimumVersion() { + minNodeVer = null; + + for (TcpDiscoveryNode node : nodes) { + if (minNodeVer == null || node.version().compareTo(minNodeVer) < 0) + minNodeVer = node.version(); + } + } + /** {@inheritDoc} */ @Override public String toString() { rwLock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 875d18e..9cb47af 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,10 +19,15 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.io.Externalizable; import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * Base class to implement discovery messages. @@ -62,6 +67,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** Pending message index. */ private short pendingIdx; + /** */ + @GridToStringInclude + private Set<UUID> failedNodes; + /** * Default no-arg constructor for {@link Externalizable} interface. */ @@ -236,6 +245,34 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { return false; } + /** + * Adds node ID to the failed nodes list. + * + * @param nodeId Node ID. + */ + public void addFailedNode(UUID nodeId) { + assert nodeId != null; + + if (failedNodes == null) + failedNodes = new HashSet<>(); + + failedNodes.add(nodeId); + } + + /** + * @param failedNodes Failed nodes. + */ + public void failedNodes(@Nullable Set<UUID> failedNodes) { + this.failedNodes = failedNodes; + } + + /** + * @return Failed nodes IDs. + */ + @Nullable public Collection<UUID> failedNodes() { + return failedNodes; + } + /** {@inheritDoc} */ @Override public final boolean equals(Object obj) { if (this == obj) http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java index 6118f4d..70b0080 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -62,6 +62,17 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage } /** + * @param msg Message to copy. + */ + public TcpDiscoveryStatusCheckMessage(TcpDiscoveryStatusCheckMessage msg) { + super(msg); + + this.creatorNode = msg.creatorNode; + this.failedNodeId = msg.failedNodeId; + this.status = msg.status; + } + + /** * Gets creator node. * * @return Creator node. http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 55474dc..5053c2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { public void testMultiThreadedClientsServersRestart() throws Throwable { fail("https://issues.apache.org/jira/browse/IGNITE-1123"); + multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT); + } + + /** + * @throws Exception If any error occurs. + */ + public void _testMultiThreadedServersRestart() throws Throwable { + multiThreadedClientsServersRestart(GRID_CNT * 2, 0); + } + + /** + * @param srvs Number of servers. + * @param clients Number of clients. + * @throws Exception If any error occurs. + */ + private void multiThreadedClientsServersRestart(int srvs, int clients) throws Throwable { final AtomicBoolean done = new AtomicBoolean(); try { @@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - startGridsMultiThreaded(GRID_CNT); - - clientFlagGlobal = true; + startGridsMultiThreaded(srvs); - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + IgniteInternalFuture<?> clientFut = null; final AtomicReference<Throwable> error = new AtomicReference<>(); - final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>(); - - for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) - clientStopIdxs.add(i); + if (clients > 0) { + clientFlagGlobal = true; - final AtomicInteger clientStartIdx = new AtomicInteger(9000); + startGridsMultiThreaded(srvs, clients); - IgniteInternalFuture<?> fut1 = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - try { - clientFlagPerThread.set(true); + final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>(); - while (!done.get() && error.get() == null) { - Integer stopIdx = clientStopIdxs.take(); + for (int i = srvs; i < srvs + clients; i++) + clientStopIdxs.add(i); - log.info("Stop client: " + stopIdx); + final AtomicInteger clientStartIdx = new AtomicInteger(9000); - stopGrid(stopIdx); + clientFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(true); while (!done.get() && error.get() == null) { - // Generate unique name to simplify debugging. - int startIdx = clientStartIdx.getAndIncrement(); + Integer stopIdx = clientStopIdxs.take(); - log.info("Start client: " + startIdx); + log.info("Stop client: " + stopIdx); - UUID id = UUID.randomUUID(); + stopGrid(stopIdx); - nodeId.set(id); + while (!done.get() && error.get() == null) { + // Generate unique name to simplify debugging. + int startIdx = clientStartIdx.getAndIncrement(); - try { - Ignite ignite = startGrid(startIdx); + log.info("Start client: " + startIdx); - assertTrue(ignite.configuration().isClientMode()); + UUID id = UUID.randomUUID(); - clientStopIdxs.add(startIdx); + nodeId.set(id); - break; - } - catch (Exception e) { - if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || - X.hasCause(e, IgniteClientDisconnectedException.class)) - log.info("Client disconnected: " + e); - else if (X.hasCause(e, ClusterTopologyCheckedException.class)) - log.info("Client failed to start: " + e); - else { - if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) - log.info("Client failed: " + e); - else - throw e; + try { + Ignite ignite = startGrid(startIdx); + + assertTrue(ignite.configuration().isClientMode()); + + clientStopIdxs.add(startIdx); + + break; + } + catch (Exception e) { + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || + X.hasCause(e, IgniteClientDisconnectedException.class)) + log.info("Client disconnected: " + e); + else if (X.hasCause(e, ClusterTopologyCheckedException.class)) + log.info("Client failed to start: " + e); + else { + if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) + log.info("Client failed: " + e); + else + throw e; + } } } } } - } - catch (Throwable e) { - log.error("Unexpected error: " + e, e); + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - error.compareAndSet(null, e); + error.compareAndSet(null, e); + + return null; + } return null; } - - return null; - } - }, - CLIENT_GRID_CNT, - "client-restart"); + }, + clients, + "client-restart"); + } final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>(); - for (int i = 0; i < GRID_CNT; i++) + for (int i = 0; i < srvs; i++) srvStopIdxs.add(i); - final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT); + final AtomicInteger srvStartIdx = new AtomicInteger(srvs + clients); - IgniteInternalFuture<?> fut2 = multithreadedAsync( + IgniteInternalFuture<?> srvFut = multithreadedAsync( new Callable<Object>() { @Override public Object call() throws Exception { try { @@ -312,6 +333,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { while (!done.get() && error.get() == null) { int stopIdx = srvStopIdxs.take(); + U.sleep(50); + + Thread.currentThread().setName("stop-server-" + getTestGridName(stopIdx)); + log.info("Stop server: " + stopIdx); stopGrid(stopIdx); @@ -319,13 +344,20 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { // Generate unique name to simplify debugging. int startIdx = srvStartIdx.getAndIncrement(); + Thread.currentThread().setName("start-server-" + getTestGridName(startIdx)); + log.info("Start server: " + startIdx); - Ignite ignite = startGrid(startIdx); + try { + Ignite ignite = startGrid(startIdx); - assertFalse(ignite.configuration().isClientMode()); + assertFalse(ignite.configuration().isClientMode()); - srvStopIdxs.add(startIdx); + srvStopIdxs.add(startIdx); + } + catch (IgniteCheckedException e) { + log.info("Failed to start: " + e); + } } } catch (Throwable e) { @@ -339,7 +371,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { return null; } }, - GRID_CNT - 1, + srvs - 1, "server-restart"); final long timeToExec = getTestTimeout() - 60_000; @@ -356,8 +388,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { done.set(true); - fut1.cancel(); - fut2.cancel(); + if (clientFut != null) + clientFut.cancel(); + + srvFut.cancel(); throw err; } @@ -367,8 +401,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { done.set(true); - fut1.get(); - fut2.get(); + if (clientFut != null) + clientFut.get(); + + srvFut.get(); } finally { done.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java index 8b94f54..7beeb41 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -196,8 +197,15 @@ public class TcpDiscoveryRestartTest extends GridCommonAbstractTest { /** * @param nodeId Node ID. + * @throws Exception If failed. */ - void checkEvents(UUID nodeId) { + void checkEvents(final UUID nodeId) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return joinIds.contains(nodeId) && leftIds.contains(nodeId); + } + }, 5000); + assertTrue("No join event: " + nodeId, joinIds.contains(nodeId)); assertTrue("No left event: " + nodeId, leftIds.contains(nodeId)); http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 51d8a2d..379a3a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -21,16 +21,19 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +41,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -46,12 +51,16 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; @@ -64,8 +73,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -94,7 +103,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private UUID nodeId; /** */ - private TcpDiscoverySpi nodeSpi; + private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>(); /** * @throws Exception If fails. @@ -104,15 +113,17 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "deprecation"}) @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = nodeSpi; + TcpDiscoverySpi spi = nodeSpi.get(); - if (spi == null) + if (spi == null) { spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); + } + else + nodeSpi.set(null); discoMap.put(gridName, spi); @@ -176,6 +187,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + discoMap = null; + + super.afterTest(); + } + /** * @throws Exception If any error occurs. */ @@ -1202,11 +1220,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception { TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); final Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); final Ignite ignite1 = startGrid(1); @@ -1221,7 +1239,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { log.info("Start 2"); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite2 = startGrid(2); @@ -1271,7 +1289,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { assertEquals(1, cache.get(1)); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite = startGrid(3); @@ -1314,15 +1332,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventCoordinatorFailure(boolean twoNodes) throws Exception { TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite1 = startGrid(1); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite2 = twoNodes ? null : startGrid(2); @@ -1366,7 +1384,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { log.info("Try start one more node."); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite = startGrid(twoNodes ? 2 : 3); @@ -1381,6 +1399,421 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * Coordinator is added in failed list during node start. + * + * @throws Exception If failed. + */ + public void testFailedNodes1() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite2 = startGrid(2); + + assertEquals(2, ignite2.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list, concurrent nodes start. + * + * @throws Exception If failed. + */ + public void testFailedNodes2() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = nodeIdx.incrementAndGet(); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(idx); + + return null; + } + }, 3, "start-node"); + + Ignite ignite2 = ignite(2); + + waitForRemoteNodes(ignite2, 3); + + waitNodeStop(ignite0.name()); + + tryCreateCache(4); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list during node start, test with two nodes. + * + * @throws Exception If failed. + */ + public void testFailedNodes3() throws Exception { + try { + nodeSpi.set(new TestFailedNodesSpi(-1)); + + Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(2)); + + Ignite ignite1 = startGrid(1); + + assertEquals(1, ignite1.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1); + + startGrid(2); + + assertEquals(2, ignite1.cluster().nodes().size()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list during node start, but node detected failure dies before + * sending {@link TcpDiscoveryNodeFailedMessage}. + * + * @throws Exception If failed. + */ + public void testFailedNodes4() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite1 = startGrid(1); + + TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER); + + spi.stopBeforeSndFail = true; + + nodeSpi.set(spi); + + Ignite ignite2 = startGrid(2); + + waitNodeStop(ignite2.name()); + + log.info("Try start new node."); + + Ignite ignite3 = startGrid(3); + + waitNodeStop(ignite0.name()); + + assertEquals(2, ignite1.cluster().nodes().size()); + assertEquals(2, ignite3.cluster().nodes().size()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * Adds some node in failed list after join process finished. + * + * @throws Exception If failed. + */ + public void testFailedNodes5() throws Exception { + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int iter = 0; iter < 3; iter++) { + final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6); + + for (int i = 0; i < NODES; i++) { + nodeSpi.set(new TestFailedNodesSpi(-1)); + + startGrid(i); + } + + Map<Long, Ignite> nodes = new HashMap<>(); + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + nodes.put(ignite.cluster().localNode().order(), ignite); + } + + Ignite ignite = ignite(rnd.nextInt(NODES)); + + log.info("Iteration [iter=" + iter + ", nodes=" + NODES + ", failFrom=" + ignite.name() + ']'); + + TestFailedNodesSpi spi = (TestFailedNodesSpi)ignite.configuration().getDiscoverySpi(); + + spi.failSingleMsg = true; + + long order = ignite.cluster().localNode().order(); + + long nextOrder = order == NODES ? 1 : order + 1; + + Ignite failingNode = nodes.get(nextOrder); + + assertNotNull(failingNode); + + waitNodeStop(failingNode.name()); + + Ignite newNode = startGrid(NODES); + + assertEquals(NODES, newNode.cluster().nodes().size()); + + tryCreateCache(NODES); + + stopAllGrids(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testCustomEventAckNotSend() throws Exception { + try { + TestCustomerEventAckSpi spi0 = new TestCustomerEventAckSpi(); + + nodeSpi.set(spi0); + + Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestCustomerEventAckSpi()); + + Ignite ignite1 = startGrid(1); + + spi0.stopBeforeSndAck = true; + + ignite1.message().remoteListen("test", new DummyPredicate()); + + waitNodeStop(ignite0.name()); + + startGrid(2); + } + finally { + stopAllGrids(); + } + } + + /** + * @param nodeName Node name. + * @throws Exception If failed. + */ + private void waitNodeStop(final String nodeName) throws Exception { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + Ignition.ignite(nodeName); + + return false; + } + catch (IgniteIllegalStateException e) { + return true; + } + } + }, 10_000); + + if (!wait) + U.dumpThreads(log); + + assertTrue("Failed to wait for node stop.", wait); + } + + /** + * @param expNodes Expected nodes number. + */ + private void tryCreateCache(int expNodes) { + List<Ignite> allNodes = G.allGrids(); + + assertEquals(expNodes, allNodes.size()); + + int cntr = 0; + + for (Ignite ignite : allNodes) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("cache-" + cntr++); + + log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']'); + + ignite.getOrCreateCache(ccfg).put(1, 1); + } + } + + /** + * + */ + static class DummyPredicate implements IgniteBiPredicate<UUID, Object> { + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object o) { + return true; + } + } + + /** + * + */ + private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { + /** */ + private volatile boolean stopBeforeSndAck; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, + TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { + if (stopBeforeSndAck) { + if (msg instanceof TcpDiscoveryCustomEventMessage) { + try { + DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue( + ((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate"); + + if (custMsg instanceof StartRoutineAckDiscoveryMessage) { + log.info("Skip message send and stop node: " + msg); + + sock.close(); + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + ignite.close(); + + return null; + } + }, "stop-node"); + + return; + } + } + catch (Throwable e) { + fail("Unexpected error: " + e); + } + } + } + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** + * Simulate scenario when node detects node failure trying to send message, but node still alive. + */ + private static class TestFailedNodesSpi extends TcpDiscoverySpi { + /** */ + private AtomicBoolean failMsg = new AtomicBoolean(); + + /** */ + private int failOrder; + + /** */ + private boolean stopBeforeSndFail; + + /** */ + private boolean stop; + + /** */ + private volatile boolean failSingleMsg; + + /** + * @param failOrder Spi fails connection if local node order equals to this order. + */ + TestFailedNodesSpi(int failOrder) { + this.failOrder = failOrder; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, + TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { + if (stop) + return; + + if (failSingleMsg) { + failSingleMsg = false; + + log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + throw new SocketTimeoutException(); + } + + if (locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeAddedMessage) && + failMsg.compareAndSet(false, true)) { + log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + throw new SocketTimeoutException(); + } + + if (stopBeforeSndFail && + locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeFailedMessage)) { + stop = true; + + log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + ignite.close(); + + return null; + } + }, "stop-node"); + + return; + } + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** * */ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi { http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java index 97ba5cf..1e710ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java @@ -58,14 +58,20 @@ public class TcpDiscoveryMulticastIpFinderSelfTest try { ipFinder1 = ipFinder(); + ipFinder1.setResponseWaitTime(1000); + ipFinder1.setAddressRequestAttempts(10); ipFinder2 = new TcpDiscoveryMulticastIpFinder(); + ipFinder2.setResponseWaitTime(1000); + ipFinder2.setAddressRequestAttempts(10); ipFinder2.setMulticastGroup(ipFinder1.getMulticastGroup()); ipFinder2.setMulticastPort(ipFinder1.getMulticastPort()); ipFinder3 = new TcpDiscoveryMulticastIpFinder(); + ipFinder3.setResponseWaitTime(1000); + ipFinder3.setAddressRequestAttempts(10); ipFinder3.setMulticastGroup(ipFinder1.getMulticastGroup()); ipFinder3.setMulticastPort(ipFinder1.getMulticastPort()); @@ -81,21 +87,13 @@ public class TcpDiscoveryMulticastIpFinderSelfTest ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002))); ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003))); - for (int i = 0; i < 5; i++) { - Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses(); - Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses(); - Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses(); - - if (addrs1.size() != 1 || addrs2.size() != 2 || addrs3.size() != 3) { - info("Addrs1: " + addrs1); - info("Addrs2: " + addrs2); - info("Addrs2: " + addrs3); - - Thread.sleep(1000); - } - else - break; - } + Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses(); + Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses(); + Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses(); + + info("Addrs1: " + addrs1); + info("Addrs2: " + addrs2); + info("Addrs2: " + addrs3); assertEquals(1, ipFinder1.getRegisteredAddresses().size()); assertEquals(2, ipFinder2.getRegisteredAddresses().size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 41d4b4a..3e41979 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1120,16 +1120,31 @@ public abstract class GridAbstractTest extends TestCase { if (gridName != null && gridName.matches(".*\\d")) { String idStr = UUID.randomUUID().toString(); - char[] chars = idStr.toCharArray(); + if (gridName.startsWith(getTestGridName())) { + String idxStr = String.valueOf(getTestGridIndex(gridName)); - chars[0] = gridName.charAt(gridName.length() - 1); - chars[1] = '0'; + while (idxStr.length() < 5) + idxStr = '0' + idxStr; - chars[chars.length - 3] = '0'; - chars[chars.length - 2] = '0'; - chars[chars.length - 1] = gridName.charAt(gridName.length() - 1); + char[] chars = idStr.toCharArray(); - cfg.setNodeId(UUID.fromString(new String(chars))); + for (int i = 0; i < idxStr.length(); i++) + chars[chars.length - idxStr.length() + i] = idxStr.charAt(i); + + cfg.setNodeId(UUID.fromString(new String(chars))); + } + else { + char[] chars = idStr.toCharArray(); + + chars[0] = gridName.charAt(gridName.length() - 1); + chars[1] = '0'; + + chars[chars.length - 3] = '0'; + chars[chars.length - 2] = '0'; + chars[chars.length - 1] = gridName.charAt(gridName.length() - 1); + + cfg.setNodeId(UUID.fromString(new String(chars))); + } } if (isMultiJvm())