This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 3db1761 IGNITE-13465 Connection recovery timeout is fixed when recovery protocol is executed. - Fixes #8262. 3db1761 is described below commit 3db17612d7c433ddfbb404f92eebca6dd2f4fefe Author: Vladimir Steshin <vlads...@gmail.com> AuthorDate: Mon Oct 26 09:40:19 2020 +0300 IGNITE-13465 Connection recovery timeout is fixed when recovery protocol is executed. - Fixes #8262. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../spi/IgniteSpiOperationTimeoutHelper.java | 86 ++++++++--------- .../ignite/spi/discovery/tcp/ServerImpl.java | 103 ++++++++++++++++----- .../tcp/internal/TcpDiscoveryNodesRing.java | 11 +++ ...cheContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 91 +++++++++++++++++- 5 files changed, 216 insertions(+), 77 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index 48161ba..96fea9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -30,20 +30,11 @@ import org.apache.ignite.internal.util.typedef.internal.U; * */ public class IgniteSpiOperationTimeoutHelper { - // https://issues.apache.org/jira/browse/IGNITE-11221 - // We need to reuse new logic ExponentialBackoffTimeout logic in TcpDiscovery instead of this class. + /** Flag whether to use timeout. */ + private final boolean timeoutEnabled; - /** */ - private long lastOperStartNanos; - - /** */ - private long timeout; - - /** */ - private final boolean failureDetectionTimeoutEnabled; - - /** */ - private final long failureDetectionTimeout; + /** Time in nanos which cannot be reached for current operation. */ + private final long timeoutThreshold; /** * Constructor. 
@@ -52,9 +43,7 @@ public class IgniteSpiOperationTimeoutHelper { * @param srvOp {@code True} if communicates with server node. */ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) { - failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled(); - failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() : - adapter.clientFailureDetectionTimeout(); + this(adapter, srvOp, -1, -1); } /** @@ -62,15 +51,26 @@ public class IgniteSpiOperationTimeoutHelper { * * @param adapter SPI adapter. * @param srvOp {@code True} if communicates with server node. - * @param lastOperStartNanos Time of last related operation in nanos. + * @param lastRelatedOperationTime Time of last related operation in nanos. Ignored if negative, 0 or + * {@code adapter.failureDetectionTimeoutEnabled()} is false. + * @param absoluteThreshold Absolute time threshold (nanos) which must not be reached. Ignored if negative or 0. */ - public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) { - this(adapter, srvOp); + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastRelatedOperationTime, + long absoluteThreshold) { + timeoutEnabled = adapter.failureDetectionTimeoutEnabled(); - this.lastOperStartNanos = lastOperStartNanos; + if (timeoutEnabled) { + long timeout = (lastRelatedOperationTime > 0 ? lastRelatedOperationTime : System.nanoTime()) + + U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() : adapter.clientFailureDetectionTimeout()); - if (lastOperStartNanos > 0) - timeout = failureDetectionTimeout; + if (absoluteThreshold > 0 && timeout > absoluteThreshold) + timeout = absoluteThreshold; + + timeoutThreshold = timeout; + } else { + // Save absolute threshold if it is set. + timeoutThreshold = absoluteThreshold > 0 ? 
absoluteThreshold : 0; + } } /** @@ -85,42 +85,32 @@ public class IgniteSpiOperationTimeoutHelper { * this {@code IgniteSpiOperationTimeoutController}. */ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { - if (!failureDetectionTimeoutEnabled) - return dfltTimeout; + long now = System.nanoTime(); - if (lastOperStartNanos == 0) { - timeout = failureDetectionTimeout; - lastOperStartNanos = System.nanoTime(); - } + long left; + + if (timeoutEnabled) + left = timeoutThreshold - now; else { - long curNanos = System.nanoTime(); + left = U.millisToNanos(dfltTimeout); - timeout -= U.nanosToMillis(curNanos - lastOperStartNanos); + if (timeoutThreshold > 0 && now + left >= timeoutThreshold) + left = timeoutThreshold - now; + } - lastOperStartNanos = curNanos; + if (left <= 0) + throw new IgniteSpiOperationTimeoutException("Network operation timed out."); - if (timeout <= 0) - throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + - "'failureDetectionTimeout' configuration property [failureDetectionTimeout=" - + failureDetectionTimeout + ']'); - } - - return timeout; + return U.nanosToMillis(left); } /** - * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached. + * Checks whether the given {@link Exception} is a timeout. * - * @param e Exception. - * @return {@code true} if failure detection timeout is reached, {@code false} otherwise. + * @param e Exception to check. + * @return {@code True} if given exception is a timeout. {@code False} otherwise. 
*/ public boolean checkFailureTimeoutReached(Exception e) { - if (!failureDetectionTimeoutEnabled) - return false; - - if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class)) - return true; - - return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0); + return X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 9d5e3ca..d0c8e8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -215,6 +215,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Maximal interval of connection check to next node in the ring. */ private static final long MAX_CON_CHECK_INTERVAL = 500; + /** Minimal timeout to find connection to some next node in the ring while connection recovering. */ + private static final long MIN_RECOVERY_TIMEOUT = 100; + /** Interval of checking connection to next node in the ring. 
*/ private long connCheckInterval; @@ -922,7 +925,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openedSock && reconCnt == 2) break; - if (timeoutHelper.checkFailureTimeoutReached(e)) + if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) break; else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount()) break; @@ -1576,7 +1579,7 @@ class ServerImpl extends TcpDiscoveryImpl { break; } - if (timeoutHelper.checkFailureTimeoutReached(e)) + if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) break; if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) @@ -3477,8 +3480,12 @@ class ServerImpl extends TcpDiscoveryImpl { while (true) { if (sock == null) { - if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + // We re-create the helper here because it could be created earlier with wrong timeout on + // message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the + // state of connection recovering and have to work with + // TcpDiscoverySpi.getEffectiveConnectionRecoveryTimeout() + if (timeoutHelper == null || sndState != null) + timeoutHelper = serverOperationTimeoutHelper(sndState, -1); boolean success = false; @@ -3486,8 +3493,6 @@ class ServerImpl extends TcpDiscoveryImpl { // Restore ring. try { - long tsNanos = System.nanoTime(); - sock = spi.openSocket(addr, timeoutHelper); out = spi.socketStream(sock); @@ -3517,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl { // We should take previousNodeAlive flag into account only if we received the response from the correct node. if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) { + sndState.checkTimeout(); + // Remote node checked connection to it's previous and got success. 
boolean previousNode = sndState.markLastFailedNodeAlive(); @@ -3624,13 +3631,20 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e); + // Fastens failure detection. + if (sndState != null && sndState.checkTimeout()) { + segmentLocalNodeOnSendFail(failedNodes); + + return; // Nothing to do here. + } + if (!openSock) break; // Don't retry if we can not establish connection. if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) break; - if (timeoutHelper.checkFailureTimeoutReached(e)) + if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) break; else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { @@ -3689,10 +3703,8 @@ class ServerImpl extends TcpDiscoveryImpl { addFailedNodes(pendingMsg, failedNodes); - if (timeoutHelper == null) { - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, - lastRingMsgSentTime); - } + if (timeoutHelper == null) + timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime); try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( @@ -3736,7 +3748,7 @@ class ServerImpl extends TcpDiscoveryImpl { long tsNanos = System.nanoTime(); if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); + timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime); addFailedNodes(msg, failedNodes); @@ -3803,7 +3815,7 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); - if (timeoutHelper.checkFailureTimeoutReached(e)) + if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) break; if (!spi.failureDetectionTimeoutEnabled()) { @@ -3840,6 +3852,11 @@ class ServerImpl extends TcpDiscoveryImpl { 
if (!sent) { if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0) sndState = new CrossRingMessageSendState(); + else if (sndState != null && sndState.checkTimeout()) { + segmentLocalNodeOnSendFail(failedNodes); + + return; // Nothing to do here. + } boolean failedNextNode = sndState == null || sndState.markNextNodeFailed(); @@ -3873,12 +3890,6 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (sndState != null && sndState.isFailed()) { - segmentLocalNodeOnSendFail(failedNodes); - - return; // Nothing to do here. - } - next = null; errs = null; @@ -6501,6 +6512,38 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** + * Creates proper timeout helper taking into account current send state and ring state. + * + * @param sndState Current connection recovering state. Ignored if {@code null}. + * @param lastOperationNanos Time of last related operation. Ignored if negative or 0. + * @return Timeout helper. + */ + private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState, + long lastOperationNanos) { + long absoluteThreshold = -1; + + // Active send-state means we lost connection to next node and have to find another. + // We don't know how many nodes failed. May be several failed in a row. But we got only one + // connectionRecoveryTimeout to establish new connection to the ring. We can't spend this timeout wholly on one + // or two next nodes. We should slice it and try to traverse as many as we can. + if (sndState != null) { + int nodesLeft = ring.serverNodes().size() - 1 - sndState.failedNodes; + + assert nodesLeft > 0; + + long now = System.nanoTime(); + + // In case of large cluster and small connectionRecoveryTimeout we have to provide reasonable minimal + // timeout per one of the next nodes. It should not appear too small like 1, 5 or 10ms. 
+ long perNodeTimeout = Math.max((sndState.failTimeNanos - now) / nodesLeft, MIN_RECOVERY_TIMEOUT); + + absoluteThreshold = Math.min(sndState.failTimeNanos, now + perNodeTimeout); + } + + return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos, absoluteThreshold); + } + /** Fixates time of last sent message. */ private void updateLastSentMessageTime() { lastRingMsgSentTime = System.nanoTime(); @@ -8197,6 +8240,22 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks if message sending has completely failed due to the timeout. Sets {@code RingMessageSendState#FAILED} + * if the timeout is reached. + * + * @return {@code True} if passed timeout is reached. {@code False} otherwise. + */ + boolean checkTimeout() { + if (System.nanoTime() >= failTimeNanos) { + state = RingMessageSendState.FAILED; + + return true; + } + + return false; + } + + /** * Marks last failed node as alive. * * @return {@code False} if all failed nodes marked as alive or incorrect state. @@ -8208,12 +8267,6 @@ class ServerImpl extends TcpDiscoveryImpl { if (--failedNodes <= 0) { failedNodes = 0; - if (System.nanoTime() - failTimeNanos >= 0) { - state = RingMessageSendState.FAILED; - - return false; - } - state = RingMessageSendState.STARTING_POINT; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index fdb9997..aaa3165 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -173,6 +173,17 @@ public class TcpDiscoveryNodesRing { } /** + * @return Server nodes. 
+ */ + public Collection<TcpDiscoveryNode> serverNodes() { + return nodes(new PN() { + @Override public boolean apply(ClusterNode node) { + return ((TcpDiscoveryNode)node).clientRouterNodeId() == null; + } + }); + } + + /** * Checks whether the topology has remote nodes in. * * @return {@code true} if the topology has remote nodes in. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 3d6a99e..5b872ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -745,7 +745,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node */) - 1 /* Primary node */ - backups; } - }, 5000L); + }, getConfiguration("").getFailureDetectionTimeout() * 2); awaitPartitionMapExchange(); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index dfc15e0..a751ac4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -22,20 +22,31 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.function.Consumer; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; + /** * */ @@ -88,6 +99,12 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { /** */ private int connectionRecoveryTimeout = -1; + /** */ + private int failureDetectionTimeout = 2_000; + + /** */ + private final GridConcurrentHashSet<Integer> segmentedNodes = new GridConcurrentHashSet<>(); + /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); @@ -107,10 +124,14 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { if (connectionRecoveryTimeout >= 0) spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout); - cfg.setFailureDetectionTimeout(2_000); + cfg.setFailureDetectionTimeout(failureDetectionTimeout); cfg.setDiscoverySpi(spi); + cfg.setIncludeEventTypes(EVT_NODE_SEGMENTED); + + 
cfg.setSystemWorkerBlockedTimeout(10_000); + return cfg; } @@ -162,7 +183,7 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { illNodeSegmented.set(true); return false; - }, EventType.EVT_NODE_SEGMENTED); + }, EVT_NODE_SEGMENTED); specialSpi = null; @@ -187,6 +208,51 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { } /** + * Ensures sequential failure of two nodes has no additional issues. + */ + @Test + public void testFailTwoNodes() throws Exception { + failureDetectionTimeout = 1000; + + startGrids(5); + + awaitPartitionMapExchange(); + + final CountDownLatch failLatch = new CountDownLatch(2); + + for (int i = 0; i < 5; i++) { + ignite(i).events().localListen(evt -> { + failLatch.countDown(); + + return true; + }, EVT_NODE_FAILED); + + int nodeIdx = i; + + ignite(i).events().localListen(evt -> { + segmentedNodes.add(nodeIdx); + + return true; + }, EVT_NODE_SEGMENTED); + } + + processNetworkThreads(ignite(2), t -> t.suspend()); + processNetworkThreads(ignite(3), t -> t.suspend()); + + try { + failLatch.await(10, TimeUnit.SECONDS); + } + finally { + processNetworkThreads(ignite(2), t -> t.resume()); + processNetworkThreads(ignite(3), t -> t.resume()); + } + + assertFalse(segmentedNodes.contains(0)); + assertFalse(segmentedNodes.contains(1)); + assertFalse(segmentedNodes.contains(4)); + } + + /** * @param ig Ignite instance to get failedNodes collection from. */ private Map getFailedNodesCollection(IgniteEx ig) { @@ -211,4 +277,23 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { out.close(); } + + /** + * Simulates network failure on certain node. 
+ */ + private void processNetworkThreads(Ignite ignite, Consumer<Thread> proc) { + DiscoverySpi disco = ignite.configuration().getDiscoverySpi(); + + ServerImpl serverImpl = U.field(disco, "impl"); + + for (Thread thread : serverImpl.threads()) + proc.accept(thread); + + CommunicationSpi<?> comm = ignite.configuration().getCommunicationSpi(); + + GridNioServerWrapper nioServerWrapper = U.field(comm, "nioSrvWrapper"); + + for (GridWorker worker : nioServerWrapper.nio().workers()) + proc.accept(worker.runner()); + } }