This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new e99bdba IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826. e99bdba is described below commit e99bdbab4f906b5b4b40ee0aa60e2da18e6443af Author: Sergey Chugunov <sergey.chugu...@gmail.com> AuthorDate: Thu Mar 4 17:30:02 2021 +0300 IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826. Signed-off-by: Ivan Bessonov <bessonov...@gmail.com> --- .../tcp/internal/CommunicationTcpUtils.java | 40 ++++++++++++- .../tcp/internal/ConnectionClientPool.java | 20 +++++-- .../tcp/internal/GridNioServerWrapper.java | 39 +++++-------- ...unicationInverseConnectionEstablishingTest.java | 67 +++++++++++++++++++--- 4 files changed, 128 insertions(+), 38 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java index d0855d0..68dd6bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java @@ -28,11 +28,14 @@ import java.util.List; import java.util.Set; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.communication.tcp.AttributeNames; @@ -43,6 +46,10 @@ public class CommunicationTcpUtils { /** No-op runnable. */ public static final IgniteRunnable NOOP = () -> {}; + /** */ + private static final boolean THROUBLESHOOTING_LOG_ENABLED = IgniteSystemProperties + .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER); + /** * @param node Node. * @return {@code True} if can use in/out connection pair for communication. @@ -154,7 +161,7 @@ public class CommunicationTcpUtils { * @param errs Error. * @return {@code True} if error was caused by some connection IO error or IgniteCheckedException due to timeout. */ - public static boolean isRecoverableException(Exception errs) { + public static boolean isRecoverableException(Throwable errs) { return X.hasCause( errs, IOException.class, @@ -162,4 +169,35 @@ public class CommunicationTcpUtils { IgniteSpiOperationTimeoutException.class ); } + + /** + * Forcibly fails client node. + * + * Is used in a single situation if a client node is visible to discovery but is not reachable via comm protocol. + * + * @param nodeToFail Client node to forcible fail. + * @param spiCtx Context to request node failing. + * @param err Error to fail client with. + * @param log Logger to print message about failed node to. + */ + public static void failNode(ClusterNode nodeToFail, + IgniteSpiContext spiCtx, + Throwable err, + IgniteLogger log + ) { + assert nodeToFail.isClient(); + + String logMsg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + + "cluster [rmtNode=" + nodeToFail + ']'; + + if (THROUBLESHOOTING_LOG_ENABLED) + U.error(log, logMsg, err); + else + U.warn(log, logMsg); + + spiCtx.failNode(nodeToFail.id(), "TcpCommunicationSpi failed to establish connection to node [" + + "rmtNode=" + nodeToFail + + ", err=" + err + + ", connectErrs=" + X.getSuppressedList(err) + ']'); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java index 8445dcd..329ebc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java @@ -31,6 +31,7 @@ import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -121,6 +122,10 @@ public class ConnectionClientPool { /** Scheduled executor service which closed the socket if handshake timeout is out. **/ private final ScheduledExecutorService handshakeTimeoutExecutorService; + /** Enable forcible node kill. */ + private boolean forcibleNodeKillEnabled = IgniteSystemProperties + .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL); + /** * @param cfg Config. * @param attrs Attributes. @@ -381,10 +386,17 @@ public class ConnectionClientPool { ? cfg.failureDetectionTimeout() : cfg.connectionTimeout(); - fut.get(failTimeout); - } - catch (IgniteCheckedException triggerException) { - IgniteSpiException spiE = new IgniteSpiException(triggerException); + fut.get(failTimeout); + } + catch (Throwable triggerException) { + if (forcibleNodeKillEnabled + && node.isClient() + && triggerException instanceof IgniteFutureTimeoutCheckedException + ) { + CommunicationTcpUtils.failNode(node, tcpCommSpi.getSpiContext(), triggerException, log); + } + + IgniteSpiException spiE = new IgniteSpiException(e); spiE.addSuppressed(e); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index 6bc1502..1509d74 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -203,13 +203,9 @@ public class GridNioServerWrapper { private volatile ThrowableSupplier<SocketChannel, IOException> socketChannelFactory = SocketChannel::open; /** Enable forcible node kill. */ - private boolean enableForcibleNodeKill = IgniteSystemProperties + private boolean forcibleNodeKillEnabled = IgniteSystemProperties .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL); - /** Enable troubleshooting logger. */ - private boolean enableTroubleshootingLog = IgniteSystemProperties - .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER); - /** NIO server. */ private GridNioServer<Message> nioSrv; @@ -647,12 +643,16 @@ public class GridNioServerWrapper { } if (ses == null) { - if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) { - if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) { - String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet + - "; inverse connection will be requested."; - - throw new NodeUnreachableException(msg); + // If local node and remote node are configured to use paired connections we won't even request + // inverse connection so no point in throwing NodeUnreachableException + if (!cfg.usePairedConnections() || !Boolean.TRUE.equals(node.attribute(attrs.pairedConnection()))) { + if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) { + if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) { + String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet + + "; inverse connection will be requested."; + + throw new NodeUnreachableException(msg); + } } } @@ -768,25 +768,14 @@ public class GridNioServerWrapper { ctx.resolveCommunicationFailure(node, errs); } - if (!commErrResolve && enableForcibleNodeKill) { + if (!commErrResolve && forcibleNodeKillEnabled) { if (ctx.node(node.id()) != null && node.isClient() && !locNodeSupplier.get().isClient() && isRecoverableException(errs) ) { - // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported. - String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + - "cluster [" + "rmtNode=" + node + ']'; - - if (enableTroubleshootingLog) - U.error(log, msg, errs); - else - U.warn(log, msg); - - ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + - "rmtNode=" + node + - ", errs=" + errs + - ", connectErrs=" + X.getSuppressedList(errs) + ']'); + CommunicationTcpUtils.failNode(node, + ctx, errs, log); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java index cbf4777..7221d52 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java @@ -29,11 +29,14 @@ import java.util.stream.Collectors; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; @@ -41,6 +44,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage; @@ -50,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assume; import org.junit.Test; @@ -299,13 +304,29 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC IgniteEx srv = grid(SRVS_NUM - 1); - // We need to interrupt communication worker client nodes so that - // closed connection won't automatically reopen when we don't expect it. - // Server communication worker is interrupted for another reason - it can hang the test - // due to bug in inverse connection protocol & comm worker - it will be fixed later. + interruptCommWorkerThreads(client.name()); + + TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi(); + + GridTestUtils.invoke(spi, "onNodeLeft", clientNode.consistentId(), clientNode.id()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> + srv.context().io().sendIoTest(clientNode, new byte[10], false).get() + ); + + assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000)); + + assertTrue(lsnr.check()); + } + + /** + * We need to interrupt communication worker client nodes so that + * closed connection won't automatically reopen when we don't expect it. + */ + private void interruptCommWorkerThreads(String clientName) { List<Thread> tcpCommWorkerThreads = Thread.getAllStackTraces().keySet().stream() .filter(t -> t.getName().contains("tcp-comm-worker")) - .filter(t -> t.getName().contains(srv.name()) || t.getName().contains(client.name())) + .filter(t -> t.getName().contains(clientName)) .collect(Collectors.toList()); for (Thread tcpCommWorkerThread : tcpCommWorkerThreads) { @@ -313,6 +334,38 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC U.join(tcpCommWorkerThread, log); } + } + + /** + * Forcible node kill functionality is triggered in inverse connection request flow as well + * when a timeout for inverse connection is reached. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, value = "true") + public void testClientSkippingInverseConnResponseIsForciblyFailed() throws Exception { + UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST); + RESPOND_TO_INVERSE_REQUEST.set(false); + + AtomicBoolean clientFailedEventFlag = new AtomicBoolean(false); + + IgniteEx srv = startGrid(); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + clientFailedEventFlag.set(true); + + return false; + } + }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + + forceClientToSrvConnections = false; + + IgniteEx client = startClientGrid(1); + ClusterNode clientNode = client.localNode(); + + interruptCommWorkerThreads(client.name()); TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi(); @@ -322,9 +375,7 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC srv.context().io().sendIoTest(clientNode, new byte[10], false).get() ); - assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000)); - - assertTrue(lsnr.check()); + assertTrue(GridTestUtils.waitForCondition(clientFailedEventFlag::get, 10_000)); } /**