ignite-1758
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f23a53b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f23a53b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f23a53b Branch: refs/heads/ignite-1758 Commit: 0f23a53b531a187285ed2cc01a2a99e1c0c69abc Parents: 4917cff Author: sboikov <[email protected]> Authored: Tue Oct 27 09:14:50 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 28 16:09:25 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 80 +++--- .../ignite/spi/discovery/tcp/ServerImpl.java | 105 +++++--- .../tcp/TcpDiscoveryMultiThreadedTest.java | 241 +++++++++++-------- 3 files changed, 260 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 03ae201..9cadca1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -920,12 +920,8 @@ class ClientImpl extends TcpDiscoveryImpl { boolean ack = msg instanceof TcpDiscoveryClientAckResponse; - if (!ack) { - if (spi.ensured(msg) && joinLatch.getCount() == 0L) - lastMsgId = msg.id(); - + if (!ack) msgWorker.addMessage(msg); - } else sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); } @@ -1237,13 +1233,8 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - else { - U.warn(log, "Client failed to reconnect because failed to restore discovery " + - "messages history, consider increasing '" + - IGNITE_DISCOVERY_HISTORY_SIZE + "' system property."); - + else return; - } } } else if (spi.ensured(msg)) { @@ -1313,9 +1304,6 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private SocketStream currSock; - /** Indicates that pending messages are currently processed. */ - private boolean pending; - /** */ private Reconnector reconnector; @@ -1361,11 +1349,13 @@ class ClientImpl extends TcpDiscoveryImpl { } } else if (msg == SPI_STOP) { + boolean connected = state == CONNECTED; + state = STOPPED; assert spi.getSpiContext().isStopping(); - if (currSock != null) { + if (connected && currSock != null) { TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); leftMsg.client(true); @@ -1577,6 +1567,9 @@ class ClientImpl extends TcpDiscoveryImpl { processPingRequest(); spi.stats.onMessageProcessingFinished(msg); + + if (spi.ensured(msg) && state == CONNECTED) + lastMsgId = msg.id(); } /** @@ -1630,8 +1623,10 @@ class ClientImpl extends TcpDiscoveryImpl { if (msg.topologyHistory() != null) topHist.putAll(msg.topologyHistory()); } - else if (log.isDebugEnabled()) - log.debug("Discarding node added message with empty topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Discarding node added message with empty topology: " + msg); + } } else if (log.isDebugEnabled()) log.debug("Discarding node added message (this message has already been processed) " + @@ -1651,8 +1646,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.onExchange(newNodeId, newNodeId, data, null); } } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1679,6 +1676,11 @@ class ClientImpl extends TcpDiscoveryImpl { locNode.order(topVer); + for (Iterator<Long> it = topHist.keySet().iterator(); it.hasNext();) { + if (it.next() >= topVer) + it.remove(); + } + Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg); notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes); @@ -1738,7 +1740,7 @@ class ClientImpl extends TcpDiscoveryImpl { assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg + ", node=" + node + ", top=" + top + ']'; - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node add finished message (join process is not finished): " + msg); @@ -1751,8 +1753,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onNodeJoined(); } } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1782,7 +1786,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node left message (join process is not finished): " + msg); @@ -1793,8 +1797,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onNodeLeft(); } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1835,7 +1841,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node failed message (join process is not finished): " + msg); @@ -1908,18 +1914,11 @@ class ClientImpl extends TcpDiscoveryImpl { reconnector = null; - pending = true; - - try { - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { - if (log.isDebugEnabled()) - log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + if (log.isDebugEnabled()) + log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); - processDiscoveryMessage(pendingMsg); - } - } - finally { - pending = false; + processDiscoveryMessage(pendingMsg); } } else { @@ -1947,7 +1946,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.verified() && state == CONNECTED) { + if (state == CONNECTED) { DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { @@ -2120,6 +2119,11 @@ class ClientImpl extends TcpDiscoveryImpl { InputStream stream() { return in; } + + /** {@inheritDoc} */ + public String toString() { + return sock.toString(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8c63eb2..94653fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1253,9 +1253,11 @@ class ServerImpl extends TcpDiscoveryImpl { lsnr.onDiscovery(type, topVer, node, top, hist, null); } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + else { + if (log.isDebugEnabled()) + log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + } } /** @@ -1393,7 +1395,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (node.id().equals(destNodeId)) { Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); - Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(allNodes.size()); + + Collection<TcpDiscoveryNode> topToSnd = nodeAddedMsg.topology(); + + if (topToSnd == null) + topToSnd = new ArrayList<>(allNodes.size()); for (TcpDiscoveryNode n0 : allNodes) { assert n0.internalOrder() != 0 : n0; @@ -2165,19 +2171,9 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Sends message across the ring. - * - * @param msg Message to send + * @param msg Message. */ - @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) - private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - assert ring.hasRemoteNodes(); - - for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) - msgLsnr.apply(msg); - + private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (redirectToClients(msg)) { byte[] marshalledMsg = null; @@ -2200,6 +2196,23 @@ class ServerImpl extends TcpDiscoveryImpl { clientMsgWorker.addMessage(msgClone); } } + } + + /** + * Sends message across the ring. + * + * @param msg Message to send + */ + @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) + private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + assert ring.hasRemoteNodes(); + + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) + msgLsnr.apply(msg); + + sendMessageToClients(msg); Collection<TcpDiscoveryNode> failedNodes; @@ -2814,7 +2827,7 @@ class ServerImpl extends TcpDiscoveryImpl { "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(reconMsg)) sendMessageAcrossRing(reconMsg); } } @@ -3006,8 +3019,11 @@ class ServerImpl extends TcpDiscoveryImpl { nodeAddedMsg.client(msg.client()); processNodeAddedMessage(nodeAddedMsg); + + if (nodeAddedMsg.verified()) + msgHist.add(nodeAddedMsg); } - else if (ring.hasRemoteNodes()) + else if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -3126,12 +3142,12 @@ class ServerImpl extends TcpDiscoveryImpl { locNodeId + ", clientNodeId=" + nodeId + ']'); } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } @@ -3193,17 +3209,35 @@ class ServerImpl extends TcpDiscoveryImpl { processNodeAddFinishedMessage(addFinishMsg); + if (addFinishMsg.verified()) + msgHist.add(addFinishMsg); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } msg.verify(locNodeId); + + if (node.isClient()) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + + Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + assert n0.internalOrder() > 0 : n0; + + if (n0.internalOrder() < node.internalOrder()) + top.add(n0); + } + + msg.topology(top); + } } else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); if (log.isDebugEnabled()) @@ -3391,7 +3425,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -3526,7 +3560,7 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_JOINED, topVer, locNode); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); checkPendingCustomMessages(); @@ -3694,7 +3728,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) { + if (sendMessageToRemotes(msg)) { try { sendMessageAcrossRing(msg); } @@ -3715,6 +3749,19 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param msg Message to send. + * @return {@code True} if message should be send across the ring. + */ + private boolean sendMessageToRemotes(TcpDiscoveryAbstractMessage msg) { + if (ring.hasRemoteNodes()) + return true; + + sendMessageToClients(msg); + + return false; + } + + /** * Processes node failed message. * * @param msg Node failed message. @@ -3846,7 +3893,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onNodeFailed(); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); else { if (log.isDebugEnabled()) @@ -3986,7 +4033,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -4052,7 +4099,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) { + if (sendMessageToRemotes(msg)) { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. @@ -4098,7 +4145,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } else { @@ -4305,7 +4352,7 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscoveryListener(msg); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 7c6a960..fcb0116 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -85,7 +86,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { if (client()) cfg.setClientMode(true); - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + cfg.setDiscoverySpi(new TcpDiscoverySpi(). + setIpFinder(ipFinder). + setJoinTimeout(60_000). + setNetworkTimeout(60_000)); cfg.setCacheConfiguration(); @@ -112,164 +116,203 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultiThreadedClientsRestart() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1123"); + final AtomicBoolean done = new AtomicBoolean(); - clientFlagGlobal = false; + try { + clientFlagGlobal = false; - info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - startGridsMultiThreaded(GRID_CNT); + startGridsMultiThreaded(GRID_CNT); - clientFlagGlobal = true; + clientFlagGlobal = true; - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - final AtomicBoolean done = new AtomicBoolean(); + final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); - final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + IgniteInternalFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(true); - IgniteInternalFuture<?> fut1 = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - clientFlagPerThread.set(true); + int idx = clientIdx.getAndIncrement(); - int idx = clientIdx.getAndIncrement(); + while (!done.get()) { + stopGrid(idx, true); + startGrid(idx); + } - while (!done.get()) { - stopGrid(idx, true); - startGrid(idx); + return null; } + }, + CLIENT_GRID_CNT + ); - return null; - } - }, - CLIENT_GRID_CNT - ); - - Thread.sleep(getTestTimeout() - 60 * 1000); + Thread.sleep(getTestTimeout() - 60 * 1000); - done.set(true); + done.set(true); - fut1.get(); + fut1.get(); + } + finally { + done.set(true); + } } /** * @throws Exception If any error occurs. */ public void testMultiThreadedClientsServersRestart() throws Throwable { - clientFlagGlobal = false; + final AtomicBoolean done = new AtomicBoolean(); - info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + try { + clientFlagGlobal = false; - startGridsMultiThreaded(GRID_CNT); + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - clientFlagGlobal = true; + startGridsMultiThreaded(GRID_CNT); - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + clientFlagGlobal = true; - final AtomicBoolean done = new AtomicBoolean(); + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + final AtomicReference<Throwable> error = new AtomicReference<>(); - final AtomicReference<Throwable> error = new AtomicReference<>(); + final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>(); - IgniteInternalFuture<?> fut1 = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - try { - clientFlagPerThread.set(true); + for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) + clientStopIdxs.add(i); - int idx = clientIdx.getAndIncrement(); + final AtomicInteger clientStartIdx = new AtomicInteger(9000); - while (!done.get() && error.get() == null) { - stopGrid(idx); + IgniteInternalFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(true); - try { - startGrid(idx); - } - catch (Exception e) { - if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class)) - log.info("Client disconnected: " + e); - else - throw e; + while (!done.get() && error.get() == null) { + Integer stopIdx = clientStopIdxs.take(); + + log.info("Stop client: " + stopIdx); + + stopGrid(stopIdx); + + while (!done.get() && error.get() == null) { + // Generate unique name to simplify debugging. + int startIdx = clientStartIdx.getAndIncrement(); + + log.info("Start client: " + startIdx); + + try { + Ignite ignite = startGrid(startIdx); + + assertTrue(ignite.configuration().isClientMode()); + + clientStopIdxs.add(startIdx); + + break; + } + catch (Exception e) { + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || + X.hasCause(e, IgniteClientDisconnectedException.class)) + log.info("Client disconnected: " + e); + else + throw e; + } + } } } - } - catch (Throwable e) { - log.error("Unexpected error: " + e, e); + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + error.compareAndSet(null, e); - error.compareAndSet(null, e); + return null; + } return null; } + }, + CLIENT_GRID_CNT, + "client-restart"); - return null; - } - }, - CLIENT_GRID_CNT - ); + final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>(); - final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>(); + for (int i = 0; i < GRID_CNT; i++) + srvStopIdxs.add(i); - for (int i = 0; i < GRID_CNT; i++) - srvIdx.add(i); + final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT); - IgniteInternalFuture<?> fut2 = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - try { - clientFlagPerThread.set(false); + IgniteInternalFuture<?> fut2 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(false); - while (!done.get() && error.get() == null) { - int idx = srvIdx.take(); + while (!done.get() && error.get() == null) { + int stopIdx = srvStopIdxs.take(); - stopGrid(idx); - startGrid(idx); + log.info("Stop server: " + stopIdx); + + stopGrid(stopIdx); + + // Generate unique name to simplify debugging. + int startIdx = srvStartIdx.getAndIncrement(); + + log.info("Start server: " + startIdx); - srvIdx.add(idx); + Ignite ignite = startGrid(startIdx); + + assertFalse(ignite.configuration().isClientMode()); + + srvStopIdxs.add(startIdx); + } } - } - catch (Throwable e) { - log.error("Unexpected error: " + e, e); + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - error.compareAndSet(null, e); + error.compareAndSet(null, e); + + return null; + } return null; } + }, + GRID_CNT - 1, + "server-restart"); - return null; - } - }, - GRID_CNT - 1 - ); - - long timeToExec = getTestTimeout() - 60 * 1000; + final long timeToExec = 2 * 60 * 1000; - while (timeToExec > 0) { - long start = System.currentTimeMillis(); + final long endTime = System.currentTimeMillis() + timeToExec; - Thread.sleep(3000); + while (System.currentTimeMillis() < endTime) { + Thread.sleep(3000); - timeToExec -= (System.currentTimeMillis() - start); + if (error.get() != null) { + Throwable err = error.get(); - if (error.get() != null) { - Throwable err = error.get(); + U.error(log, "Test failed: " + err.getMessage()); - U.error(log, "Test failed: " + err.getMessage()); + done.set(true); - done.set(true); + fut1.cancel(); + fut2.cancel(); - fut1.cancel(); - fut2.cancel(); - - throw err; + throw err; + } } - } - done.set(true); + done.set(true); - fut1.get(); - fut2.get(); + fut1.get(); + fut2.get(); + } + finally { + done.set(true); + } } /**
