Forcible node drop makes the cluster unstable in some cases. Disable forcible 
node drop by default.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e46cf958
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e46cf958
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e46cf958

Branch: refs/heads/ignite-5578-locJoin
Commit: e46cf9582bb05c22a45b868c2c78ea7ed4818d62
Parents: a71691a
Author: Andrey V. Mashenkov <[email protected]>
Authored: Tue Jul 18 15:49:00 2017 +0300
Committer: Andrey V. Mashenkov <[email protected]>
Committed: Tue Jul 18 15:49:00 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  6 ++
 .../service/GridServiceProcessor.java           |  4 +-
 .../spi/IgniteSpiOperationTimeoutHelper.java    |  8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 81 ++++++++++++++------
 .../IgniteClientReconnectAbstractTest.java      |  5 ++
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 10 +++
 .../IgniteCacheNearRestartRollbackSelfTest.java | 15 ++++
 ...teSynchronizationModesMultithreadedTest.java |  5 ++
 .../org/apache/ignite/spi/GridTcpForwarder.java | 26 +++++++
 .../tcp/TcpCommunicationSpiDropNodesTest.java   | 15 ++++
 .../TcpCommunicationSpiFaultyClientTest.java    | 20 +++--
 11 files changed, 160 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 616ac3f..5da7bd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -684,6 +684,12 @@ public final class IgniteSystemProperties {
         "IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD";
 
     /**
+     * If this property is set, a node will forcibly fail a remote node when 
it fails to establish a communication
+     * connection.
+     */
+    public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = 
"IGNITE_ENABLE_FORCIBLE_NODE_KILL";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 23a29f8..db632ec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1528,7 +1528,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
         private volatile AffinityTopologyVersion currTopVer = null;
 
         /** {@inheritDoc} */
-        @Override public void onEvent(DiscoveryEvent evt, final DiscoCache 
discoCache) {
+        @Override public void onEvent(final DiscoveryEvent evt, final 
DiscoCache discoCache) {
             GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
 
             if (busyLock == null || !busyLock.enterBusy())
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
                                             log.info("Service processor 
detected a topology change during " +
                                                 "assignments calculation (will 
abort current iteration and " +
                                                 "re-calculate on the newer 
version): " +
-                                                "[topVer=" + topVer + ", 
newTopVer=" + currTopVer + ']');
+                                                "[topVer=" + topVer + ", 
newTopVer=" + currTopVer0 + ']');
 
                                         return;
                                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index c685ea9..b2432ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -98,7 +98,9 @@ public class IgniteSpiOperationTimeoutHelper {
         if (!failureDetectionTimeoutEnabled)
             return false;
 
-        return e instanceof IgniteSpiOperationTimeoutException || e instanceof 
SocketTimeoutException ||
-            X.hasCause(e, IgniteSpiOperationTimeoutException.class, 
SocketException.class);
+        if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, 
SocketTimeoutException.class, SocketException.class))
+            return true;
+
+        return (timeout - (U.currentTimeMillis() - lastOperStartTs) <= 0);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5aca2f9..af12d3b 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -341,6 +342,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
     /** */
     private ConnectionPolicy connPlc;
 
+    /** */
+    private boolean enableForcibleNodeKill = IgniteSystemProperties
+        .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
@@ -2663,8 +2668,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
                                     }
                                 }
                             }
-                            else
+                            else {
                                 U.sleep(200);
+
+                                if (getSpiContext().node(node.id()) == null)
+                                    throw new 
ClusterTopologyCheckedException("Failed to send message " +
+                                        "(node left topology): " + node);
+                            }
                         }
 
                         fut.onDone(client0);
@@ -3088,6 +3098,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     }
 
                     if (failureDetectionTimeoutEnabled() && (e instanceof 
HandshakeTimeoutException ||
+                        X.hasCause(e, SocketException.class) ||
                         timeoutHelper.checkFailureTimeoutReached(e))) {
 
                         String msg = "Handshake timed out (failure detection 
timeout is reached) " +
@@ -3179,8 +3190,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
                         "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
 
                     // Reconnect for the second time, if connection is not 
established.
-                    if (!failureDetThrReached && connectAttempts < 2 &&
-                        (e instanceof ConnectException || X.hasCause(e, 
ConnectException.class))) {
+                    if (!failureDetThrReached && connectAttempts < 5 &&
+                        (X.hasCause(e, ConnectException.class, 
HandshakeException.class, SocketTimeoutException.class))) {
+                        U.sleep(200);
+
                         connectAttempts++;
 
                         continue;
@@ -3203,21 +3216,25 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
                     "operating system firewall is disabled on local and remote 
hosts) " +
                     "[addrs=" + addrs + ']');
 
-            if (getSpiContext().node(node.id()) != null && 
(CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
-                X.hasCause(errs, ConnectException.class, 
SocketTimeoutException.class, HandshakeTimeoutException.class,
-                    IgniteSpiOperationTimeoutException.class)) {
+            if (enableForcibleNodeKill) {
+                if (getSpiContext().node(node.id()) != null && 
(CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+                    X.hasCause(errs, ConnectException.class, 
HandshakeException.class,
+                        SocketTimeoutException.class, 
HandshakeTimeoutException.class,
+                        IgniteSpiOperationTimeoutException.class)) {
 
-                U.error(log, "TcpCommunicationSpi failed to establish 
connection to node, node will be dropped from " +
-                    "cluster [" +
-                    "rmtNode=" + node + "]", errs);
+                    U.error(log, "TcpCommunicationSpi failed to establish 
connection to node, node will be dropped from " +
+                        "cluster [" +
+                        "rmtNode=" + node + "]", errs);
 
-                getSpiContext().failNode(node.id(), "TcpCommunicationSpi 
failed to establish connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + 
']');
+                    getSpiContext().failNode(node.id(), "TcpCommunicationSpi 
failed to establish connection to node [" +
+                        "rmtNode=" + node +
+                        ", errs=" + errs +
+                        ", connectErrs=" + 
Arrays.toString(errs.getSuppressed()) + ']');
+                }
             }
 
-            throw errs;
+            if (X.hasCause(errs, ConnectException.class, 
HandshakeException.class))
+                throw errs;
         }
 
         return client;
@@ -3269,7 +3286,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                         sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), 
ch, directBuf, ByteOrder.nativeOrder(), log);
 
                         if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake is 
not completed.");
+                            throw new HandshakeException("SSL handshake is not 
completed.");
 
                         ByteBuffer handBuff = sslHnd.applicationBuffer();
 
@@ -3279,7 +3296,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to 
read remote node ID (connection closed).");
+                                throw new HandshakeException("Failed to read 
remote node ID (connection closed).");
 
                             buf.flip();
 
@@ -3295,7 +3312,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to 
read remote node ID (connection closed).");
+                                throw new HandshakeException("Failed to read 
remote node ID (connection closed).");
 
                             i += read;
                         }
@@ -3304,7 +3321,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 
Message.DIRECT_TYPE_SIZE);
 
                     if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new IgniteCheckedException("Remote node ID is 
not as expected [expected=" + rmtNodeId +
+                        throw new HandshakeException("Remote node ID is not as 
expected [expected=" + rmtNodeId +
                             ", rcvd=" + rmtNodeId0 + ']');
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
@@ -3391,7 +3408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed 
to read remote node recovery handshake " +
+                                    throw new HandshakeException("Failed to 
read remote node recovery handshake " +
                                         "(connection closed).");
 
                                 buf.flip();
@@ -3429,7 +3446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed 
to read remote node recovery handshake " +
+                                    throw new HandshakeException("Failed to 
read remote node recovery handshake " +
                                         "(connection closed).");
 
                                 i += read;
@@ -3471,8 +3488,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
 
             // Ignoring whatever happened after timeout - reporting only 
timeout event.
             if (!cancelled)
-                throw new HandshakeTimeoutException("Failed to perform 
handshake due to timeout (consider increasing " +
-                    "'connectionTimeout' configuration property).");
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform 
handshake due to timeout " +
+                        "(consider increasing 'connectionTimeout' 
configuration property)."));
         }
 
         return rcvCnt;
@@ -3662,18 +3680,31 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     }
 
     /** Internal exception class for proper timeout handling. */
-    private static class HandshakeTimeoutException extends 
IgniteCheckedException {
+    private static class HandshakeException extends IgniteCheckedException {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
-         * @param msg Message.
+         * @param msg Error message.
          */
-        HandshakeTimeoutException(String msg) {
+        HandshakeException(String msg) {
             super(msg);
         }
     }
 
+    /** Internal exception class for proper timeout handling. */
+    private static class HandshakeTimeoutException extends 
IgniteCheckedException {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param cause Exception cause
+         */
+        HandshakeTimeoutException(IgniteSpiOperationTimeoutException cause) {
+            super(cause);
+        }
+    }
+
     /**
      * This worker takes responsibility to shut the server down when stopping,
      * No other thread shall stop passed server.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 10fa6ad..fa9cc35 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -152,6 +153,8 @@ public abstract class IgniteClientReconnectAbstractTest 
extends GridCommonAbstra
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, 
"true");
+
         int srvs = serverCount();
 
         if (srvs > 0)
@@ -172,6 +175,8 @@ public abstract class IgniteClientReconnectAbstractTest 
extends GridCommonAbstra
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         stopAllGrids();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index ee1b48e..420fdd8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -26,6 +26,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -89,9 +90,18 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest 
extends GridCommonAbstrac
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
index 79f15ad..1a9e13a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -92,6 +93,20 @@ public class IgniteCacheNearRestartRollbackSelfTest extends 
GridCommonAbstractTe
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
     /**
      * @param igniteInstanceName Ignite instance name.
      * @return Cache configuration.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
index d23e870..8e57387 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -32,6 +32,7 @@ import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -106,6 +107,8 @@ public class 
IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+
         startGrids(SRVS);
 
         clientMode = true;
@@ -121,6 +124,8 @@ public class 
IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java 
b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
index d08321e..68d97c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -85,6 +86,10 @@ public final class GridTcpForwarder implements AutoCloseable 
{
                                 outputCon.getInputStream(), 
inputCon.getOutputStream()
                             );
 
+                            // Force the sibling to close if one thread fails.
+                            forwardThread1.setUncaughtExceptionHandler(new 
ForwarderExceptionHandler(forwardThread2));
+                            forwardThread2.setUncaughtExceptionHandler(new 
ForwarderExceptionHandler(forwardThread1));
+
                             forwardThread1.start();
                             forwardThread2.start();
 
@@ -128,6 +133,25 @@ public final class GridTcpForwarder implements 
AutoCloseable {
     }
 
     /**
+     *
+     */
+    private static class ForwarderExceptionHandler implements 
Thread.UncaughtExceptionHandler {
+        /** */
+        private Thread siblingThread;
+
+        /** */
+        public ForwarderExceptionHandler(Thread siblingThread) {
+
+            this.siblingThread = siblingThread;
+        }
+
+        /** */
+        @Override public void uncaughtException(Thread t, Throwable e) {
+            siblingThread.interrupt();
+        }
+    }
+
+    /**
      * Thread reads data from input stream and write to output stream.
      */
     private class ForwardThread extends Thread {
@@ -166,6 +190,8 @@ public final class GridTcpForwarder implements 
AutoCloseable {
             }
             catch (IOException e) {
                 log.error("IOException while forwarding data [threadName=" + 
getName() + "]", e);
+
+                throw new IgniteException(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index ddebd40..e215a34 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -82,6 +83,20 @@ public class TcpCommunicationSpiDropNodesTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index b4d6cbc..bead697 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -18,18 +18,16 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -44,8 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -95,6 +91,20 @@ public class TcpCommunicationSpiFaultyClientTest extends 
GridCommonAbstractTest
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 

Reply via email to