ignite-1758 Discovery fixes

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80147128
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80147128
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80147128

Branch: refs/heads/ignite-638
Commit: 80147128a3b07f927dec65f0a6934f6782efab5c
Parents: 5a116cb
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 17 09:48:58 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 17 09:48:58 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 360 +++++++++++----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   4 +-
 .../tcp/internal/TcpDiscoveryNodesRing.java     |  95 ++--
 .../messages/TcpDiscoveryAbstractMessage.java   |  37 ++
 .../TcpDiscoveryStatusCheckMessage.java         |  11 +
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 158 ++++---
 .../discovery/tcp/TcpDiscoveryRestartTest.java  |  10 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 459 ++++++++++++++++++-
 .../TcpDiscoveryMulticastIpFinderSelfTest.java  |  28 +-
 .../testframework/junits/GridAbstractTest.java  |  29 +-
 10 files changed, 942 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..ae23d0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -159,6 +159,10 @@ class ServerImpl extends TcpDiscoveryImpl {
     private static final int ENSURED_MSG_HIST_SIZE = 
getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
 
     /** */
+    private static final IgniteProductVersion 
CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
+        IgniteProductVersion.fromString("1.5.0");
+
+    /** */
     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 
1, 2000, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
 
@@ -191,10 +195,10 @@ class ServerImpl extends TcpDiscoveryImpl {
     private StatisticsPrinter statsPrinter;
 
     /** Failed nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+    private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
 
     /** Leaving nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+    private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
 
     /** If non-shared IP finder is used this flag shows whether IP finder 
contains local address. */
     private boolean ipFinderHasLocAddr;
@@ -1080,13 +1084,34 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 openSock = true;
 
+                TcpDiscoveryHandshakeRequest req = new 
TcpDiscoveryHandshakeRequest(locNodeId);
+
                 // Handshake.
-                spi.writeToSocket(sock, new 
TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
-                    spi.getSocketTimeout()));
+                spi.writeToSocket(sock, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
                     ackTimeout0));
 
+                if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                    boolean ignore = false;
+
+                    synchronized (failedNodes) {
+                        for (TcpDiscoveryNode failedNode : failedNodes) {
+                            if (failedNode.id().equals(res.creatorNodeId())) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Ignore response from node from 
failed list: " + res);
+
+                                ignore = true;
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (ignore)
+                        break;
+                }
+
                 if (locNodeId.equals(res.creatorNodeId())) {
                     if (log.isDebugEnabled())
                         log.debug("Handshake response from local node: " + 
res);
@@ -1104,7 +1129,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
                 if (debugMode)
-                    debugLog("Message has been sent directly to address [msg=" 
+ msg + ", addr=" + addr +
+                    debugLog(msg, "Message has been sent directly to address 
[msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + res.creatorNodeId() + ']');
 
                 if (log.isDebugEnabled())
@@ -1754,6 +1779,32 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Adds failed nodes specified in the received message to the local failed 
nodes list.
+     *
+     * @param msg Message.
+     */
+    private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
+        if (msg.failedNodes() != null) {
+            for (UUID nodeId : msg.failedNodes()) {
+                TcpDiscoveryNode failedNode = ring.node(nodeId);
+
+                if (failedNode != null) {
+                    if (!failedNode.isLocal()) {
+                        boolean added;
+
+                        synchronized (mux) {
+                            added = failedNodes.add(failedNode);
+                        }
+
+                        if (added && log.isDebugEnabled())
+                            log.debug("Added node to failed nodes list [node=" 
+ failedNode + ", msg=" + msg + ']');
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Discovery messages history used for client reconnect.
      */
     private class EnsuredMessageHistory {
@@ -2131,10 +2182,28 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Processing message [cls=" + 
msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
             if (debugMode)
-                debugLog("Processing message [cls=" + 
msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+                debugLog(msg, "Processing message [cls=" + 
msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+            if (locNode.internalOrder() == 0) {
+                boolean process = false;
+
+                if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                    process = 
((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode);
+
+                if (!process) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Ignore message, local node order is not 
initialized [msg=" + msg +
+                            ", locNode=" + locNode + ']');
+                    }
+
+                    return;
+                }
+            }
 
             spi.stats.onMessageProcessingStarted(msg);
 
+            processMessageFailedNodes(msg);
+
             if (msg instanceof TcpDiscoveryJoinRequestMessage)
                 processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
 
@@ -2200,6 +2269,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             checkHeartbeatsReceiving();
 
             checkPendingCustomMessages();
+
+            checkFailedNodesList();
         }
 
         /**
@@ -2262,50 +2333,50 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             boolean sent = false;
 
-            boolean searchNext = true;
+            boolean newNextNode = false;
 
             UUID locNodeId = getLocalNodeId();
 
             while (true) {
-                if (searchNext) {
-                    TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
-
-                    if (newNext == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("No next node in topology.");
+                TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
 
-                        if (debugMode)
-                            debugLog("No next node in topology.");
+                if (newNext == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("No next node in topology.");
 
-                        if (ring.hasRemoteNodes() && !(msg instanceof 
TcpDiscoveryConnectionCheckMessage) &&
-                            !(msg instanceof TcpDiscoveryStatusCheckMessage && 
msg.creatorNodeId().equals(locNodeId))) {
-                            msg.senderNodeId(locNodeId);
+                    if (debugMode)
+                        debugLog(msg, "No next node in topology.");
 
-                            addMessage(msg);
-                        }
+                    if (ring.hasRemoteNodes() && !(msg instanceof 
TcpDiscoveryConnectionCheckMessage) &&
+                        !(msg instanceof TcpDiscoveryStatusCheckMessage && 
msg.creatorNodeId().equals(locNodeId))) {
+                        msg.senderNodeId(locNodeId);
 
-                        break;
+                        addMessage(msg);
                     }
 
-                    if (!newNext.equals(next)) {
-                        if (log.isDebugEnabled())
-                            log.debug("New next node [newNext=" + newNext + ", 
formerNext=" + next +
-                                ", ring=" + ring + ", failedNodes=" + 
failedNodes + ']');
+                    break;
+                }
 
-                        if (debugMode)
-                            debugLog("New next node [newNext=" + newNext + ", 
formerNext=" + next +
-                                ", ring=" + ring + ", failedNodes=" + 
failedNodes + ']');
+                if (!newNext.equals(next)) {
+                    if (log.isDebugEnabled())
+                        log.debug("New next node [newNext=" + newNext + ", 
formerNext=" + next +
+                            ", ring=" + ring + ", failedNodes=" + failedNodes 
+ ']');
 
-                        U.closeQuiet(sock);
+                    if (debugMode)
+                        debugLog(msg, "New next node [newNext=" + newNext + ", 
formerNext=" + next +
+                            ", ring=" + ring + ", failedNodes=" + failedNodes 
+ ']');
 
-                        sock = null;
+                    U.closeQuiet(sock);
 
-                        next = newNext;
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Next node remains the same [nextId=" + 
next.id() +
-                            ", nextOrder=" + next.internalOrder() + ']');
+                    sock = null;
+
+                    next = newNext;
+
+                    newNextNode = true;
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Next node remains the same [nextId=" + 
next.id() +
+                        ", nextOrder=" + next.internalOrder() + ']');
 
                 // Flag that shows whether next node exists and accepts 
incoming connections.
                 boolean nextNodeExists = sock != null;
@@ -2379,8 +2450,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             "expected [expectedId=" + 
next.id() + ", rcvdId=" + nextId + ']');
 
                                     if (debugMode)
-                                        debugLog("Failed to restore ring 
because next node ID received is not as " +
-                                            "expected [expectedId=" + 
next.id() + ", rcvdId=" + nextId + ']');
+                                        debugLog(msg, "Failed to restore ring 
because next node ID received is not " +
+                                            "as expected [expectedId=" + 
next.id() + ", rcvdId=" + nextId + ']');
 
                                     break;
                                 }
@@ -2401,8 +2472,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                                     ", rcvd=" + nextOrder + ", 
id=" + next.id() + ']');
 
                                             if (debugMode)
-                                                debugLog("Failed to restore 
ring because next node order received " +
-                                                    "is not as expected 
[expected=" + next.internalOrder() +
+                                                debugLog(msg, "Failed to 
restore ring because next node order " +
+                                                    "received is not as 
expected [expected=" + next.internalOrder() +
                                                     ", rcvd=" + nextOrder + ", 
id=" + next.id() + ']');
 
                                             break;
@@ -2413,7 +2484,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         log.debug("Initialized connection with 
next node: " + next.id());
 
                                     if (debugMode)
-                                        debugLog("Initialized connection with 
next node: " + next.id());
+                                        debugLog(msg, "Initialized connection 
with next node: " + next.id());
 
                                     errs = null;
 
@@ -2477,13 +2548,20 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             assert !forceSndPending || msg instanceof 
TcpDiscoveryNodeLeftMessage;
 
-                            if (failure || forceSndPending) {
+                            boolean sndPending=
+                                (newNextNode && 
ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE)
 >= 0) ||
+                                failure ||
+                                forceSndPending;
+
+                            if (sndPending) {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent 
[failure=" + failure +
+                                        ", newNextNode=" + newNextNode +
                                         ", forceSndPending=" + forceSndPending 
+ ']');
 
                                 if (debugMode)
-                                    debugLog("Pending messages will be sent 
[failure=" + failure +
+                                    debugLog(msg, "Pending messages will be 
sent [failure=" + failure +
+                                        ", newNextNode=" + newNextNode +
                                         ", forceSndPending=" + forceSndPending 
+ ']');
 
                                 for (TcpDiscoveryAbstractMessage pendingMsg : 
pendingMsgs) {
@@ -2513,7 +2591,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             ", res=" + res + ']');
 
                                     if (debugMode)
-                                        debugLog("Pending message has been 
sent to next node [msgId=" + msg.id() +
+                                        debugLog(msg, "Pending message has 
been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + 
pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
@@ -2540,6 +2618,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (timeoutHelper == null)
                                     timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi);
 
+                                if (!failedNodes.isEmpty()) {
+                                    for (TcpDiscoveryNode failedNode : 
failedNodes) {
+                                        assert !failedNode.equals(next) : 
failedNode;
+
+                                        msg.addFailedNode(failedNode.id());
+                                    }
+                                }
+
                                 writeToSocket(sock, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, 
U.currentTimeMillis() - tstamp);
@@ -2548,15 +2634,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 onMessageExchanged();
 
-                                if (log.isDebugEnabled())
+                                if (log.isDebugEnabled()) {
                                     log.debug("Message has been sent to next 
node [msg=" + msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
+                                }
 
-                                if (debugMode)
-                                    debugLog("Message has been sent to next 
node [msg=" + msg +
+                                if (debugMode) {
+                                    debugLog(msg, "Message has been sent to 
next node [msg=" + msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
+                                }
                             }
                             finally {
                                 clearNodeAddedMessage(msg);
@@ -2635,8 +2723,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     next = null;
 
-                    searchNext = true;
-
                     errs = null;
                 }
                 else
@@ -2665,25 +2751,30 @@ class ServerImpl extends TcpDiscoveryImpl {
                     msgWorker.addMessage(new 
TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
 
                 if (!sent) {
+                    assert next == null : next;
+
                     if (log.isDebugEnabled())
                         log.debug("Pending messages will be resent to local 
node");
 
                     if (debugMode)
-                        log.debug("Pending messages will be resent to local 
node");
+                        debugLog(msg, "Pending messages will be resent to 
local node");
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) 
{
                         prepareNodeAddedMessage(pendingMsg, locNodeId, 
pendingMsgs.msgs, pendingMsgs.discardId,
                             pendingMsgs.customDiscardId);
 
+                        pendingMsg.senderNodeId(locNodeId);
+
                         msgWorker.addMessage(pendingMsg);
 
                         if (log.isDebugEnabled())
                             log.debug("Pending message has been sent to local 
node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ", next=" + 
next.id() + ']');
+                                ", pendingMsgId=" + pendingMsg + ']');
 
-                        if (debugMode)
-                            debugLog("Pending message has been sent to local 
node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ", next=" + 
next.id() + ']');
+                        if (debugMode) {
+                            debugLog(msg, "Pending message has been sent to 
local node [msg=" + msg.id() +
+                                ", pendingMsgId=" + pendingMsg + ']');
+                        }
                     }
                 }
 
@@ -3317,15 +3408,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
 
-                if (log.isDebugEnabled())
+                if (log.isDebugEnabled()) {
                     log.debug("Local node already has node being added. 
Passing TcpDiscoveryNodeAddedMessage to " +
-                                  "coordinator for final processing [ring=" + 
ring + ", node=" + node + ", locNode="
-                                  + locNode + ", msg=" + msg + ']');
+                        "coordinator for final processing [ring=" + ring + ", 
node=" + node + ", locNode="
+                        + locNode + ", msg=" + msg + ']');
+                }
 
-                if (debugMode)
-                    debugLog("Local node already has node being added. Passing 
TcpDiscoveryNodeAddedMessage to " +
-                                 "coordinator for final processing [ring=" + 
ring + ", node=" + node + ", locNode="
-                                 + locNode + ", msg=" + msg + ']');
+                if (debugMode) {
+                    debugLog(msg, "Local node already has node being added. 
Passing TcpDiscoveryNodeAddedMessage to " +
+                        "coordinator for final processing [ring=" + ring + ", 
node=" + node + ", locNode="
+                        + locNode + ", msg=" + msg + ']');
+                }
 
                 return;
             }
@@ -3338,7 +3431,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             ", msg=" + msg + ']');
 
                     if (debugMode)
-                        debugLog("Discarding node added message since new 
node's order is less than " +
+                        debugLog(msg, "Discarding node added message since new 
node's order is less than " +
                             "max order in ring [ring=" + ring + ", node=" + 
node + ", locNode=" + locNode +
                             ", msg=" + msg + ']');
 
@@ -3427,6 +3520,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         spi.onExchange(node.id(), node.id(), data, 
U.gridClassLoader());
 
                     msg.addDiscoveryData(locNodeId, 
spi.collectExchangeData(node.id()));
+
+                    processMessageFailedNodes(msg);
                 }
 
                 if (log.isDebugEnabled())
@@ -3447,6 +3542,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                             spi.gridStartTime = msg.gridStartTime();
 
                             for (TcpDiscoveryNode n : top) {
+                                assert n.internalOrder() < 
node.internalOrder() :
+                                    "Invalid node [topNode=" + n + ", added=" 
+ node + ']';
+
                                 // Make all preceding nodes and local node 
visible.
                                 n.visible(true);
                             }
@@ -3500,6 +3598,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
dataMap.entrySet())
                         spi.onExchange(node.id(), entry.getKey(), 
entry.getValue(), U.gridClassLoader());
                 }
+
+                processMessageFailedNodes(msg);
             }
 
             if (sendMessageToRemotes(msg))
@@ -3733,7 +3833,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 interruptPing(leavingNode);
 
-                assert leftNode != null;
+                assert leftNode != null : msg;
 
                 if (log.isDebugEnabled())
                     log.debug("Removed node from topology: " + leftNode);
@@ -3887,6 +3987,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (node != null) {
+                assert !node.isLocal() : msg;
+
                 synchronized (mux) {
                     failedNodes.add(node);
                 }
@@ -4036,32 +4138,46 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     return;
                                 }
 
+                                TcpDiscoveryStatusCheckMessage msg0 = msg;
+
+                                if (F.contains(msg.failedNodes(), 
msg.creatorNodeId())) {
+                                    msg0 = new 
TcpDiscoveryStatusCheckMessage(msg);
+
+                                    msg0.failedNodes(null);
+
+                                    for (UUID failedNodeId : 
msg.failedNodes()) {
+                                        if 
(!failedNodeId.equals(msg.creatorNodeId()))
+                                            msg0.addFailedNode(failedNodeId);
+                                    }
+                                }
+
                                 try {
-                                    trySendMessageDirectly(msg.creatorNode(), 
msg);
+                                    trySendMessageDirectly(msg0.creatorNode(), 
msg0);
 
                                     if (log.isDebugEnabled())
                                         log.debug("Responded to status check 
message " +
-                                            "[recipient=" + 
msg.creatorNodeId() + ", status=" + msg.status() + ']');
+                                            "[recipient=" + 
msg0.creatorNodeId() + ", status=" + msg0.status() + ']');
                                 }
                                 catch (IgniteSpiException e) {
                                     if (e.hasCause(SocketException.class)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Failed to respond to 
status check message (connection " +
-                                                "refused) [recipient=" + 
msg.creatorNodeId() + ", status=" +
-                                                msg.status() + ']');
+                                                "refused) [recipient=" + 
msg0.creatorNodeId() + ", status=" +
+                                                msg0.status() + ']');
 
                                         onException("Failed to respond to 
status check message (connection refused) " +
-                                            "[recipient=" + 
msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
+                                            "[recipient=" + 
msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e);
                                     }
-                                    else {
-                                        if (pingNode(msg.creatorNode()))
+                                    else if (!spi.isNodeStopping0()) {
+                                        if (pingNode(msg0.creatorNode()))
                                             // Node exists and accepts 
incoming connections.
                                             U.error(log, "Failed to respond to 
status check message [recipient=" +
-                                                msg.creatorNodeId() + ", 
status=" + msg.status() + ']', e);
-                                        else if (log.isDebugEnabled())
-                                            log.debug("Failed to respond to 
status check message (did the node " +
-                                                "stop?) [recipient=" + 
msg.creatorNodeId() + ", status=" + msg.status()
-                                                + ']');
+                                                msg0.creatorNodeId() + ", 
status=" + msg0.status() + ']', e);
+                                        else if (log.isDebugEnabled()) {
+                                            log.debug("Failed to respond to 
status check message (did the node stop?)" +
+                                                "[recipient=" + 
msg0.creatorNodeId() +
+                                                ", status=" + msg0.status() + 
']');
+                                        }
                                     }
                                 }
                             }
@@ -4364,27 +4480,42 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
-                if (!joiningNodes.isEmpty()) {
+                boolean delayMsg;
+
+                assert ring.minimumNodeVersion() != null : ring;
+
+                if 
(ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE)
 >= 0)
+                    delayMsg = msg.topologyVersion() == 0L && 
!joiningNodes.isEmpty();
+                else
+                    delayMsg = !joiningNodes.isEmpty();
+
+                if (delayMsg) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Delay custom message processing, there are 
joining nodes [msg=" + msg +
+                            ", joiningNodes=" + joiningNodes + ']');
+                    }
+
                     pendingCustomMsgs.add(msg);
 
                     return;
                 }
 
-                boolean sndNext = !msg.verified();
-
-                if (sndNext) {
+                if (!msg.verified()) {
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
-                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
+                    if (pendingMsgs.procCustomMsgs.add(msg.id())) {
                         notifyDiscoveryListener(msg);
-                    else
-                        sndNext = false;
-                }
 
-                if (sndNext && ring.hasRemoteNodes())
-                    sendMessageAcrossRing(msg);
+                        if (sendMessageToRemotes(msg))
+                            sendMessageAcrossRing(msg);
+                        else
+                            processCustomMessage(msg);
+                    }
+                }
                 else {
+                    addMessage(new 
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
+
                     spi.stats.onRingMessageReceived(msg);
 
                     DiscoverySpiCustomMessage msgObj = null;
@@ -4401,16 +4532,21 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (nextMsg != null) {
                             try {
-                                addMessage(new 
TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
-                                    spi.marsh.marshal(nextMsg)));
+                                TcpDiscoveryCustomEventMessage ackMsg = new 
TcpDiscoveryCustomEventMessage(
+                                    getLocalNodeId(), nextMsg, 
spi.marsh.marshal(nextMsg));
+
+                                ackMsg.topologyVersion(msg.topologyVersion());
+
+                                processCustomMessage(ackMsg);
+
+                                if (ackMsg.verified())
+                                    msgHist.add(ackMsg);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(log, "Failed to marshal discovery 
custom message.", e);
                             }
                         }
                     }
-
-                    addMessage(new 
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
                 }
             }
             else {
@@ -4428,9 +4564,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 if (msg.verified() && state0 == CONNECTED && 
pendingMsgs.procCustomMsgs.add(msg.id())) {
-                    assert joiningNodes.isEmpty() : "Joining nodes: " + 
joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
-                        ", topver=" + ring.topologyVersion();
-                    assert msg.topologyVersion() == ring.topologyVersion() : 
"msg: " + msg + ", topver=" + ring.topologyVersion();
+                    assert msg.topologyVersion() == ring.topologyVersion() :
+                        "msg: " + msg + ", topVer=" + ring.topologyVersion();
 
                     notifyDiscoveryListener(msg);
                 }
@@ -4441,6 +4576,38 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Checks failed nodes list and sends {@link 
TcpDiscoveryNodeFailedMessage} if failed node
+         * is still in the ring.
+         */
+        private void checkFailedNodesList() {
+            List<TcpDiscoveryNodeFailedMessage> msgs = null;
+
+            synchronized (mux) {
+                for (Iterator<TcpDiscoveryNode> it = failedNodes.iterator(); 
it.hasNext();) {
+                    TcpDiscoveryNode node = it.next();
+
+                    if (ring.node(node.id()) != null) {
+                        if (msgs == null)
+                            msgs = new ArrayList<>(failedNodes.size());
+
+                        msgs.add(new 
TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), 
node.internalOrder()));
+                    }
+                    else
+                        it.remove();
+                }
+            }
+
+            if (msgs != null) {
+                for (TcpDiscoveryNodeFailedMessage msg : msgs) {
+                    if (log.isDebugEnabled())
+                        log.debug("Add node failed message for node from 
failed nodes list: " + msg);
+
+                    addMessage(msg);
+                }
+            }
+        }
+
+        /**
          * Checks and flushes custom event messages if no nodes are attempting 
to join the grid.
          */
         private void checkPendingCustomMessages() {
@@ -4640,10 +4807,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     synchronized (mux) {
                         readers.add(reader);
-
-                        reader.start();
                     }
 
+                    reader.start();
+
                     spi.stats.onServerSocketInitialized(U.currentTimeMillis() 
- tstamp);
                 }
             }
@@ -4861,9 +5028,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         log.debug("Initialized connection with remote node 
[nodeId=" + nodeId +
                             ", client=" + req.client() + ']');
 
-                    if (debugMode)
-                        debugLog("Initialized connection with remote node 
[nodeId=" + nodeId +
+                    if (debugMode) {
+                        debugLog(msg, "Initialized connection with remote node 
[nodeId=" + nodeId +
                             ", client=" + req.client() + ']');
+                    }
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -4932,7 +5100,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         spi.stats.onMessageReceived(msg);
 
                         if (debugMode && recordable(msg))
-                            debugLog("Message has been received: " + msg);
+                            debugLog(msg, "Message has been received: " + msg);
 
                         if (msg instanceof TcpDiscoveryConnectionCheckMessage) 
{
                             spi.writeToSocket(msg, sock, RES_OK, 
socketTimeout);

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 2786d0b..1aef728 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -99,9 +100,10 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * @param discoMsg Discovery message.
      * @param msg Message.
      */
-    protected void debugLog(String msg) {
+    protected void debugLog(@Nullable TcpDiscoveryAbstractMessage discoMsg, 
String msg) {
         assert debugMode;
 
         String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new 
Date(System.currentTimeMillis())) +

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 7ca092c..eb0f74a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.PN;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Collection;
@@ -88,6 +89,23 @@ public class TcpDiscoveryNodesRing {
     @GridToStringExclude
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
+    /** */
+    private IgniteProductVersion minNodeVer;
+
+    /**
+     * @return Minimum node version.
+     */
+    public IgniteProductVersion minimumNodeVersion() {
+        rwLock.readLock().lock();
+
+        try {
+            return minNodeVer;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
     /**
      * Sets local node.
      *
@@ -225,6 +243,8 @@ public class TcpDiscoveryNodesRing {
             nodeOrder = node.internalOrder();
 
             maxInternalOrder = node.internalOrder();
+
+            initializeMinimumVersion();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -295,6 +315,8 @@ public class TcpDiscoveryNodesRing {
             }
 
             nodeOrder = topVer;
+
+            initializeMinimumVersion();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -341,6 +363,8 @@ public class TcpDiscoveryNodesRing {
                 nodes.remove(rmv);
             }
 
+            initializeMinimumVersion();
+
             return rmv;
         }
         finally {
@@ -372,6 +396,8 @@ public class TcpDiscoveryNodesRing {
             maxInternalOrder = 0;
 
             topVer = 0;
+
+            minNodeVer = locNode.version();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -451,61 +477,8 @@ public class TcpDiscoveryNodesRing {
      * topology contains less than two nodes.
      */
     @Nullable public TcpDiscoveryNode nextNode(@Nullable 
Collection<TcpDiscoveryNode> excluded) {
-        assert excluded == null || excluded.isEmpty() || 
!excluded.contains(locNode);
-
-        rwLock.readLock().lock();
-
-        try {
-            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
-
-            if (filtered.size() < 2)
-                return null;
-
-            Iterator<TcpDiscoveryNode> iter = filtered.iterator();
-
-            while (iter.hasNext()) {
-                TcpDiscoveryNode node = iter.next();
-
-                if (locNode.equals(node))
-                    break;
-            }
-
-            return iter.hasNext() ? iter.next() : F.first(filtered);
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Finds previous node in the topology.
-     *
-     * @return Previous node.
-     */
-    @Nullable public TcpDiscoveryNode previousNode() {
-        rwLock.readLock().lock();
-
-        try {
-            if (nodes.size() < 2)
-                return null;
-
-            return previousNode(null);
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Finds previous node in the topology filtering excluded nodes from 
search.
-     *
-     * @param excluded Nodes to exclude from the search (optional). If 
provided,
-     * cannot contain local node.
-     * @return Previous node or {@code null} if all nodes were filtered out or
-     * topology contains less than two nodes.
-     */
-    @Nullable public TcpDiscoveryNode previousNode(@Nullable 
Collection<TcpDiscoveryNode> excluded) {
-        assert excluded == null || excluded.isEmpty() || 
!excluded.contains(locNode);
+        assert locNode.internalOrder() > 0 : locNode;
+        assert excluded == null || excluded.isEmpty() || 
!excluded.contains(locNode) : excluded;
 
         rwLock.readLock().lock();
 
@@ -638,6 +611,18 @@ public class TcpDiscoveryNodesRing {
         });
     }
 
+    /**
+     *
+     */
+    private void initializeMinimumVersion() {
+        minNodeVer = null;
+
+        for (TcpDiscoveryNode node : nodes) {
+            if (minNodeVer == null || node.version().compareTo(minNodeVer) < 0)
+                minNodeVer = node.version();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         rwLock.readLock().lock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 875d18e..9cb47af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -19,10 +19,15 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.io.Externalizable;
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Base class to implement discovery messages.
@@ -62,6 +67,10 @@ public abstract class TcpDiscoveryAbstractMessage implements 
Serializable {
     /** Pending message index. */
     private short pendingIdx;
 
+    /** */
+    @GridToStringInclude
+    private Set<UUID> failedNodes;
+
     /**
      * Default no-arg constructor for {@link Externalizable} interface.
      */
@@ -236,6 +245,34 @@ public abstract class TcpDiscoveryAbstractMessage 
implements Serializable {
         return false;
     }
 
+    /**
+     * Adds node ID to the failed nodes list.
+     *
+     * @param nodeId Node ID.
+     */
+    public void addFailedNode(UUID nodeId) {
+        assert nodeId != null;
+
+        if (failedNodes == null)
+            failedNodes = new HashSet<>();
+
+        failedNodes.add(nodeId);
+    }
+
+    /**
+     * @param failedNodes Failed nodes.
+     */
+    public void failedNodes(@Nullable Set<UUID> failedNodes) {
+        this.failedNodes = failedNodes;
+    }
+
+    /**
+     * @return Failed nodes IDs.
+     */
+    @Nullable public Collection<UUID> failedNodes() {
+        return failedNodes;
+    }
+
     /** {@inheritDoc} */
     @Override public final boolean equals(Object obj) {
         if (this == obj)

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index 6118f4d..70b0080 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -62,6 +62,17 @@ public class TcpDiscoveryStatusCheckMessage extends 
TcpDiscoveryAbstractMessage
     }
 
     /**
+     * @param msg Message to copy.
+     */
+    public TcpDiscoveryStatusCheckMessage(TcpDiscoveryStatusCheckMessage msg) {
+        super(msg);
+
+        this.creatorNode = msg.creatorNode;
+        this.failedNodeId = msg.failedNodeId;
+        this.status = msg.status;
+    }
+
+    /**
      * Gets creator node.
      *
      * @return Creator node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 55474dc..5053c2d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
     public void testMultiThreadedClientsServersRestart() throws Throwable {
         fail("https://issues.apache.org/jira/browse/IGNITE-1123";);
 
+        multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMultiThreadedServersRestart() throws Throwable {
+        multiThreadedClientsServersRestart(GRID_CNT * 2, 0);
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param clients Number of clients.
+     * @throws Exception If any error occurs.
+     */
+    private void multiThreadedClientsServersRestart(int srvs, int clients) 
throws Throwable {
         final AtomicBoolean done = new AtomicBoolean();
 
         try {
@@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
 
             info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " 
min.");
 
-            startGridsMultiThreaded(GRID_CNT);
-
-            clientFlagGlobal = true;
+            startGridsMultiThreaded(srvs);
 
-            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            IgniteInternalFuture<?> clientFut = null;
 
             final AtomicReference<Throwable> error = new AtomicReference<>();
 
-            final BlockingQueue<Integer> clientStopIdxs = new 
LinkedBlockingQueue<>();
-
-            for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
-                clientStopIdxs.add(i);
+            if (clients > 0) {
+                clientFlagGlobal = true;
 
-            final AtomicInteger clientStartIdx = new AtomicInteger(9000);
+                startGridsMultiThreaded(srvs, clients);
 
-            IgniteInternalFuture<?> fut1 = multithreadedAsync(
-                new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            clientFlagPerThread.set(true);
+                final BlockingQueue<Integer> clientStopIdxs = new 
LinkedBlockingQueue<>();
 
-                            while (!done.get() && error.get() == null) {
-                                Integer stopIdx = clientStopIdxs.take();
+                for (int i = srvs; i < srvs + clients; i++)
+                    clientStopIdxs.add(i);
 
-                                log.info("Stop client: " + stopIdx);
+                final AtomicInteger clientStartIdx = new AtomicInteger(9000);
 
-                                stopGrid(stopIdx);
+                clientFut = multithreadedAsync(
+                    new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            try {
+                                clientFlagPerThread.set(true);
 
                                 while (!done.get() && error.get() == null) {
-                                    // Generate unique name to simplify 
debugging.
-                                    int startIdx = 
clientStartIdx.getAndIncrement();
+                                    Integer stopIdx = clientStopIdxs.take();
 
-                                    log.info("Start client: " + startIdx);
+                                    log.info("Stop client: " + stopIdx);
 
-                                    UUID id = UUID.randomUUID();
+                                    stopGrid(stopIdx);
 
-                                    nodeId.set(id);
+                                    while (!done.get() && error.get() == null) 
{
+                                        // Generate unique name to simplify 
debugging.
+                                        int startIdx = 
clientStartIdx.getAndIncrement();
 
-                                    try {
-                                        Ignite ignite = startGrid(startIdx);
+                                        log.info("Start client: " + startIdx);
 
-                                        
assertTrue(ignite.configuration().isClientMode());
+                                        UUID id = UUID.randomUUID();
 
-                                        clientStopIdxs.add(startIdx);
+                                        nodeId.set(id);
 
-                                        break;
-                                    }
-                                    catch (Exception e) {
-                                        if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class) ||
-                                            X.hasCause(e, 
IgniteClientDisconnectedException.class))
-                                            log.info("Client disconnected: " + 
e);
-                                        else if (X.hasCause(e, 
ClusterTopologyCheckedException.class))
-                                            log.info("Client failed to start: 
" + e);
-                                        else {
-                                            if (failedNodes.contains(id) && 
X.hasCause(e, IgniteSpiException.class))
-                                                log.info("Client failed: " + 
e);
-                                            else
-                                                throw e;
+                                        try {
+                                            Ignite ignite = 
startGrid(startIdx);
+
+                                            
assertTrue(ignite.configuration().isClientMode());
+
+                                            clientStopIdxs.add(startIdx);
+
+                                            break;
+                                        }
+                                        catch (Exception e) {
+                                            if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class) ||
+                                                X.hasCause(e, 
IgniteClientDisconnectedException.class))
+                                                log.info("Client disconnected: 
" + e);
+                                            else if (X.hasCause(e, 
ClusterTopologyCheckedException.class))
+                                                log.info("Client failed to 
start: " + e);
+                                            else {
+                                                if (failedNodes.contains(id) 
&& X.hasCause(e, IgniteSpiException.class))
+                                                    log.info("Client failed: " 
+ e);
+                                                else
+                                                    throw e;
+                                            }
                                         }
                                     }
                                 }
                             }
-                        }
-                        catch (Throwable e) {
-                            log.error("Unexpected error: " + e, e);
+                            catch (Throwable e) {
+                                log.error("Unexpected error: " + e, e);
 
-                            error.compareAndSet(null, e);
+                                error.compareAndSet(null, e);
+
+                                return null;
+                            }
 
                             return null;
                         }
-
-                        return null;
-                    }
-                },
-                CLIENT_GRID_CNT,
-                "client-restart");
+                    },
+                    clients,
+                    "client-restart");
+            }
 
             final BlockingQueue<Integer> srvStopIdxs = new 
LinkedBlockingQueue<>();
 
-            for (int i = 0; i < GRID_CNT; i++)
+            for (int i = 0; i < srvs; i++)
                 srvStopIdxs.add(i);
 
-            final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + 
CLIENT_GRID_CNT);
+            final AtomicInteger srvStartIdx = new AtomicInteger(srvs + 
clients);
 
-            IgniteInternalFuture<?> fut2 = multithreadedAsync(
+            IgniteInternalFuture<?> srvFut = multithreadedAsync(
                 new Callable<Object>() {
                     @Override public Object call() throws Exception {
                         try {
@@ -312,6 +333,10 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
                             while (!done.get() && error.get() == null) {
                                 int stopIdx = srvStopIdxs.take();
 
+                                U.sleep(50);
+
+                                Thread.currentThread().setName("stop-server-" 
+ getTestGridName(stopIdx));
+
                                 log.info("Stop server: " + stopIdx);
 
                                 stopGrid(stopIdx);
@@ -319,13 +344,20 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
                                 // Generate unique name to simplify debugging.
                                 int startIdx = srvStartIdx.getAndIncrement();
 
+                                Thread.currentThread().setName("start-server-" 
+ getTestGridName(startIdx));
+
                                 log.info("Start server: " + startIdx);
 
-                                Ignite ignite = startGrid(startIdx);
+                                try {
+                                    Ignite ignite = startGrid(startIdx);
 
-                                
assertFalse(ignite.configuration().isClientMode());
+                                    
assertFalse(ignite.configuration().isClientMode());
 
-                                srvStopIdxs.add(startIdx);
+                                    srvStopIdxs.add(startIdx);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    log.info("Failed to start: " + e);
+                                }
                             }
                         }
                         catch (Throwable e) {
@@ -339,7 +371,7 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
                         return null;
                     }
                 },
-                GRID_CNT - 1,
+                srvs - 1,
                 "server-restart");
 
             final long timeToExec = getTestTimeout() - 60_000;
@@ -356,8 +388,10 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
 
                     done.set(true);
 
-                    fut1.cancel();
-                    fut2.cancel();
+                    if (clientFut != null)
+                        clientFut.cancel();
+
+                    srvFut.cancel();
 
                     throw err;
                 }
@@ -367,8 +401,10 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
 
             done.set(true);
 
-            fut1.get();
-            fut2.get();
+            if (clientFut != null)
+                clientFut.get();
+
+            srvFut.get();
         }
         finally {
             done.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
index 8b94f54..7beeb41 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -196,8 +197,15 @@ public class TcpDiscoveryRestartTest extends 
GridCommonAbstractTest {
 
         /**
          * @param nodeId Node ID.
+         * @throws Exception If failed.
          */
-        void checkEvents(UUID nodeId) {
+        void checkEvents(final UUID nodeId) throws Exception {
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return joinIds.contains(nodeId) && 
leftIds.contains(nodeId);
+                }
+            }, 5000);
+
             assertTrue("No join event: " + nodeId, joinIds.contains(nodeId));
 
             assertTrue("No left event: " + nodeId, leftIds.contains(nodeId));

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 51d8a2d..379a3a6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -21,16 +21,19 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +41,8 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -46,12 +51,16 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import 
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -64,8 +73,8 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -94,7 +103,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private UUID nodeId;
 
     /** */
-    private TcpDiscoverySpi nodeSpi;
+    private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
 
     /**
      * @throws Exception If fails.
@@ -104,15 +113,17 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional", "deprecation"})
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = nodeSpi;
+        TcpDiscoverySpi spi = nodeSpi.get();
 
-        if (spi == null)
+        if (spi == null) {
             spi = 
gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
                 new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+        }
+        else
+            nodeSpi.set(null);
 
         discoMap.put(gridName, spi);
 
@@ -176,6 +187,13 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        discoMap = null;
+
+        super.afterTest();
+    }
+
     /**
      * @throws Exception If any error occurs.
      */
@@ -1202,11 +1220,11 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private void customEventRace1(final boolean cacheStartFrom1, boolean 
stopCrd) throws Exception {
         TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         final Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         final Ignite ignite1 = startGrid(1);
 
@@ -1221,7 +1239,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             @Override public Void call() throws Exception {
                 log.info("Start 2");
 
-                nodeSpi = new TestCustomEventRaceSpi();
+                nodeSpi.set(new TestCustomEventRaceSpi());
 
                 Ignite ignite2 = startGrid(2);
 
@@ -1271,7 +1289,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         assertEquals(1, cache.get(1));
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         Ignite ignite = startGrid(3);
 
@@ -1314,15 +1332,15 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private void customEventCoordinatorFailure(boolean twoNodes) throws 
Exception {
         TestCustomEventCoordinatorFailureSpi spi0 = new 
TestCustomEventCoordinatorFailureSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite1 = startGrid(1);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite2 = twoNodes ? null : startGrid(2);
 
@@ -1366,7 +1384,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         log.info("Try start one more node.");
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite = startGrid(twoNodes ? 2 : 3);
 
@@ -1381,6 +1399,421 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Coordinator is added in failed list during node start.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes1() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            final Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            startGrid(1);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite2 = startGrid(2);
+
+            assertEquals(2, ignite2.cluster().nodes().size());
+
+            waitNodeStop(ignite0.name());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list, concurrent nodes start.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes2() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            startGrid(1);
+
+            final AtomicInteger nodeIdx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int idx = nodeIdx.incrementAndGet();
+
+                    nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+                    startGrid(idx);
+
+                    return null;
+                }
+            }, 3, "start-node");
+
+            Ignite ignite2 = ignite(2);
+
+            waitForRemoteNodes(ignite2, 3);
+
+            waitNodeStop(ignite0.name());
+
+            tryCreateCache(4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list during node start, test with two 
nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes3() throws Exception {
+        try {
+            nodeSpi.set(new TestFailedNodesSpi(-1));
+
+            Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(2));
+
+            Ignite ignite1 = startGrid(1);
+
+            assertEquals(1, ignite1.cluster().nodes().size());
+
+            waitNodeStop(ignite0.name());
+
+            ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1);
+
+            startGrid(2);
+
+            assertEquals(2, ignite1.cluster().nodes().size());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list during node start, but node 
detected failure dies before
+     * sending {@link TcpDiscoveryNodeFailedMessage}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes4() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            final Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite1 = startGrid(1);
+
+            TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+
+            spi.stopBeforeSndFail = true;
+
+            nodeSpi.set(spi);
+
+            Ignite ignite2 = startGrid(2);
+
+            waitNodeStop(ignite2.name());
+
+            log.info("Try start new node.");
+
+            Ignite ignite3 = startGrid(3);
+
+            waitNodeStop(ignite0.name());
+
+            assertEquals(2, ignite1.cluster().nodes().size());
+            assertEquals(2, ignite3.cluster().nodes().size());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Adds some node in failed list after join process finished.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes5() throws Exception {
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            for (int iter = 0; iter < 3; iter++) {
+                final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6);
+
+                for (int i = 0; i < NODES; i++) {
+                    nodeSpi.set(new TestFailedNodesSpi(-1));
+
+                    startGrid(i);
+                }
+
+                Map<Long, Ignite> nodes = new HashMap<>();
+
+                for (int i = 0; i < NODES; i++) {
+                    Ignite ignite = ignite(i);
+
+                    nodes.put(ignite.cluster().localNode().order(), ignite);
+                }
+
+                Ignite ignite = ignite(rnd.nextInt(NODES));
+
+                log.info("Iteration [iter=" + iter + ", nodes=" + NODES + ", 
failFrom=" + ignite.name() + ']');
+
+                TestFailedNodesSpi spi = 
(TestFailedNodesSpi)ignite.configuration().getDiscoverySpi();
+
+                spi.failSingleMsg = true;
+
+                long order = ignite.cluster().localNode().order();
+
+                long nextOrder = order == NODES ? 1 : order + 1;
+
+                Ignite failingNode = nodes.get(nextOrder);
+
+                assertNotNull(failingNode);
+
+                waitNodeStop(failingNode.name());
+
+                Ignite newNode = startGrid(NODES);
+
+                assertEquals(NODES, newNode.cluster().nodes().size());
+
+                tryCreateCache(NODES);
+
+                stopAllGrids();
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventAckNotSend() throws Exception {
+        try {
+            TestCustomerEventAckSpi spi0 = new TestCustomerEventAckSpi();
+
+            nodeSpi.set(spi0);
+
+            Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestCustomerEventAckSpi());
+
+            Ignite ignite1 = startGrid(1);
+
+            spi0.stopBeforeSndAck = true;
+
+            ignite1.message().remoteListen("test", new DummyPredicate());
+
+            waitNodeStop(ignite0.name());
+
+            startGrid(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param nodeName Node name.
+     * @throws Exception If failed.
+     */
+    private void waitNodeStop(final String nodeName) throws Exception {
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    Ignition.ignite(nodeName);
+
+                    return false;
+                }
+                catch (IgniteIllegalStateException e) {
+                    return true;
+                }
+            }
+        }, 10_000);
+
+        if (!wait)
+            U.dumpThreads(log);
+
+        assertTrue("Failed to wait for node stop.", wait);
+    }
+
+    /**
+     * @param expNodes Expected nodes number.
+     */
+    private void tryCreateCache(int expNodes) {
+        List<Ignite> allNodes = G.allGrids();
+
+        assertEquals(expNodes, allNodes.size());
+
+        int cntr = 0;
+
+        for (Ignite ignite : allNodes) {
+            CacheConfiguration<Object, Object> ccfg = new 
CacheConfiguration<>();
+
+            ccfg.setName("cache-" + cntr++);
+
+            log.info("Try create cache [node=" + ignite.name() + ", cache=" + 
ccfg.getName() + ']');
+
+            ignite.getOrCreateCache(ccfg).put(1, 1);
+        }
+    }
+
+    /**
+     *
+     */
+    static class DummyPredicate implements IgniteBiPredicate<UUID, Object> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object o) {
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCustomerEventAckSpi extends TcpDiscoverySpi {
+        /** */
+        private volatile boolean stopBeforeSndAck;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (stopBeforeSndAck) {
+                if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                    try {
+                        DiscoveryCustomMessage custMsg = 
GridTestUtils.getFieldValue(
+                            
((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate");
+
+                        if (custMsg instanceof 
StartRoutineAckDiscoveryMessage) {
+                            log.info("Skip message send and stop node: " + 
msg);
+
+                            sock.close();
+
+                            GridTestUtils.runAsync(new Callable<Object>() {
+                                @Override public Object call() throws 
Exception {
+                                    ignite.close();
+
+                                    return null;
+                                }
+                            }, "stop-node");
+
+                            return;
+                        }
+                    }
+                    catch (Throwable e) {
+                        fail("Unexpected error: " + e);
+                    }
+                }
+            }
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
+     * Simulate scenario when node detects node failure trying to send 
message, but node still alive.
+     */
+    private static class TestFailedNodesSpi extends TcpDiscoverySpi {
+        /** */
+        private AtomicBoolean failMsg = new AtomicBoolean();
+
+        /** */
+        private int failOrder;
+
+        /** */
+        private boolean stopBeforeSndFail;
+
+        /** */
+        private boolean stop;
+
+        /** */
+        private volatile boolean failSingleMsg;
+
+        /**
+         * @param failOrder Spi fails connection if local node order equals to 
this order.
+         */
+        TestFailedNodesSpi(int failOrder) {
+            this.failOrder = failOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (stop)
+                return;
+
+            if (failSingleMsg) {
+                failSingleMsg = false;
+
+                log.info("IO error on message send [locNode=" + locNode + ", 
msg=" + msg + ']');
+
+                sock.close();
+
+                throw new SocketTimeoutException();
+            }
+
+            if (locNode.internalOrder() == failOrder &&
+                (msg instanceof TcpDiscoveryNodeAddedMessage) &&
+                failMsg.compareAndSet(false, true)) {
+                log.info("IO error on message send [locNode=" + locNode + ", 
msg=" + msg + ']');
+
+                sock.close();
+
+                throw new SocketTimeoutException();
+            }
+
+            if (stopBeforeSndFail &&
+                locNode.internalOrder() == failOrder &&
+                (msg instanceof TcpDiscoveryNodeFailedMessage)) {
+                stop = true;
+
+                log.info("Skip messages send and stop node [locNode=" + 
locNode + ", msg=" + msg + ']');
+
+                sock.close();
+
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        ignite.close();
+
+                        return null;
+                    }
+                }, "stop-node");
+
+                return;
+            }
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      *
      */
     private static class TestCustomEventCoordinatorFailureSpi extends 
TcpDiscoverySpi {

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
index 97ba5cf..1e710ee 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
@@ -58,14 +58,20 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
 
         try {
             ipFinder1 = ipFinder();
+            ipFinder1.setResponseWaitTime(1000);
+            ipFinder1.setAddressRequestAttempts(10);
 
             ipFinder2 = new TcpDiscoveryMulticastIpFinder();
 
+            ipFinder2.setResponseWaitTime(1000);
+            ipFinder2.setAddressRequestAttempts(10);
             ipFinder2.setMulticastGroup(ipFinder1.getMulticastGroup());
             ipFinder2.setMulticastPort(ipFinder1.getMulticastPort());
 
             ipFinder3 = new TcpDiscoveryMulticastIpFinder();
 
+            ipFinder3.setResponseWaitTime(1000);
+            ipFinder3.setAddressRequestAttempts(10);
             ipFinder3.setMulticastGroup(ipFinder1.getMulticastGroup());
             ipFinder3.setMulticastPort(ipFinder1.getMulticastPort());
 
@@ -81,21 +87,13 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
             ipFinder2.initializeLocalAddresses(Collections.singleton(new 
InetSocketAddress("host2", 1002)));
             ipFinder3.initializeLocalAddresses(Collections.singleton(new 
InetSocketAddress("host3", 1003)));
 
-            for (int i = 0; i < 5; i++) {
-                Collection<InetSocketAddress> addrs1 = 
ipFinder1.getRegisteredAddresses();
-                Collection<InetSocketAddress> addrs2 = 
ipFinder2.getRegisteredAddresses();
-                Collection<InetSocketAddress> addrs3 = 
ipFinder3.getRegisteredAddresses();
-
-                if (addrs1.size() != 1 || addrs2.size() != 2 || addrs3.size() 
!= 3) {
-                    info("Addrs1: " + addrs1);
-                    info("Addrs2: " + addrs2);
-                    info("Addrs2: " + addrs3);
-
-                    Thread.sleep(1000);
-                }
-                else
-                    break;
-            }
+            Collection<InetSocketAddress> addrs1 = 
ipFinder1.getRegisteredAddresses();
+            Collection<InetSocketAddress> addrs2 = 
ipFinder2.getRegisteredAddresses();
+            Collection<InetSocketAddress> addrs3 = 
ipFinder3.getRegisteredAddresses();
+
+            info("Addrs1: " + addrs1);
+            info("Addrs2: " + addrs2);
+            info("Addrs2: " + addrs3);
 
             assertEquals(1, ipFinder1.getRegisteredAddresses().size());
             assertEquals(2, ipFinder2.getRegisteredAddresses().size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 41d4b4a..3e41979 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1120,16 +1120,31 @@ public abstract class GridAbstractTest extends TestCase 
{
         if (gridName != null && gridName.matches(".*\\d")) {
             String idStr = UUID.randomUUID().toString();
 
-            char[] chars = idStr.toCharArray();
+            if (gridName.startsWith(getTestGridName())) {
+                String idxStr = String.valueOf(getTestGridIndex(gridName));
 
-            chars[0] = gridName.charAt(gridName.length() - 1);
-            chars[1] = '0';
+                while (idxStr.length() < 5)
+                    idxStr = '0' + idxStr;
 
-            chars[chars.length - 3] = '0';
-            chars[chars.length - 2] = '0';
-            chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+                char[] chars = idStr.toCharArray();
 
-            cfg.setNodeId(UUID.fromString(new String(chars)));
+                for (int i = 0; i < idxStr.length(); i++)
+                    chars[chars.length - idxStr.length() + i] = 
idxStr.charAt(i);
+
+                cfg.setNodeId(UUID.fromString(new String(chars)));
+            }
+            else {
+                char[] chars = idStr.toCharArray();
+
+                chars[0] = gridName.charAt(gridName.length() - 1);
+                chars[1] = '0';
+
+                chars[chars.length - 3] = '0';
+                chars[chars.length - 2] = '0';
+                chars[chars.length - 1] = gridName.charAt(gridName.length() - 
1);
+
+                cfg.setNodeId(UUID.fromString(new String(chars)));
+            }
         }
 
         if (isMultiJvm())

Reply via email to