Merging IGNITE-1171 - fixed problems with custom events in discovery

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f3ef6a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f3ef6a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f3ef6a8

Branch: refs/heads/master
Commit: 6f3ef6a84ee1c3e77d32ca9930835d1720918e20
Parents: 517d0f5
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Wed Sep 23 16:36:15 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Wed Sep 23 16:36:15 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteAtomicLong.java     |   2 +-
 .../cache/DynamicCacheDescriptor.java           |  10 +-
 .../GridCachePartitionExchangeManager.java      |   6 +
 .../processors/cache/GridCacheProcessor.java    |  18 +-
 .../continuous/CacheContinuousQueryManager.java |  10 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   7 +-
 .../discovery/DiscoverySpiCustomMessage.java    |  12 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 301 ++++++++++++++----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   6 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 .../tcp/internal/TcpDiscoveryNodesRing.java     |  94 ++----
 .../messages/TcpDiscoveryDiscardMessage.java    |  15 +-
 .../TcpDiscoveryNodeAddFinishedMessage.java     |   2 +-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  19 +-
 .../distributed/CacheAffEarlySelfTest.java      | 245 ---------------
 .../distributed/CacheAffinityEarlyTest.java     | 168 ++++++++++
 ...GridCacheValueConsistencyAtomicSelfTest.java |   2 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |  53 ++--
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 315 ++++++++++++++++++-
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 20 files changed, 864 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
index 83e2525..bac1a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
@@ -160,4 +160,4 @@ public interface IgniteAtomicLong extends Closeable {
      * @throws IgniteException If operation failed.
      */
     @Override public void close();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index f3c3be9..3cfc34e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -39,9 +39,6 @@ public class DynamicCacheDescriptor {
     @GridToStringExclude
     private CacheConfiguration cacheCfg;
 
-    /** Cancelled flag. */
-    private boolean cancelled;
-
     /** Locally configured flag. */
     private boolean locCfg;
 
@@ -156,6 +153,13 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return Started flag.
+     */
+    public boolean started() {
+        return started;
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration cacheConfiguration() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 34c571c..eb76233 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1435,6 +1435,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         private static final long serialVersionUID = 0L;
 
         /** */
+        @GridToStringInclude
         private AffinityTopologyVersion topVer;
 
         /**
@@ -1455,5 +1456,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
             return done;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AffinityReadyFuture.class, this, 
super.toString());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e92ea57..74124bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1522,10 +1522,15 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Collection of started cache names.
      */
     public Collection<String> cacheNames() {
-        return F.viewReadOnly(registeredCaches.keySet(),
-            new IgniteClosure<String, String>() {
-                @Override public String apply(String s) {
-                    return unmaskNull(s);
+        return F.viewReadOnly(registeredCaches.values(),
+            new IgniteClosure<DynamicCacheDescriptor, String>() {
+                @Override public String apply(DynamicCacheDescriptor desc) {
+                    return desc.cacheConfiguration().getName();
+                }
+            },
+            new IgnitePredicate<DynamicCacheDescriptor>() {
+                @Override public boolean apply(DynamicCacheDescriptor desc) {
+                    return desc.started();
                 }
             });
     }
@@ -1568,6 +1573,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 req.deploymentId(),
                 topVer
             );
+
+            DynamicCacheDescriptor desc = 
registeredCaches.get(maskNull(req.cacheName()));
+
+            if (desc != null)
+                desc.onStart();
         }
 
         // Start statically configured caches received from remote nodes 
during exchange.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..c719f1e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -448,8 +448,12 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             taskNameHash,
             skipPrimaryCheck);
 
-        UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, 
timeInterval,
-            autoUnsubscribe, grp.predicate()).get();
+        UUID id = cctx.kernalContext().continuous().startRoutine(
+            hnd,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            grp.predicate()).get();
 
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = 
cctx.cache().allEntries().iterator();
@@ -811,4 +815,4 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2594213..c93d5af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (failureDetectionTimeoutEnabled() && (e instanceof 
HandshakeTimeoutException ||
                     timeoutHelper.checkFailureTimeoutReached(e))) {
-                    log.debug("Handshake timed out (failure threshold reached) 
[failureDetectionTimeout=" +
-                        failureDetectionTimeout() + ", err=" + e.getMessage() 
+ ", client=" + client + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timed out (failure threshold 
reached) [failureDetectionTimeout=" +
+                            failureDetectionTimeout() + ", err=" + 
e.getMessage() + ", client=" + client + ']');
 
                     throw e;
                 }
@@ -2700,7 +2701,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      *
      * FOR TEST PURPOSES ONLY!!!
      */
-    void simulateNodeFailure() {
+    public void simulateNodeFailure() {
         if (nioSrvr != null)
             nioSrvr.stop();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
 package org.apache.ignite.spi.discovery;
 
 import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Message to send across ring.
  *
- * @see 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends 
Serializable {
      * @return {@code true} if message can be modified during listener 
notification. Changes will be send to next nodes.
      */
     public boolean isMutable();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8a205d2..d8ee953 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,10 +37,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -64,6 +67,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -145,7 +149,7 @@ import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 /**
  *
  */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@SuppressWarnings("All")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 
1, 2000, TimeUnit.MILLISECONDS,
@@ -1368,8 +1372,13 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param msgs Messages to include.
      * @param discardMsgId Discarded message ID.
      */
-    private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID 
destNodeId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable 
IgniteUuid discardMsgId) {
+    private void prepareNodeAddedMessage(
+        TcpDiscoveryAbstractMessage msg,
+        UUID destNodeId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable IgniteUuid discardCustomMsgId
+        ) {
         assert destNodeId != null;
 
         if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1393,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId);
+                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1416,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
-            nodeAddedMsg.messages(null, null);
+            nodeAddedMsg.messages(null, null, null);
         }
     }
 
@@ -1825,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private TcpDiscoveryAbstractMessage 
prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null);
+                prepareNodeAddedMessage(msg, destNodeId, null, null, null);
 
             return msg;
         }
@@ -1834,16 +1843,22 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Pending messages container.
      */
-    private static class PendingMessages {
+    private static class PendingMessages implements 
Iterable<TcpDiscoveryAbstractMessage> {
         /** */
         private static final int MAX = 1024;
 
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new 
ArrayDeque<>(MAX * 2);
 
+        /** Processed custom message IDs. */
+        private Set<IgniteUuid> procCustomMsgs = new 
GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
+
         /** Discarded message ID. */
         private IgniteUuid discardId;
 
+        /** Discarded custom message ID. */
+        private IgniteUuid customDiscardId;
+
         /**
          * Adds pending message and shrinks queue if it exceeds limit
          * (messages that were not discarded yet are never removed).
@@ -1869,31 +1884,118 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgs Message.
          * @param discardId Discarded message ID.
          */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, 
@Nullable IgniteUuid discardId) {
+        void reset(
+            @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+            @Nullable IgniteUuid discardId,
+            @Nullable IgniteUuid customDiscardId
+        ) {
             this.msgs.clear();
 
             if (msgs != null)
                 this.msgs.addAll(msgs);
 
             this.discardId = discardId;
+            this.customDiscardId = customDiscardId;
         }
 
         /**
-         * Clears pending messages.
+         * Discards message with provided ID and all before it.
+         *
+         * @param id Discarded message ID.
          */
-        void clear() {
-            msgs.clear();
+        void discard(IgniteUuid id, boolean custom) {
+            if (custom)
+                customDiscardId = id;
+            else
+                discardId = id;
+        }
 
-            discardId = null;
+        /**
+         * Gets iterator for non-discarded messages.
+         *
+         * @return Non-discarded messages iterator.
+         */
+        public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+            return new SkipIterator();
         }
 
         /**
-         * Discards message with provided ID and all before it.
          *
-         * @param id Discarded message ID.
          */
-        void discard(IgniteUuid id) {
-            discardId = id;
+        private class SkipIterator implements 
Iterator<TcpDiscoveryAbstractMessage> {
+            /** Skip non-custom messages flag. */
+            private boolean skipMsg = discardId != null;
+
+            /** Skip custom messages flag. */
+            private boolean skipCustomMsg;
+
+            /** Internal iterator. */
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = 
msgs.iterator();
+
+            /** Next message. */
+            private TcpDiscoveryAbstractMessage next;
+
+            {
+                advance();
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return next != null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public TcpDiscoveryAbstractMessage next() {
+                if (next == null)
+                    throw new NoSuchElementException();
+
+                TcpDiscoveryAbstractMessage next0 = next;
+
+                advance();
+
+                return next0;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            /**
+             * Advances iterator to the next available item.
+             */
+            private void advance() {
+                next = null;
+
+                while (msgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+                    if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                        if (skipCustomMsg) {
+                            assert customDiscardId != null;
+
+                            if (F.eq(customDiscardId, msg0.id()))
+                                skipCustomMsg = false;
+
+                            continue;
+                        }
+                    }
+                    else {
+                        if (skipMsg) {
+                            assert discardId != null;
+
+                            if (F.eq(discardId, msg0.id()))
+                                skipMsg = false;
+
+                            continue;
+                        }
+                    }
+
+                    next = msg0;
+
+                    break;
+                }
+            }
         }
     }
 
@@ -1941,6 +2043,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** Pending custom messages that should not be sent between NodeAdded 
and NodeAddFinished messages. */
+        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new 
ArrayDeque<>();
+
+        /** Collection to track joining nodes. */
+        private Set<UUID> joiningNodes = new HashSet<>();
+
         /**
          */
         protected RingMessageWorker() {
@@ -2046,6 +2154,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             sendHeartbeatMessage();
 
             checkHeartbeatsReceiving();
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -2323,20 +2433,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     debugLog("Pending messages will be sent 
[failure=" + failure +
                                         ", forceSndPending=" + forceSndPending 
+ ']');
 
-                                boolean skip = pendingMsgs.discardId != null;
-
-                                for (TcpDiscoveryAbstractMessage pendingMsg : 
pendingMsgs.msgs) {
-                                    if (skip) {
-                                        if 
(pendingMsg.id().equals(pendingMsgs.discardId))
-                                            skip = false;
-
-                                        continue;
-                                    }
-
+                                for (TcpDiscoveryAbstractMessage pendingMsg : 
pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, 
next.id(), pendingMsgs.msgs,
-                                        pendingMsgs.discardId);
+                                        pendingMsgs.discardId, 
pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
                                         timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi);
@@ -2354,13 +2455,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     int res = spi.readReceipt(sock, 
timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
-                                        log.debug("Pending message has been 
sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + 
", next=" + next.id() +
+                                        log.debug("Pending message has been 
sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + 
pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     if (debugMode)
-                                        debugLog("Pending message has been 
sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + 
", next=" + next.id() +
+                                        debugLog("Pending message has been 
sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + 
pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     // Resetting timeout control object to 
create a new one for the next bunch of
@@ -2377,7 +2478,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new 
TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), 
pendingMsgs.msgs, pendingMsgs.discardId);
+                                prepareNodeAddedMessage(msg, next.id(), 
pendingMsgs.msgs, pendingMsgs.discardId,
+                                    pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -2478,21 +2580,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    if (msg instanceof TcpDiscoveryStatusCheckMessage) {
-                        TcpDiscoveryStatusCheckMessage msg0 = 
(TcpDiscoveryStatusCheckMessage)msg;
-
-                        if (next.id().equals(msg0.failedNodeId())) {
-                            next = null;
-
-                            if (log.isDebugEnabled())
-                                log.debug("Discarding status check since next 
node has indeed failed [next=" + next +
-                                    ", msg=" + msg + ']');
-
-                            // Discard status check message by exiting loop 
and handle failure.
-                            break;
-                        }
-                    }
-
                     next = null;
 
                     searchNext = true;
@@ -2524,6 +2611,29 @@ class ServerImpl extends TcpDiscoveryImpl {
                 for (TcpDiscoveryNode n : failedNodes)
                     msgWorker.addMessage(new 
TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
 
+                if (!sent) {
+                    if (log.isDebugEnabled())
+                        log.debug("Pending messages will be resent to local 
node");
+
+                    if (debugMode)
+                        log.debug("Pending messages will be resent to local 
node");
+
+                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) 
{
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, 
pendingMsgs.msgs, pendingMsgs.discardId,
+                            pendingMsgs.customDiscardId);
+
+                        msgWorker.addMessage(pendingMsg);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Pending message has been sent to local 
node [msg=" + msg.id() +
+                                ", pendingMsgId=" + pendingMsg + ", next=" + 
next.id() + ']');
+
+                        if (debugMode)
+                            debugLog("Pending message has been sent to local 
node [msg=" + msg.id() +
+                                ", pendingMsgId=" + pendingMsg + ", next=" + 
next.id() + ']');
+                    }
+                }
+
                 LT.warn(log, null, "Local node has detected failed nodes and 
started cluster-wide procedure. " +
                         "To speed up failure detection please see 'Failure 
Detection' section under javadoc" +
                         " for 'TcpDiscoverySpi'");
@@ -3077,7 +3187,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id(), false));
 
                     return;
                 }
@@ -3118,6 +3228,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                joiningNodes.add(node.id());
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && 
spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3222,6 +3334,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 n.visible(true);
                             }
 
+                            joiningNodes.clear();
+
                             locNode.setAttributes(node.attributes());
 
                             locNode.visible(true);
@@ -3237,10 +3351,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
-                            pendingMsgs.discard(msg.discardedMessageId());
+                            pendingMsgs.reset(msg.messages(), 
msg.discardedMessageId(),
+                                msg.discardedCustomMessageId());
 
                             // Clear data to minimize message size.
-                            msg.messages(null, null);
+                            msg.messages(null, null, null);
                             msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
@@ -3307,7 +3422,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id(), false));
 
                     return;
                 }
@@ -3342,7 +3457,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() 
== CONNECTED && fireEvt) {
+            joiningNodes.remove(nodeId);
+
+            TcpDiscoverySpiState state = spiStateCopy();
+
+            if (msg.verified() && !locNodeId.equals(nodeId) && state != 
CONNECTING && fireEvt) {
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get 
EVT_NODE_JOINED
@@ -3357,7 +3476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     boolean b = ring.topologyVersion(topVer);
 
                     assert b : "Topology version has not been updated: [ring=" 
+ ring + ", msg=" + msg +
-                        ", lastMsg=" + lastMsg + ", spiState=" + 
spiStateCopy() + ']';
+                        ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
 
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" 
+ ring + ", msg=" + msg + ']');
@@ -3365,7 +3484,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     lastMsg = msg;
                 }
 
-                notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+                if (state == CONNECTED)
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, node);
 
                 try {
                     if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3381,7 +3501,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() 
== CONNECTING) {
+            if (msg.verified() && locNodeId.equals(nodeId) && state == 
CONNECTING) {
                 assert node != null;
 
                 assert topVer > 0 : "Invalid topology version: " + msg;
@@ -3402,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -3481,7 +3603,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id(), false));
 
                     return;
                 }
@@ -3553,6 +3675,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
+                joiningNodes.remove(leftNode.id());
+
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3580,6 +3704,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 U.closeQuiet(sock);
             }
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -3650,7 +3776,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id(), false));
 
                     return;
                 }
@@ -3707,6 +3833,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         ", msg=" + msg.warning() + ']');
                 }
 
+                joiningNodes.remove(node.id());
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -3720,6 +3848,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 U.closeQuiet(sock);
             }
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -4046,7 +4176,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId);
+                pendingMsgs.discard(msgId, msg.customMessageDiscard());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
@@ -4098,18 +4228,23 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
-                boolean sndNext;
+                if (!joiningNodes.isEmpty()) {
+                    pendingCustomMsgs.add(msg);
 
-                if (!msg.verified()) {
+                    return;
+                }
+
+                boolean sndNext = !msg.verified();
+
+                if (sndNext) {
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
-                    notifyDiscoveryListener(msg);
-
-                    sndNext = true;
+                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
+                        notifyDiscoveryListener(msg);
+                    else
+                        sndNext = false;
                 }
-                else
-                    sndNext = false;
 
                 if (sndNext && ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
@@ -4139,12 +4274,30 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    addMessage(new 
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+                    addMessage(new 
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
                 }
             }
             else {
-                if (msg.verified())
+                TcpDiscoverySpiState state0;
+
+                synchronized (mux) {
+                    state0 = spiState;
+                }
+
+                if (msg.verified() && msg.topologyVersion() != 
ring.topologyVersion()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding custom event message [msg=" + 
msg + ", ring=" + ring + ']');
+
+                    return;
+                }
+
+                if (msg.verified() && state0 == CONNECTED && 
pendingMsgs.procCustomMsgs.add(msg.id())) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + 
joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
+                        ", topver=" + ring.topologyVersion();
+                    assert msg.topologyVersion() == ring.topologyVersion() : 
"msg: " + msg + ", topver=" + ring.topologyVersion();
+
                     notifyDiscoveryListener(msg);
+                }
 
                 if (ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
@@ -4152,6 +4305,18 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Checks and flushes custom event messages if no nodes are attempting 
to join the grid.
+         */
+        private void checkPendingCustomMessages() {
+            if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
+                TcpDiscoveryCustomEventMessage msg;
+
+                while ((msg = pendingCustomMsgs.poll()) != null)
+                    processCustomMessage(msg);
+            }
+        }
+
+        /**
          * @param msg Custom message.
          */
         private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage 
msg) {
@@ -5081,7 +5246,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Redirecting message to client [sock=" + 
sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + 
clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null);
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null, 
null);
 
                         writeToSocket(sock, msg, 
spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : 
spi.getSocketTimeout());

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 80fcc46..6254605 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2038,4 +2038,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
             return S.toString(SocketTimeoutObject.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 2b17696..7ca092c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,7 +17,17 @@
 
 package org.apache.ignite.spi.discovery.tcp.internal;
 
-import java.util.ArrayList;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,16 +39,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Convenient way to represent topology for {@link 
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -81,6 +81,9 @@ public class TcpDiscoveryNodesRing {
     /** */
     private long nodeOrder;
 
+    /** */
+    private long maxInternalOrder;
+
     /** Lock. */
     @GridToStringExclude
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -99,6 +102,8 @@ public class TcpDiscoveryNodesRing {
             this.locNode = locNode;
 
             clear();
+
+            maxInternalOrder = locNode.internalOrder();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -204,7 +209,9 @@ public class TcpDiscoveryNodesRing {
             if (nodesMap.containsKey(node.id()))
                 return false;
 
-            assert node.internalOrder() > maxInternalOrder() : "Adding node to 
the middle of the ring " +
+            long maxInternalOrder0 = maxInternalOrder();
+
+            assert node.internalOrder() > maxInternalOrder0 : "Adding node to 
the middle of the ring " +
                 "[ring=" + this + ", node=" + node + ']';
 
             nodesMap.put(node.id(), node);
@@ -216,6 +223,8 @@ public class TcpDiscoveryNodesRing {
             nodes.add(node);
 
             nodeOrder = node.internalOrder();
+
+            maxInternalOrder = node.internalOrder();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -231,9 +240,13 @@ public class TcpDiscoveryNodesRing {
         rwLock.readLock().lock();
 
         try {
-            TcpDiscoveryNode last = nodes.last();
+            if (maxInternalOrder == 0) {
+                TcpDiscoveryNode last = nodes.last();
+
+                return last != null ? maxInternalOrder = last.internalOrder() 
: -1;
+            }
 
-            return last != null ? last.internalOrder() : -1;
+            return maxInternalOrder;
         }
         finally {
             rwLock.readLock().unlock();
@@ -336,47 +349,6 @@ public class TcpDiscoveryNodesRing {
     }
 
     /**
-     * Removes nodes from the topology.
-     *
-     * @param nodeIds IDs of the nodes to remove.
-     * @return Collection of removed nodes.
-     */
-    public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
-        assert !F.isEmpty(nodeIds);
-
-        rwLock.writeLock().lock();
-
-        try {
-            boolean firstRmv = true;
-
-            Collection<TcpDiscoveryNode> res = null;
-
-            for (UUID id : nodeIds) {
-                TcpDiscoveryNode rmv = nodesMap.remove(id);
-
-                if (rmv != null) {
-                    if (firstRmv) {
-                        nodes = new TreeSet<>(nodes);
-
-                        res = new ArrayList<>(nodeIds.size());
-
-                        firstRmv = false;
-                    }
-
-                    nodes.remove(rmv);
-
-                    res.add(rmv);
-                }
-            }
-
-            return res == null ? Collections.<TcpDiscoveryNode>emptyList() : 
res;
-        }
-        finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    /**
      * Removes all remote nodes, leaves only local node.
      * <p>
      * This should be called when SPI should be disconnected from topology and
@@ -397,6 +369,7 @@ public class TcpDiscoveryNodesRing {
                 nodesMap.put(locNode.id(), locNode);
 
             nodeOrder = 0;
+            maxInternalOrder = 0;
 
             topVer = 0;
         }
@@ -622,13 +595,8 @@ public class TcpDiscoveryNodesRing {
         rwLock.writeLock().lock();
 
         try {
-            if (nodeOrder == 0) {
-                TcpDiscoveryNode last = nodes.last();
-
-                assert last != null;
-
-                nodeOrder = last.internalOrder();
-            }
+            if (nodeOrder == 0)
+                nodeOrder = maxInternalOrder();
 
             return ++nodeOrder;
         }
@@ -681,4 +649,4 @@ public class TcpDiscoveryNodesRing {
             rwLock.readLock().unlock();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends 
TcpDiscoveryAbstractMessage {
     /** ID of the message to discard (this and all preceding). */
     private final IgniteUuid msgId;
 
+    /** True if this is a discard ID for a custom event message. */
+    private final boolean customMsgDiscard;
+
     /**
      * Constructor.
      *
      * @param creatorNodeId Creator node ID.
      * @param msgId Message ID.
      */
-    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, 
boolean customMsgDiscard) {
         super(creatorNodeId);
 
         this.msgId = msgId;
+        this.customMsgDiscard = customMsgDiscard;
     }
 
     /**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends 
TcpDiscoveryAbstractMessage {
         return msgId;
     }
 
+    /**
+     * Flag indicating whether the ID to discard is for a custom message or 
not.
+     *
+     * @return Custom message flag.
+     */
+    public boolean customMessageDiscard() {
+        return customMsgDiscard;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", 
super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index c6a469f..1b99a56 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -101,4 +101,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends 
TcpDiscoveryAbstractMess
     @Override public String toString() {
         return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, 
"super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..5a7146d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,9 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractMessage {
     /** Discarded message ID. */
     private IgniteUuid discardMsgId;
 
+    /** Discarded custom message ID. */
+    private IgniteUuid discardCustomMsgId;
+
     /** Current topology. Initialized by coordinator. */
     @GridToStringInclude
     private Collection<TcpDiscoveryNode> top;
@@ -117,14 +120,28 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Gets discarded custom message ID.
+     *
+     * @return Discarded custom message ID.
+     */
+    @Nullable public IgniteUuid discardedCustomMessageId() {
+        return discardCustomMsgId;
+    }
+
+    /**
      * Sets pending messages to send to new node.
      *
      * @param msgs Pending messages to send to new node.
      * @param discardMsgId Discarded message ID.
      */
-    public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> 
msgs, @Nullable IgniteUuid discardMsgId) {
+    public void messages(
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable IgniteUuid discardCustomMsgId
+    ) {
         this.msgs = msgs;
         this.discardMsgId = discardMsgId;
+        this.discardCustomMsgId = discardCustomMsgId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
deleted file mode 100644
index 7f0ca11..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteFutureTimeoutException;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class CacheAffEarlySelfTest extends GridCommonAbstractTest {
-    /** Grid count. */
-    private static int GRID_CNT = 8;
-
-    /** Operation timeout. */
-    private static long OP_TIMEOUT = 5000;
-
-    /** Always dump threads or only once per operation. */
-    private static boolean ALWAYS_DUMP_THREADS = false;
-
-    /** Stopped. */
-    private volatile boolean stopped;
-
-    /** Iteration. */
-    private int iters = 10;
-
-    /** Concurrent. */
-    private boolean concurrent = true;
-
-    /** Futs. */
-    private Collection<IgniteInternalFuture<?>> futs = new 
ArrayList<>(GRID_CNT);
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true);
-        
finder.setAddresses(Collections.singletonList("127.0.0.1:47500..47510"));
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(finder);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        OptimizedMarshaller marsh = new OptimizedMarshaller();
-        marsh.setRequireSerializable(false);
-
-        cfg.setMarshaller(marsh);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 6 * 60 * 1000L;
-    }
-
-    /**
-     *
-     */
-    public void testStartNodes() throws Exception {
-        for (int i = 0; i < iters; i++) {
-            try {
-                System.out.println("*** Iteration " + (i + 1) + '/' + iters);
-
-                IgniteInternalFuture<?> fut = multithreadedAsync(new 
Runnable() {
-                    @Override public void run() {
-                        try {
-                            doTest();
-                        }
-                        catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }, 1);
-
-                fut.get(30000);
-            }
-            catch (IgniteFutureTimeoutCheckedException e) {
-                // No-op.
-            }
-            finally {
-                stopAllGrids(true);
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    public void doTest() throws Exception {
-        for (int i = 0; i < GRID_CNT; i++) {
-            final int idx = i;
-
-            final Ignite grid = concurrent ? null : startGrid(idx);
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @Override public void run() {
-                    Random rnd = new Random();
-
-                    try {
-                        final Ignite ignite = grid == null ? startGrid(idx) : 
grid;
-
-                        final IgniteCache<UUID, UUID> cache = 
getCache(ignite).withAsync();
-
-                        CacheAffEarlySelfTest.this.execute(cache, new 
IgniteInClosure<IgniteCache<UUID,UUID>>() {
-                            @Override public void apply(IgniteCache<UUID, 
UUID> entries) {
-                                cache.put(ignite.cluster().localNode().id(), 
UUID.randomUUID());
-                            }
-                        });
-
-                        while (!stopped) {
-                            int val = Math.abs(rnd.nextInt(100));
-                            if (val >= 0 && val < 40)
-                                execute(cache, new 
IgniteInClosure<IgniteCache<UUID, UUID>>() {
-                                    @Override public void 
apply(IgniteCache<UUID, UUID> entries) {
-                                        
cache.containsKey(ignite.cluster().localNode().id());
-                                    }
-                                });
-                            else if (val >= 40 && val < 80)
-                                execute(cache, new 
IgniteInClosure<IgniteCache<UUID, UUID>>() {
-                                    @Override public void 
apply(IgniteCache<UUID, UUID> entries) {
-                                        
cache.get(ignite.cluster().localNode().id());
-                                    }
-                                });
-                            else
-                                execute(cache, new 
IgniteInClosure<IgniteCache<UUID, UUID>>() {
-                                    @Override public void 
apply(IgniteCache<UUID, UUID> entries) {
-                                        
cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
-                                    }
-                                });
-
-                            Thread.sleep(50);
-                        }
-                    }
-                    catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }, 1);
-
-            futs.add(fut);
-        }
-
-        Thread.sleep(10000);
-
-        stopped = true;
-
-        for (IgniteInternalFuture<?> fut : futs)
-            fut.get();
-    }
-
-    /**
-     * @param cache Cache.
-     * @param c Closure.
-     */
-    private void execute(IgniteCache<UUID, UUID> cache, 
IgniteInClosure<IgniteCache<UUID, UUID>> c) {
-        c.apply(cache);
-
-        IgniteFuture<Object> fut = cache.future();
-
-        boolean success = false;
-
-        int iter = 0;
-
-        while (!success) {
-            try {
-                fut.get(OP_TIMEOUT);
-
-                success = true;
-            }
-            catch (IgniteFutureTimeoutException e) {
-                debug(iter == 0 || ALWAYS_DUMP_THREADS);
-            }
-
-            iter++;
-        }
-    }
-
-    /**
-     *
-     */
-    private void debug(boolean dumpThreads) {
-        log.info("DUMPING DEBUG INFO:");
-
-        for (Ignite ignite : G.allGrids())
-            ((IgniteKernal)ignite).dumpDebugInfo();
-
-        if (dumpThreads) {
-            U.dumpThreads(null);
-
-            U.dumpThreads(log);
-        }
-    }
-
-    /**
-     * @param grid Grid.
-     */
-    private IgniteCache<UUID, UUID> getCache(Ignite grid) {
-        CacheConfiguration<UUID, UUID> ccfg = defaultCacheConfiguration();
-
-        ccfg.setCacheMode(CacheMode.PARTITIONED);
-        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
-        ccfg.setBackups(1);
-        ccfg.setNearConfiguration(null);
-
-        return grid.getOrCreateCache(ccfg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
new file mode 100644
index 0000000..6b67139
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Starts {@link #GRID_CNT} nodes and has each of them run cache operations right after it is started.
+ */
+public class CacheAffinityEarlyTest extends GridCommonAbstractTest {
+    /** Number of nodes started per iteration. */
+    private static int GRID_CNT = 8;
+
+    /**
+     * Stop flag polled by the worker loops. NOTE(review): set to {@code true} in {@link #doTest()} and never
+     * reset, so in every iteration after the first the workers skip the operation loop.
+     */
+    private volatile boolean stopped;
+
+    /** Number of times {@link #testStartNodes()} runs {@link #doTest()}. */
+    private static final int iters = 10;
+
+    /** If {@code true}, each node is started inside its worker thread instead of before the worker is launched. */
+    private static final boolean concurrent = true;
+
+    /** Futures of the worker threads. NOTE(review): entries are only ever added, never removed. */
+    private Collection<IgniteInternalFuture<?>> futs = new 
ArrayList<>(GRID_CNT);
+
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        OptimizedMarshaller marsh = new OptimizedMarshaller();
+        marsh.setRequireSerializable(false);
+
+        cfg.setMarshaller(marsh);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 6 * 60 * 1000L;
+    }
+
+    /**
+     * Runs {@link #doTest()} {@link #iters} times and stops all grids after each run.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        for (int i = 0; i < iters; i++) {
+            try {
+                log.info("Iteration: " + (i + 1) + '/' + iters);
+
+                doTest();
+            }
+            finally {
+                stopAllGrids(true);
+            }
+        }
+    }
+
+    /**
+     * Launches one worker per node that puts an entry keyed by the local node ID and then repeatedly calls
+     * {@code containsKey}, {@code get} or {@code put} on that key until {@link #stopped} is set. The test thread
+     * sets the flag after 10 seconds, waits for the workers and asserts that none of them caught an exception.
+     *
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        final AtomicBoolean failed = new AtomicBoolean();
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            final int idx = i;
+
+            final Ignite grid = concurrent ? null : startGrid(idx);
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    Random rnd = new Random();
+
+                    try {
+                        Ignite ignite = grid == null ? startGrid(idx) : grid;
+
+                        IgniteCache<Object, Object> cache = getCache(ignite);
+
+                        cache.put(ignite.cluster().localNode().id(), 
UUID.randomUUID());
+
+                        while (!stopped) {
+                            int val = Math.abs(rnd.nextInt(100));
+
+                            if (val >= 0 && val < 40)
+                                
cache.containsKey(ignite.cluster().localNode().id());
+                            else if (val >= 40 && val < 80)
+                                cache.get(ignite.cluster().localNode().id());
+                            else
+                                cache.put(ignite.cluster().localNode().id(), 
UUID.randomUUID());
+
+                            Thread.sleep(50);
+                        }
+                    }
+                    catch (Exception e) {
+                        log.error("Unexpected error: " + e, e);
+
+                        failed.set(true);
+                    }
+                }
+            }, 1);
+
+            futs.add(fut);
+        }
+
+        Thread.sleep(10_000);
+
+        stopped = true;
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
+
+        assertFalse(failed.get());
+    }
+
+    /**
+     * Gets or creates a cache from {@code defaultCacheConfiguration()} adjusted to partitioned mode, atomic
+     * atomicity mode, one backup and a {@code null} near configuration.
+     *
+     * @param grid Grid.
+     * @return Cache.
+     */
+    private IgniteCache getCache(Ignite grid) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setBackups(1);
+        ccfg.setNearConfiguration(null);
+
+        return grid.getOrCreateCache(ccfg);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 7451911..18c8d8e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheValueConsistencyAtomicSelfTest extends 
GridCacheValueConsi
     @Override protected int iterationCount() {
         return 100_000;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 16fa662..1ccbe1f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return 3 * 60 * 1000;
+        return 5 * 60 * 1000;
     }
 
     /**
@@ -249,35 +249,48 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
      * @throws Exception If any error occurs.
      */
     public void testMultipleStartOnCoordinatorStop() throws Exception{
-        clientFlagGlobal = false;
+        for (int k = 0; k < 3; k++) {
+            log.info("Iteration: " + k);
 
-        startGrids(GRID_CNT);
+            clientFlagGlobal = false;
 
-        final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4);
+            final int START_NODES = 5;
+            final int JOIN_NODES = 10;
 
-        final AtomicInteger startIdx = new AtomicInteger(GRID_CNT);
+            startGrids(START_NODES);
 
-        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Object>() {
-            @Override public Object call() throws Exception {
-                barrier.await();
+            final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1);
 
-                Ignite ignite = startGrid(startIdx.getAndIncrement());
+            final AtomicInteger startIdx = new AtomicInteger(START_NODES);
 
-                assertFalse(ignite.configuration().isClientMode());
+            IgniteInternalFuture<?> fut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = startIdx.getAndIncrement();
 
-                log.info("Started node: " + ignite.name());
+                    Thread.currentThread().setName("start-thread-" + idx);
 
-                return null;
-            }
-        }, GRID_CNT + 3, "start-thread");
+                    barrier.await();
 
-        barrier.await();
+                    Ignite ignite = startGrid(idx);
 
-        U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+                    assertFalse(ignite.configuration().isClientMode());
 
-        for (int i = 0; i < GRID_CNT; i++)
-            stopGrid(i);
+                    log.info("Started node: " + ignite.name());
+
+                    return null;
+                }
+            }, JOIN_NODES, "start-thread");
 
-        fut.get();
+            barrier.await();
+
+            U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+            for (int i = 0; i < START_NODES; i++)
+                stopGrid(i);
+
+            fut.get();
+
+            stopAllGrids();
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 981f649..0280e9c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -35,8 +35,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
@@ -52,11 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import 
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -87,6 +93,9 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     /** */
     private UUID nodeId;
 
+    /** */
+    private TcpDiscoverySpi nodeSpi;
+
     /**
      * @throws Exception If fails.
      */
@@ -99,8 +108,11 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = 
gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
-            new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+        TcpDiscoverySpi spi = nodeSpi;
+
+        if (spi == null)
+            spi = 
gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+                new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
 
         discoMap.put(gridName, spi);
 
@@ -1164,6 +1176,305 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Runs {@code customEventRace1} with the cache created from node 1 and without closing node 0.
+     *
+     * @throws Exception If failed
+     */
+    public void testCustomEventRace1_1() throws Exception {
+        try {
+            customEventRace1(true, false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs {@code customEventRace1} with the cache created from node 0 and without closing node 0.
+     *
+     * @throws Exception If failed
+     */
+    public void testCustomEventRace1_2() throws Exception {
+        try {
+            customEventRace1(false, false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs {@code customEventRace1} with the cache created from node 1 and with node 0 closed during the race.
+     *
+     * @throws Exception If failed
+     */
+    public void testCustomEventRace1_3() throws Exception {
+        try {
+            customEventRace1(true, true);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts nodes 0 and 1, then starts node 2 asynchronously while node 0's SPI is held before its next
+     * {@code TcpDiscoveryNodeAddedMessage} write; meanwhile a cache is created from node 0 or node 1. Once the
+     * write is released (and node 0 optionally closed) the cache is checked on node 2 and on a newly started
+     * node 3.
+     *
+     * @param cacheStartFrom1 If {@code true} starts cache from node1.
+     * @param stopCrd If {@code true} stops coordinator.
+     * @throws Exception If failed
+     */
+    private void customEventRace1(final boolean cacheStartFrom1, boolean 
stopCrd) throws Exception {
+        TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
+
+        nodeSpi = spi0;
+
+        final Ignite ignite0 = startGrid(0);
+
+        nodeSpi = new TestCustomEventRaceSpi();
+
+        final Ignite ignite1 = startGrid(1);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        spi0.nodeAdded1 = latch1;
+        spi0.nodeAdded2 = latch2;
+        spi0.debug = true;
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start 2");
+
+                nodeSpi = new TestCustomEventRaceSpi();
+
+                Ignite ignite2 = startGrid(2);
+
+                return null;
+            }
+        });
+
+        latch1.await();
+
+        final String CACHE_NAME = "cache";
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName(CACHE_NAME);
+
+                Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0;
+
+                ignite.createCache(ccfg);
+
+                return null;
+            }
+        });
+
+        if (stopCrd) {
+            spi0.stop = true;
+
+            latch2.countDown();
+
+            ignite0.close();
+        }
+        else {
+            U.sleep(500);
+
+            latch2.countDown();
+        }
+
+        fut1.get();
+        fut2.get();
+
+        IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME);
+
+        assertNotNull(cache);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+
+        nodeSpi = new TestCustomEventRaceSpi();
+
+        Ignite ignite = startGrid(3);
+
+        cache = ignite.cache(CACHE_NAME);
+
+        cache.put(2, 2);
+
+        assertEquals(1, cache.get(1));
+        assertEquals(2, cache.get(2));
+    }
+
+    /**
+     * Runs {@code customEventCoordinatorFailure} in the two-node variant.
+     *
+     * @throws Exception If failed
+     */
+    public void testCustomEventCoordinatorFailure1() throws Exception {
+        try {
+            customEventCoordinatorFailure(true);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs {@code customEventCoordinatorFailure} in the three-node variant.
+     *
+     * @throws Exception If failed
+     */
+    public void testCustomEventCoordinatorFailure2() throws Exception {
+        try {
+            customEventCoordinatorFailure(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Creates a cache from the last started node while node 0's SPI stops writing to sockets from the first
+     * {@code TcpDiscoveryCustomEventMessage} it is asked to write. Node 0 is then closed and the cache is
+     * checked on node 1 and on a node started afterwards.
+     *
+     * @param twoNodes If {@code true} starts two nodes, otherwise three.
+     * @throws Exception If failed
+     */
+    private void customEventCoordinatorFailure(boolean twoNodes) throws 
Exception {
+        TestCustomEventCoordinatorFailureSpi spi0 = new 
TestCustomEventCoordinatorFailureSpi();
+
+        nodeSpi = spi0;
+
+        Ignite ignite0 = startGrid(0);
+
+        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+        Ignite ignite1 = startGrid(1);
+
+        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+        Ignite ignite2 = twoNodes ? null : startGrid(2);
+
+        final Ignite createCacheNode = ignite2 != null ? ignite2 : ignite1;
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        spi0.latch = latch;
+
+        final String CACHE_NAME = "test-cache";
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Create test cache");
+
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName(CACHE_NAME);
+
+                createCacheNode.createCache(ccfg);
+
+                return null;
+            }
+        }, "create-cache-thread");
+
+        
((TcpCommunicationSpi)ignite0.configuration().getCommunicationSpi()).simulateNodeFailure();
+
+        latch.await();
+
+        ignite0.close();
+
+        fut.get();
+
+        IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME);
+
+        assertNotNull(cache);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+
+        log.info("Try start one more node.");
+
+        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+        Ignite ignite = startGrid(twoNodes ? 2 : 3);
+
+        cache = ignite.cache(CACHE_NAME);
+
+        assertNotNull(cache);
+
+        cache.put(2, 2);
+
+        assertEquals(1, cache.get(1));
+        assertEquals(2, cache.get(2));
+    }
+
+    /**
+     * Discovery SPI that, while {@link #latch} is set, drops every socket write from the first custom event on.
+     */
+    private static class TestCustomEventCoordinatorFailureSpi extends 
TcpDiscoverySpi {
+        /** Counted down when a custom event message reaches {@code writeToSocket}; no effect while {@code null}. */
+        private volatile CountDownLatch latch;
+
+        /**
+         * Once {@code true}, {@code writeToSocket} returns without writing.
+         * NOTE(review): not volatile - confirm it is only touched by a single writer thread.
+         */
+        private boolean stop;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryCustomEventMessage && latch != 
null) {
+                log.info("Stop node on custom event: " + msg);
+
+                latch.countDown();
+
+                stop = true;
+            }
+
+            if (stop)
+                return;
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
+     * Discovery SPI that can pause before writing a node-added message and, if {@link #stop} is set, skip it.
+     */
+    private static class TestCustomEventRaceSpi extends TcpDiscoverySpi {
+        /** Counted down when a node-added message reaches {@code writeToSocket}; cleared after one use. */
+        private volatile CountDownLatch nodeAdded1;
+
+        /** Awaited before that node-added message is processed further; cleared after one use. */
+        private volatile CountDownLatch nodeAdded2;
+
+        /** If {@code true}, node-added messages are not written to the socket. */
+        private volatile boolean stop;
+
+        /** Enables logging of node-added, node-add-finished and custom event message writes. */
+        private boolean debug;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                if (nodeAdded1 != null) {
+                    nodeAdded1.countDown();
+
+                    if (debug)
+                        log.info("--- Wait node added: " + msg);
+
+                    U.await(nodeAdded2);
+
+                    nodeAdded1 = null;
+                    nodeAdded2 = null;
+                }
+
+                if (stop)
+                    return;
+
+                if (debug)
+                    log.info("--- Send node added: " + msg);
+            }
+
+            if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                log.info("--- Send node finished: " + msg);
+
+            if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
+                log.info("--- Send custom event: " + msg);
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      * Starts new grid with given index. Method optimize is not invoked.
      *
      * @param idx Index of the grid to start.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 88977fb..289da3d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteInternalCacheTypesTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionAtomicSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -195,6 +196,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
         suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class);
         suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class);
+        suite.addTestSuite(CacheAffinityEarlyTest.class);
 
         suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
 

Reply via email to