This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3db1761  IGNITE-13465 Connection recovery timeout is fixed when 
recovery protocol is executed. - Fixes #8262.
3db1761 is described below

commit 3db17612d7c433ddfbb404f92eebca6dd2f4fefe
Author: Vladimir Steshin <vlads...@gmail.com>
AuthorDate: Mon Oct 26 09:40:19 2020 +0300

    IGNITE-13465 Connection recovery timeout is fixed when recovery protocol is 
executed. - Fixes #8262.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../spi/IgniteSpiOperationTimeoutHelper.java       |  86 ++++++++---------
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 103 ++++++++++++++++-----
 .../tcp/internal/TcpDiscoveryNodesRing.java        |  11 +++
 ...cheContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../tcp/TcpDiscoveryNetworkIssuesTest.java         |  91 +++++++++++++++++-
 5 files changed, 216 insertions(+), 77 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index 48161ba..96fea9e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -30,20 +30,11 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  *
  */
 public class IgniteSpiOperationTimeoutHelper {
-    // https://issues.apache.org/jira/browse/IGNITE-11221
-    // We need to reuse new logic ExponentialBackoffTimeout logic in 
TcpDiscovery instead of this class.
+    /** Flag whether to use timeout. */
+    private final boolean timeoutEnabled;
 
-    /** */
-    private long lastOperStartNanos;
-
-    /** */
-    private long timeout;
-
-    /** */
-    private final boolean failureDetectionTimeoutEnabled;
-
-    /** */
-    private final long failureDetectionTimeout;
+    /** Time in nanos which cannot be reached for current operation. */
+    private final long timeoutThreshold;
 
     /**
      * Constructor.
@@ -52,9 +43,7 @@ public class IgniteSpiOperationTimeoutHelper {
      * @param srvOp {@code True} if communicates with server node.
      */
     public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean 
srvOp) {
-        failureDetectionTimeoutEnabled = 
adapter.failureDetectionTimeoutEnabled();
-        failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
-            adapter.clientFailureDetectionTimeout();
+        this(adapter, srvOp, -1, -1);
     }
 
     /**
@@ -62,15 +51,26 @@ public class IgniteSpiOperationTimeoutHelper {
      *
      * @param adapter SPI adapter.
      * @param srvOp {@code True} if communicates with server node.
-     * @param lastOperStartNanos Time of last related operation in nanos.
+     * @param lastRelatedOperationTime Time of last related operation in 
nanos. Ignored if negative, 0 or
+     * {@code adapter.failureDetectionTimeoutEnabled()} is false.
+     * @param absoluteThreshold Absolute time threshold (nanos) which must not 
be reached. Ignored if negative or 0.
      */
-    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean 
srvOp, long lastOperStartNanos) {
-        this(adapter, srvOp);
+    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean 
srvOp, long lastRelatedOperationTime,
+        long absoluteThreshold) {
+        timeoutEnabled = adapter.failureDetectionTimeoutEnabled();
 
-        this.lastOperStartNanos = lastOperStartNanos;
+        if (timeoutEnabled) {
+            long timeout = (lastRelatedOperationTime > 0 ? 
lastRelatedOperationTime : System.nanoTime()) +
+                U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() : 
adapter.clientFailureDetectionTimeout());
 
-        if (lastOperStartNanos > 0)
-            timeout = failureDetectionTimeout;
+            if (absoluteThreshold > 0 && timeout > absoluteThreshold)
+                timeout = absoluteThreshold;
+
+            timeoutThreshold = timeout;
+        } else {
+            // Save absolute threshold if it is set.
+            timeoutThreshold = absoluteThreshold > 0 ? absoluteThreshold : 0;
+        }
     }
 
     /**
@@ -85,42 +85,32 @@ public class IgniteSpiOperationTimeoutHelper {
      * this {@code IgniteSpiOperationTimeoutController}.
      */
     public long nextTimeoutChunk(long dfltTimeout) throws 
IgniteSpiOperationTimeoutException {
-        if (!failureDetectionTimeoutEnabled)
-            return dfltTimeout;
+        long now = System.nanoTime();
 
-        if (lastOperStartNanos == 0) {
-            timeout = failureDetectionTimeout;
-            lastOperStartNanos = System.nanoTime();
-        }
+        long left;
+
+        if (timeoutEnabled)
+            left = timeoutThreshold - now;
         else {
-            long curNanos = System.nanoTime();
+            left = U.millisToNanos(dfltTimeout);
 
-            timeout -= U.nanosToMillis(curNanos - lastOperStartNanos);
+            if (timeoutThreshold > 0 && now + left >= timeoutThreshold)
+                left = timeoutThreshold - now;
+        }
 
-            lastOperStartNanos = curNanos;
+        if (left <= 0)
+            throw new IgniteSpiOperationTimeoutException("Network operation 
timed out.");
 
-            if (timeout <= 0)
-                throw new IgniteSpiOperationTimeoutException("Network 
operation timed out. Increase " +
-                    "'failureDetectionTimeout' configuration property 
[failureDetectionTimeout="
-                    + failureDetectionTimeout + ']');
-        }
-        
-        return timeout;
+        return U.nanosToMillis(left);
     }
 
     /**
-     * Checks whether the given {@link Exception} is generated because failure 
detection timeout has been reached.
+     * Checks whether the given {@link Exception} is a timeout.
      *
-     * @param e Exception.
-     * @return {@code true} if failure detection timeout is reached, {@code 
false} otherwise.
+     * @param e Exception to check.
+     * @return {@code True} if given exception is a timeout. {@code False} 
otherwise.
      */
     public boolean checkFailureTimeoutReached(Exception e) {
-        if (!failureDetectionTimeoutEnabled)
-            return false;
-
-        if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, 
SocketTimeoutException.class, SocketException.class))
-            return true;
-
-        return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0);
+        return X.hasCause(e, IgniteSpiOperationTimeoutException.class, 
SocketTimeoutException.class, SocketException.class);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9d5e3ca..d0c8e8e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -215,6 +215,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Maximal interval of connection check to next node in the ring. */
     private static final long MAX_CON_CHECK_INTERVAL = 500;
 
+    /** Minimal timeout to find connection to some next node in the ring while 
connection recovering. */
+    private static final long MIN_RECOVERY_TIMEOUT = 100;
+
     /** Interval of checking connection to next node in the ring. */
     private long connCheckInterval;
 
@@ -922,7 +925,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (!openedSock && reconCnt == 2)
                             break;
 
-                        if (timeoutHelper.checkFailureTimeoutReached(e))
+                        if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
                             break;
                         else if (!spi.failureDetectionTimeoutEnabled() && 
reconCnt == spi.getReconnectCount())
                             break;
@@ -1576,7 +1579,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break;
                 }
 
-                if (timeoutHelper.checkFailureTimeoutReached(e))
+                if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == 
spi.getReconnectCount())
@@ -3477,8 +3480,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     while (true) {
                         if (sock == null) {
-                            if (timeoutHelper == null)
-                                timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi, true);
+                            // We re-create the helper here because it could 
be created earlier with wrong timeout on
+                            // message sending like 
IgniteConfiguration.failureDetectionTimeout. Here we are in the
+                            // state of conenction recovering and have to work 
with
+                            // 
TcpDiscoverSpi.getEffectiveConnectionRecoveryTimeout()
+                            if (timeoutHelper == null || sndState != null)
+                                timeoutHelper = 
serverOperationTimeoutHelper(sndState, -1);
 
                             boolean success = false;
 
@@ -3486,8 +3493,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             // Restore ring.
                             try {
-                                long tsNanos = System.nanoTime();
-
                                 sock = spi.openSocket(addr, timeoutHelper);
 
                                 out = spi.socketStream(sock);
@@ -3517,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 // We should take previousNodeAlive flag into 
account only if we received the response from the correct node.
                                 if (res.creatorNodeId().equals(next.id()) && 
res.previousNodeAlive() && sndState != null) {
+                                    sndState.checkTimeout();
+
                                     // Remote node checked connection to it's 
previous and got success.
                                     boolean previousNode = 
sndState.markLastFailedNodeAlive();
 
@@ -3624,13 +3631,20 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 onException("Failed to connect to next node 
[msg=" + msg + ", err=" + e + ']', e);
 
+                                // Fastens failure detection.
+                                if (sndState != null && 
sndState.checkTimeout()) {
+                                    segmentLocalNodeOnSendFail(failedNodes);
+
+                                    return; // Nothing to do here.
+                                }
+
                                 if (!openSock)
                                     break; // Don't retry if we can not 
establish connection.
 
                                 if (!spi.failureDetectionTimeoutEnabled() && 
++reconCnt == spi.getReconnectCount())
                                     break;
 
-                                if 
(timeoutHelper.checkFailureTimeoutReached(e))
+                                if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
                                     break;
                                 else if (!spi.failureDetectionTimeoutEnabled() 
&& (e instanceof
                                     SocketTimeoutException || X.hasCause(e, 
SocketTimeoutException.class))) {
@@ -3689,10 +3703,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     addFailedNodes(pendingMsg, failedNodes);
 
-                                    if (timeoutHelper == null) {
-                                        timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi, true,
-                                            lastRingMsgSentTime);
-                                    }
+                                    if (timeoutHelper == null)
+                                        timeoutHelper = 
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
 
                                     try {
                                         spi.writeToSocket(sock, out, 
pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -3736,7 +3748,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 long tsNanos = System.nanoTime();
 
                                 if (timeoutHelper == null)
-                                    timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime);
+                                    timeoutHelper = 
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
 
                                 addFailedNodes(msg, failedNodes);
 
@@ -3803,7 +3815,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node 
[next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (timeoutHelper.checkFailureTimeoutReached(e))
+                            if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
                                 break;
 
                             if (!spi.failureDetectionTimeoutEnabled()) {
@@ -3840,6 +3852,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (!sent) {
                     if (sndState == null && 
spi.getEffectiveConnectionRecoveryTimeout() > 0)
                         sndState = new CrossRingMessageSendState();
+                    else if (sndState != null && sndState.checkTimeout()) {
+                        segmentLocalNodeOnSendFail(failedNodes);
+
+                        return; // Nothing to do here.
+                    }
 
                     boolean failedNextNode = sndState == null || 
sndState.markNextNodeFailed();
 
@@ -3873,12 +3890,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    if (sndState != null && sndState.isFailed()) {
-                        segmentLocalNodeOnSendFail(failedNodes);
-
-                        return; // Nothing to do here.
-                    }
-
                     next = null;
 
                     errs = null;
@@ -6501,6 +6512,38 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /**
+     * Creates proper timeout helper taking in account current send state and 
ring state.
+     *
+     * @param sndState Current connection recovering state. Ignored if {@code 
null}.
+     * @param lastOperationNanos Time of last related operation. Ignored if 
negative or 0.
+     * @return Timeout helper.
+     */
+    private IgniteSpiOperationTimeoutHelper 
serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
+        long lastOperationNanos) {
+        long absoluteThreshold = -1;
+
+        // Active send-state means we lost connection to next node and have to 
find another.
+        // We don't know how many nodes failed. May be several failed in a 
row. But we got only one
+        // connectionRecoveryTimeout to establish new connection to the ring. 
We can't spend this timeout wholly on one
+        // or two next nodes. We should slice it and try to travers as many as 
we can.
+        if (sndState != null) {
+            int nodesLeft = ring.serverNodes().size() - 1 - 
sndState.failedNodes;
+
+            assert nodesLeft > 0;
+
+            long now = System.nanoTime();
+
+            // In case of large cluster and small connectionRecoveryTimeout we 
have to provide reasonable minimal
+            // timeout per one of the next nodes. It should not appear too 
small like 1, 5 or 10ms.
+            long perNodeTimeout = Math.max((sndState.failTimeNanos - now) / 
nodesLeft, MIN_RECOVERY_TIMEOUT);
+
+            absoluteThreshold = Math.min(sndState.failTimeNanos, now + 
perNodeTimeout);
+        }
+
+        return new IgniteSpiOperationTimeoutHelper(spi, true, 
lastOperationNanos, absoluteThreshold);
+    }
+
     /** Fixates time of last sent message. */
     private void updateLastSentMessageTime() {
         lastRingMsgSentTime = System.nanoTime();
@@ -8197,6 +8240,22 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Checks if message sending has completely failed due to the timeout. 
Sets {@code RingMessageSendState#FAILED}
+         * if the timeout is reached.
+         *
+         * @return {@code True} if passed timeout is reached. {@code False} 
otherwise.
+         */
+        boolean checkTimeout() {
+            if (System.nanoTime() >= failTimeNanos) {
+                state = RingMessageSendState.FAILED;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * Marks last failed node as alive.
          *
          * @return {@code False} if all failed nodes marked as alive or 
incorrect state.
@@ -8208,12 +8267,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (--failedNodes <= 0) {
                     failedNodes = 0;
 
-                    if (System.nanoTime() - failTimeNanos >= 0) {
-                        state = RingMessageSendState.FAILED;
-
-                        return false;
-                    }
-
                     state = RingMessageSendState.STARTING_POINT;
                 }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index fdb9997..aaa3165 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -173,6 +173,17 @@ public class TcpDiscoveryNodesRing {
     }
 
     /**
+     * @return Server nodes.
+     */
+    public Collection<TcpDiscoveryNode> serverNodes() {
+        return nodes(new PN() {
+            @Override public boolean apply(ClusterNode node) {
+                return ((TcpDiscoveryNode)node).clientRouterNodeId() == null;
+            }
+        });
+    }
+
+    /**
      * Checks whether the topology has remote nodes in.
      *
      * @return {@code true} if the topology has remote nodes in.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 3d6a99e..5b872ca 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -745,7 +745,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* 
client node */)
                     - 1 /* Primary node */ - backups;
             }
-        }, 5000L);
+        }, getConfiguration("").getFailureDetectionTimeout() * 2);
 
         awaitPartitionMapExchange();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index dfc15e0..a751ac4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -22,20 +22,31 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+
 /**
  *
  */
@@ -88,6 +99,12 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
     /** */
     private int connectionRecoveryTimeout = -1;
 
+    /** */
+    private int failureDetectionTimeout = 2_000;
+
+    /** */
+    private final GridConcurrentHashSet<Integer> segmentedNodes = new 
GridConcurrentHashSet<>();
+
     /** {@inheritDoc} */
     @Override protected void afterTest() {
         stopAllGrids();
@@ -107,10 +124,14 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
         if (connectionRecoveryTimeout >= 0)
             spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout);
 
-        cfg.setFailureDetectionTimeout(2_000);
+        cfg.setFailureDetectionTimeout(failureDetectionTimeout);
 
         cfg.setDiscoverySpi(spi);
 
+        cfg.setIncludeEventTypes(EVT_NODE_SEGMENTED);
+
+        cfg.setSystemWorkerBlockedTimeout(10_000);
+
         return cfg;
     }
 
@@ -162,7 +183,7 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
             illNodeSegmented.set(true);
 
             return false;
-            }, EventType.EVT_NODE_SEGMENTED);
+            }, EVT_NODE_SEGMENTED);
 
         specialSpi = null;
 
@@ -187,6 +208,51 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Ensures sequential failure of two nodes has no additional issues.
+     */
+    @Test
+    public void testFailTwoNodes() throws Exception {
+        failureDetectionTimeout = 1000;
+
+        startGrids(5);
+
+        awaitPartitionMapExchange();
+
+        final CountDownLatch failLatch = new CountDownLatch(2);
+
+        for (int i = 0; i < 5; i++) {
+            ignite(i).events().localListen(evt -> {
+                failLatch.countDown();
+
+                return true;
+            }, EVT_NODE_FAILED);
+
+            int nodeIdx = i;
+
+            ignite(i).events().localListen(evt -> {
+                segmentedNodes.add(nodeIdx);
+
+                return true;
+            }, EVT_NODE_SEGMENTED);
+        }
+
+        processNetworkThreads(ignite(2), t -> t.suspend());
+        processNetworkThreads(ignite(3), t -> t.suspend());
+
+        try {
+            failLatch.await(10, TimeUnit.SECONDS);
+        }
+        finally {
+            processNetworkThreads(ignite(2), t -> t.resume());
+            processNetworkThreads(ignite(3), t -> t.resume());
+        }
+
+        assertFalse(segmentedNodes.contains(0));
+        assertFalse(segmentedNodes.contains(1));
+        assertFalse(segmentedNodes.contains(4));
+    }
+
+    /**
      * @param ig Ignite instance to get failedNodes collection from.
      */
     private Map getFailedNodesCollection(IgniteEx ig) {
@@ -211,4 +277,23 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
 
         out.close();
     }
+
+    /**
+     * Simulates network failure on certain node.
+     */
+    private void processNetworkThreads(Ignite ignite, Consumer<Thread> proc) {
+        DiscoverySpi disco = ignite.configuration().getDiscoverySpi();
+
+        ServerImpl serverImpl = U.field(disco, "impl");
+
+        for (Thread thread : serverImpl.threads())
+            proc.accept(thread);
+
+        CommunicationSpi<?> comm = 
ignite.configuration().getCommunicationSpi();
+
+        GridNioServerWrapper nioServerWrapper = U.field(comm, "nioSrvWrapper");
+
+        for (GridWorker worker : nioServerWrapper.nio().workers())
+            proc.accept(worker.runner());
+    }
 }

Reply via email to