ignite-5860 Try to process TcpDiscoveryClientReconnectMessage in the socket reader instead of always processing it on the coordinator.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56a63f80 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56a63f80 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56a63f80 Branch: refs/heads/ignite-6748 Commit: 56a63f80d1181e53a3e2a4c4f88e42226bbac86e Parents: 717c549 Author: Denis Mekhanikov <[email protected]> Authored: Fri Oct 27 14:12:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 27 14:13:40 2017 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 52 +++- .../ignite/spi/discovery/tcp/ServerImpl.java | 311 ++++++++++--------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 20 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 275 +++++++++++++++- 5 files changed, 467 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 5dbfe6e..139c110 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -26,6 +26,7 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -470,7 +471,8 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * @param recon {@code True} if reconnects. 
+ * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to + * and {@code null} otherwise. * @param timeout Timeout. * @return Opened socket or {@code null} if timeout. * @throws InterruptedException If interrupted. @@ -478,9 +480,9 @@ class ClientImpl extends TcpDiscoveryImpl { * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout) + @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout) throws IgniteSpiException, InterruptedException { - Collection<InetSocketAddress> addrs = null; + List<InetSocketAddress> addrs = null; long startTime = U.currentTimeMillis(); @@ -489,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl { throw new InterruptedException(); while (addrs == null || addrs.isEmpty()) { - addrs = spi.resolvedAddresses(); + addrs = new ArrayList<>(spi.resolvedAddresses()); if (!F.isEmpty(addrs)) { if (log.isDebugEnabled()) @@ -509,22 +511,30 @@ class ClientImpl extends TcpDiscoveryImpl { } } - Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs); + // Process failed node last. 
+ if (prevAddr != null) { + int idx = addrs.indexOf(prevAddr); - Iterator<InetSocketAddress> it = addrs.iterator(); + if (idx != -1) + Collections.swap(addrs, idx, 0); + } + + Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs); boolean wait = false; - while (it.hasNext()) { + for (int i = addrs.size() - 1; i >= 0; i--) { if (Thread.currentThread().isInterrupted()) throw new InterruptedException(); - InetSocketAddress addr = it.next(); + InetSocketAddress addr = addrs.get(i); + + boolean recon = prevAddr != null; T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); if (sockAndRes == null) { - it.remove(); + addrs.remove(i); continue; } @@ -852,8 +862,8 @@ class ClientImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override protected IgniteSpiThread workerThread() { - return msgWorker; + @Override protected Collection<IgniteSpiThread> threads() { + return Arrays.asList(sockWriter, msgWorker); } /** @@ -1336,15 +1346,20 @@ class ClientImpl extends TcpDiscoveryImpl { private boolean clientAck; /** */ - private boolean join; + private final boolean join; + + /** */ + private final InetSocketAddress prevAddr; /** * @param join {@code True} if reconnects during join. + * @param prevAddr Address of the node, that this client was previously connected to. 
*/ - protected Reconnector(boolean join) { + protected Reconnector(boolean join, InetSocketAddress prevAddr) { super(spi.ignite().name(), "tcp-client-disco-reconnector", log); this.join = join; + this.prevAddr = prevAddr; } /** @@ -1374,7 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { while (true) { - T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout); + T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout); if (joinRes == null) { if (join) { @@ -1609,6 +1624,10 @@ class ClientImpl extends TcpDiscoveryImpl { } else if (msg instanceof SocketClosedMessage) { if (((SocketClosedMessage)msg).sock == currSock) { + Socket sock = currSock.sock; + + InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + currSock = null; boolean join = joinLatch.getCount() > 0; @@ -1637,8 +1656,7 @@ class ClientImpl extends TcpDiscoveryImpl { assert reconnector == null; - final Reconnector reconnector = new Reconnector(join); - this.reconnector = reconnector; + reconnector = new Reconnector(join, prevAddr); reconnector.start(); } } @@ -1811,7 +1829,7 @@ class ClientImpl extends TcpDiscoveryImpl { T2<SocketStream, Boolean> joinRes; try { - joinRes = joinTopology(false, spi.joinTimeout); + joinRes = joinTopology(null, spi.joinTimeout); } catch (IgniteSpiException e) { joinError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index efe531a..1c3ec2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -219,6 +219,9 @@ class ServerImpl extends TcpDiscoveryImpl { 
/** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>(); + /** Messages history used for client reconnect. */ + private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory(); + /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ private boolean ipFinderHasLocAddr; @@ -1663,8 +1666,23 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override protected IgniteSpiThread workerThread() { - return msgWorker; + @Override protected Collection<IgniteSpiThread> threads() { + Collection<IgniteSpiThread> threads; + + synchronized (mux) { + threads = new ArrayList<>(readers.size() + clientMsgWorkers.size() + 4); + threads.addAll(readers); + } + + threads.addAll(clientMsgWorkers.values()); + threads.add(tcpSrvr); + threads.add(ipFinderCleaner); + threads.add(msgWorker); + threads.add(statsPrinter); + + threads.removeAll(Collections.<IgniteSpiThread>singleton(null)); + + return threads; } /** @@ -2122,7 +2140,9 @@ class ServerImpl extends TcpDiscoveryImpl { else if (msg instanceof TcpDiscoveryNodeFailedMessage) clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId()); - msgs.add(msg); + synchronized (msgs) { + msgs.add(msg); + } } /** @@ -2161,14 +2181,16 @@ class ServerImpl extends TcpDiscoveryImpl { // Client connection failed before it received TcpDiscoveryNodeAddedMessage. 
List<TcpDiscoveryAbstractMessage> res = null; - for (TcpDiscoveryAbstractMessage msg : msgs) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id())) - res = new ArrayList<>(msgs.size()); - } + synchronized (msgs) { + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id())) + res = new ArrayList<>(msgs.size()); + } - if (res != null) - res.add(prepare(msg, node.id())); + if (res != null) + res.add(prepare(msg, node.id())); + } } if (log.isDebugEnabled()) { @@ -2181,20 +2203,26 @@ class ServerImpl extends TcpDiscoveryImpl { return res; } else { - if (msgs.isEmpty()) - return Collections.emptyList(); + Collection<TcpDiscoveryAbstractMessage> cp; - Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size()); + boolean skip; - boolean skip = true; + synchronized (msgs) { + if (msgs.isEmpty()) + return Collections.emptyList(); - for (TcpDiscoveryAbstractMessage msg : msgs) { - if (skip) { - if (msg.id().equals(lastMsgId)) - skip = false; + cp = new ArrayList<>(msgs.size()); + + skip = true; + + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (skip) { + if (msg.id().equals(lastMsgId)) + skip = false; + } + else + cp.add(prepare(msg, node.id())); } - else - cp.add(prepare(msg, node.id())); } cp = !skip ? cp : null; @@ -2483,9 +2511,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** Pending messages. */ private final PendingMessages pendingMsgs = new PendingMessages(); - /** Messages history used for client reconnect. */ - private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory(); - /** Last message that updated topology. 
*/ private TcpDiscoveryAbstractMessage lastMsg; @@ -2659,8 +2684,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); - else if (msg instanceof TcpDiscoveryClientReconnectMessage) - processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); + else if (msg instanceof TcpDiscoveryClientReconnectMessage) { + if (sendMessageToRemotes(msg)) + sendMessageAcrossRing(msg); + } else if (msg instanceof TcpDiscoveryNodeAddedMessage) processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); @@ -2695,9 +2722,6 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (ensured && redirectToClients(msg)) - msgHist.add(msg); - if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { // Received a message from remote node. onMessageExchanged(); @@ -2730,6 +2754,9 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (redirectToClients(msg)) { + if (spi.ensured(msg)) + msgHist.add(msg); + byte[] msgBytes = null; for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { @@ -3836,9 +3863,6 @@ class ServerImpl extends TcpDiscoveryImpl { nodeAddedMsg.client(msg.client()); processNodeAddedMessage(nodeAddedMsg); - - if (nodeAddedMsg.verified()) - msgHist.add(nodeAddedMsg); } else if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); @@ -3941,98 +3965,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Processes client reconnect message. - * - * @param msg Client reconnect message. 
- */ - private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { - UUID nodeId = msg.creatorNodeId(); - - UUID locNodeId = getLocalNodeId(); - - boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId()); - - if (!msg.verified()) { - TcpDiscoveryNode node = ring.node(nodeId); - - assert node == null || node.isClient(); - - if (node != null) { - node.clientRouterNodeId(msg.routerNodeId()); - node.clientAliveTime(spi.clientFailureDetectionTimeout()); - } - - if (isLocalNodeCoordinator()) { - msg.verify(locNodeId); - - if (node != null) { - Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node); - - if (pending != null) { - msg.pendingMessages(pending); - msg.success(true); - - if (log.isDebugEnabled()) - log.debug("Accept client reconnect, restored pending messages " + - "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); - } - else { - if (log.isDebugEnabled()) - log.debug("Failing reconnecting client node because failed to restore pending " + - "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); - - TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId, - node.id(), node.internalOrder()); - - processNodeFailedMessage(nodeFailedMsg); - - if (nodeFailedMsg.verified()) - msgHist.add(nodeFailedMsg); - } - } - else if (log.isDebugEnabled()) - log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); - - if (isLocNodeRouter) { - ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); - - if (wrk != null) - wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + - locNodeId + ", clientNodeId=" + nodeId + ']'); - } - else { - if (sendMessageToRemotes(msg)) - sendMessageAcrossRing(msg); - } - } - else { - if (sendMessageToRemotes(msg)) - sendMessageAcrossRing(msg); - } - } - else { - if (isLocalNodeCoordinator()) - 
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); - - if (isLocNodeRouter) { - ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); - - if (wrk != null) - wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + - locNodeId + ", clientNodeId=" + nodeId + ']'); - } - else { - if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) - sendMessageAcrossRing(msg); - } - } - } - - /** * Processes node added message. * * For coordinator node method marks the messages as verified for rest of nodes to apply the @@ -4078,9 +4010,6 @@ class ServerImpl extends TcpDiscoveryImpl { processNodeAddFinishedMessage(addFinishMsg); - if (addFinishMsg.verified()) - msgHist.add(addFinishMsg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; @@ -5145,9 +5074,6 @@ class ServerImpl extends TcpDiscoveryImpl { locNodeId, clientNode.id(), clientNode.internalOrder()); processNodeFailedMessage(nodeFailedMsg); - - if (nodeFailedMsg.verified()) - msgHist.add(nodeFailedMsg); } } } @@ -5342,9 +5268,6 @@ class ServerImpl extends TcpDiscoveryImpl { ackMsg.topologyVersion(msg.topologyVersion()); processCustomMessage(ackMsg); - - if (ackMsg.verified()) - msgHist.add(ackMsg); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); @@ -5446,12 +5369,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (joiningEmpty && isLocalNodeCoordinator()) { TcpDiscoveryCustomEventMessage msg; - while ((msg = pollPendingCustomeMessage()) != null) { + while ((msg = pollPendingCustomeMessage()) != null) processCustomMessage(msg); - - if (msg.verified()) - msgHist.add(msg); - } } } @@ -6005,24 +5924,22 @@ class ServerImpl extends TcpDiscoveryImpl { } } else if (msg instanceof TcpDiscoveryClientReconnectMessage) { - if (clientMsgWrk != null) { - TcpDiscoverySpiState state = spiStateCopy(); + TcpDiscoverySpiState state = 
spiStateCopy(); - if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK, sockTimeout); + if (state == CONNECTED) { + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); - if (clientMsgWrk.getState() == State.NEW) - clientMsgWrk.start(); + if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + clientMsgWrk.start(); - msgWorker.addMessage(msg); + processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); - continue; - } - else { - spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout); + continue; + } + else { + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout); - break; - } + break; } } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { @@ -6266,6 +6183,100 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Processes client reconnect message. + * + * @param msg Client reconnect message. + */ + private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { + UUID nodeId = msg.creatorNodeId(); + + UUID locNodeId = getLocalNodeId(); + + boolean isLocNodeRouter = msg.routerNodeId().equals(locNodeId); + + TcpDiscoveryNode node = ring.node(nodeId); + + assert node == null || node.isClient(); + + if (node != null) { + node.clientRouterNodeId(msg.routerNodeId()); + node.clientAliveTime(spi.clientFailureDetectionTimeout()); + } + + if (!msg.verified()) { + if (isLocNodeRouter || isLocalNodeCoordinator()) { + if (node != null) { + Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node); + + if (pending != null) { + msg.verify(locNodeId); + msg.pendingMessages(pending); + msg.success(true); + + if (log.isDebugEnabled()) + log.debug("Accept client reconnect, restored pending messages " + + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else if (!isLocalNodeCoordinator()) { + if (log.isDebugEnabled()) + log.debug("Failed to restore pending messages for reconnecting client. 
" + + "Forwarding reconnection message to coordinator " + + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else { + msg.verify(locNodeId); + + if (log.isDebugEnabled()) + log.debug("Failing reconnecting client node because failed to restore pending " + + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + + TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId, + node.id(), node.internalOrder()); + + msgWorker.addMessage(nodeFailedMsg); + } + } + else { + msg.verify(locNodeId); + + if (log.isDebugEnabled()) + log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); + } + + if (msg.verified() && isLocNodeRouter) { + ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); + + if (wrk != null) + wrk.addMessage(msg); + else if (log.isDebugEnabled()) + log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else + msgWorker.addMessage(msg); + } + else + msgWorker.addMessage(msg); + } + else { + if (isLocalNodeCoordinator()) + msgWorker.addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); + + if (isLocNodeRouter) { + ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); + + if (wrk != null) + wrk.addMessage(msg); + else if (log.isDebugEnabled()) + log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) + msgWorker.addMessage(msg); + } + } + + /** * Processes client metrics update message. * * @param msg Client metrics update message. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index b31e2e4..f3cf48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -299,9 +299,9 @@ abstract class TcpDiscoveryImpl { /** * <strong>FOR TEST ONLY!!!</strong> * - * @return Worker thread. + * @return Worker threads. */ - protected abstract IgniteSpiThread workerThread(); + protected abstract Collection<IgniteSpiThread> threads(); /** * @throws IgniteSpiException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index 689ac72..f1c826a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -56,15 +56,9 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov /** */ private final static long FAILURE_THRESHOLD = 10_000; - /** */ - private final static long CLIENT_FAILURE_THRESHOLD = 30_000; - /** Failure detection timeout for nodes configuration. 
*/ private static long failureThreshold = FAILURE_THRESHOLD; - /** Client failure detection timeout for nodes configuration. */ - private static long clientFailureThreshold = CLIENT_FAILURE_THRESHOLD; - /** */ private static boolean useTestSpi; @@ -75,7 +69,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov /** {@inheritDoc} */ @Override protected long clientFailureDetectionTimeout() { - return clientFailureThreshold; + return clientFailureDetectionTimeout; } /** {@inheritDoc} */ @@ -153,7 +147,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov */ public void testFailureTimeoutServerClient() throws Exception { failureThreshold = 3000; - clientFailureThreshold = 2000; + clientFailureDetectionTimeout = 2000; try { startServerNodes(1); @@ -190,13 +184,12 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov long detectTime = failureDetectTime[0] - failureTime; assertTrue("Client node failure detected too fast: " + detectTime + "ms", - detectTime > clientFailureThreshold - 200); + detectTime > clientFailureDetectionTimeout - 200); assertTrue("Client node failure detected too slow: " + detectTime + "ms", - detectTime < clientFailureThreshold + 5000); + detectTime < clientFailureDetectionTimeout + 5000); } finally { failureThreshold = FAILURE_THRESHOLD; - clientFailureThreshold = CLIENT_FAILURE_THRESHOLD; } } @@ -207,7 +200,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov */ public void testFailureTimeout3Server() throws Exception { failureThreshold = 1000; - clientFailureThreshold = 10000; + clientFailureDetectionTimeout = 10000; useTestSpi = true; try { @@ -254,11 +247,10 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov assertTrue("Server node failure detected too fast: " + detectTime + "ms", detectTime > failureThreshold - 100); assertTrue("Server node failure detected too slow: " + detectTime + 
"ms", - detectTime < clientFailureThreshold); + detectTime < clientFailureDetectionTimeout); } finally { failureThreshold = FAILURE_THRESHOLD; - clientFailureThreshold = CLIENT_FAILURE_THRESHOLD; useTestSpi = false; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 329783e..ee88b0f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -62,8 +62,8 @@ import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; @@ -73,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; + import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static 
org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; @@ -87,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; */ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ protected static final AtomicInteger srvIdx = new AtomicInteger(); @@ -123,6 +124,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private static CountDownLatch clientFailedLatch; /** */ + private static CountDownLatch clientReconnectedLatch; + + /** */ private static CountDownLatch msgLatch; /** */ @@ -138,10 +142,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; /** */ + protected Integer reconnectCnt; + + /** */ private boolean longSockTimeouts; /** */ - private long clientFailureDetectionTimeout = 1000; + protected long clientFailureDetectionTimeout = 1000; /** */ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite; @@ -207,6 +214,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setJoinTimeout(joinTimeout); disco.setNetworkTimeout(netTimeout); + if (reconnectCnt != null) + disco.setReconnectCount(reconnectCnt); + disco.setClientReconnectDisabled(reconnectDisabled); if (disco instanceof TestTcpDiscoverySpi) @@ -253,6 +263,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientIpFinder = null; joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; + clientFailureDetectionTimeout = 1000; longSockTimeouts = false; assert G.allGrids().isEmpty(); @@ -558,6 +569,221 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * Client should reconnect to 
available server without EVT_CLIENT_NODE_RECONNECTED event. + * + * @throws Exception If failed. + */ + public void testClientReconnectOnRouterSuspend() throws Exception { + reconnectAfterSuspend(false); + } + + /** + * Client should receive all topology updates after reconnect. + * + * @throws Exception If failed. + */ + public void testClientReconnectOnRouterSuspendTopologyChange() throws Exception { + reconnectAfterSuspend(true); + } + + /** + * @param changeTop If {@code true} topology is changed after client disconnects + * @throws Exception if failed. + */ + private void reconnectAfterSuspend(boolean changeTop) throws Exception { + reconnectCnt = 2; + + startServerNodes(2); + + Ignite srv0 = grid("server-0"); + TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode(); + + TcpDiscoveryNode srv1Node = (TcpDiscoveryNode)grid("server-1").cluster().localNode(); + + clientIpFinder = new TcpDiscoveryVmIpFinder(); + + clientIpFinder.setAddresses( + Collections.singleton("localhost:" + srv0Node.discoveryPort())); + + startClientNodes(1); + + Ignite client = grid("client-0"); + TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode(); + TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi(); + + UUID clientNodeId = clientNode.id(); + + checkNodes(2, 1); + + clientIpFinder.setAddresses(Collections.singleton("localhost:" + srv1Node.discoveryPort())); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(2, 1); + + log.info("Pausing router"); + + TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv0.configuration().getDiscoverySpi(); + + int joinedNodesNum = 3; + final CountDownLatch srvJoinedLatch = new CountDownLatch(joinedNodesNum); + + if (changeTop) { + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event e) { + srvJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + + srvSpi.pauseAll(true); + + if (changeTop) + 
startServerNodes(joinedNodesNum); + + try { + await(srvFailedLatch, 60_000); + + if (changeTop) + await(srvJoinedLatch, 5000); + + assertEquals("connected", clientSpi.getSpiState()); + assertEquals(clientNodeId, clientNode.id()); + assertEquals(srv1Node.id(), clientNode.clientRouterNodeId()); + } + finally { + srvSpi.resumeAll(); + } + } + + /** + * @throws Exception if failed. + */ + public void testClientReconnectHistoryMissingOnRouter() throws Exception { + clientFailureDetectionTimeout = 60000; + netTimeout = 60000; + + startServerNodes(2); + + Ignite srv0 = grid("server-0"); + TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode(); + + clientIpFinder = new TcpDiscoveryVmIpFinder(); + clientIpFinder.setAddresses( + Collections.singleton("localhost:" + srv0Node.discoveryPort())); + + startClientNodes(1); + + attachListeners(0, 1); + + Ignite client = grid("client-0"); + TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode(); + TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi(); + UUID clientNodeId = clientNode.id(); + + checkNodes(2, 1); + + clientSpi.pauseAll(true); + + stopGrid(srv0.name()); + + startServerNodes(1); + + Ignite srv2 = grid("server-2"); + TcpDiscoveryNode srv2Node = (TcpDiscoveryNode)srv2.cluster().localNode(); + clientIpFinder.setAddresses( + Collections.singleton("localhost:" + srv2Node.discoveryPort())); + + clientSpi.resumeAll(); + + awaitPartitionMapExchange(); + + assertEquals("connected", clientSpi.getSpiState()); + assertEquals(clientNodeId, clientNode.id()); + assertEquals(srv2Node.id(), clientNode.clientRouterNodeId()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testReconnectAfterPause() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite client = grid("client-0"); + TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi(); + + clientReconnectedLatch = new CountDownLatch(1); + + attachListeners(0, 1); + + clientSpi.pauseAll(false); + + try { + clientSpi.brakeConnection(); + + Thread.sleep(clientFailureDetectionTimeout() * 2); + } + finally { + clientSpi.resumeAll(); + } + + await(clientReconnectedLatch); + } + + /** + * @throws Exception if failed. + */ + public void testReconnectAfterMassiveTopologyChange() throws Exception { + clientIpFinder = IP_FINDER; + + clientFailureDetectionTimeout = 60000; + netTimeout = 60000; + + int initSrvsNum = 5; + int killNum = 3; + int iterations = 10; + + startServerNodes(initSrvsNum); + startClientNodes(1); + + Ignite client = grid("client-0"); + TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode(); + TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi(); + final UUID clientNodeId = clientNode.id(); + + final CountDownLatch srvJoinedLatch = new CountDownLatch(iterations * killNum); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event e) { + srvJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + + int minAliveSrvId = 0; + + for (int i = 0; i < iterations; i++) { + for (int j = 0; j < killNum; j++) { + stopGrid(minAliveSrvId); + + minAliveSrvId++; + } + + startServerNodes(killNum); + + awaitPartitionMapExchange(); + } + + await(srvJoinedLatch); + assertEquals("connected", clientSpi.getSpiState()); + assertEquals(clientNodeId, clientNode.id()); + } + + /** * @throws Exception If failed. 
*/ public void testClientReconnectOnNetworkProblem() throws Exception { @@ -1410,17 +1636,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { srvSpi.failNode(client.cluster().localNode().id(), null); - if (changeTop) { - Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + assertTrue(failLatch.await(5000, MILLISECONDS)); - srvNodeIds.add(g.cluster().localNode().id()); + if (changeTop) { + startServerNodes(1); clientSpi.resumeAll(); } - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); assertTrue(reconnectLatch.await(5000, MILLISECONDS)); - assertTrue(failLatch.await(5000, MILLISECONDS)); assertTrue(joinLatch.await(5000, MILLISECONDS)); long topVer = changeTop ? 5L : 4L; @@ -2026,6 +2251,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { }, EVT_NODE_FAILED); } } + + if (clientReconnectedLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Reconnected event fired on client: " + evt); + + clientReconnectedLatch.countDown(); + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + } + } } /** @@ -2095,7 +2334,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws InterruptedException If interrupted. */ protected void await(CountDownLatch latch) throws InterruptedException { - assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS)); + await(latch, awaitTime()); + } + + /** + * @param latch Latch. + * @param timeout Timeout. + * @throws InterruptedException If interrupted. 
+ */ + protected void await(CountDownLatch latch, long timeout) throws InterruptedException { + assertTrue("Latch count: " + latch.getCount(), latch.await(timeout, MILLISECONDS)); } /** @@ -2324,8 +2572,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void pauseAll(boolean suspend) { pauseResumeOperation(true, openSockLock, writeLock); - if (suspend) - impl.workerThread().suspend(); + if (suspend) { + for (Thread t : impl.threads()) + t.suspend(); + } } /** @@ -2334,7 +2584,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void resumeAll() { pauseResumeOperation(false, openSockLock, writeLock); - impl.workerThread().resume(); + for (IgniteSpiThread t : impl.threads()) + t.resume(); } /** {@inheritDoc} */
