ignite-1.5 Fixed test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49c29886 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49c29886 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49c29886 Branch: refs/heads/ignite-2236 Commit: 49c298866b7c113aa62af4fe0587d6d39edd9f50 Parents: debe34d Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 24 15:52:59 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 24 15:52:59 2015 +0300 ---------------------------------------------------------------------- .../util/nio/GridNioRecoveryDescriptor.java | 7 - .../ignite/internal/util/nio/GridNioServer.java | 7 + .../communication/tcp/TcpCommunicationSpi.java | 12 +- .../internal/util/nio/GridNioSelfTest.java | 127 ++++++++++++------- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 14 ++ .../GridUriDeploymentFileProcessorSelfTest.java | 19 ++- 6 files changed, 120 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 5647239..429f990 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -134,13 +134,6 @@ public class GridNioRecoveryDescriptor { } /** - * @return Received messages count. - */ - public long receivedCount() { - return rcvCnt; - } - - /** * @return Maximum size of unacknowledged messages queue. */ public int queueLimit() { http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index be28c30..17a0b8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -309,6 +309,13 @@ public class GridNioServer<T> { } /** + * @return Configured port. + */ + public int port() { + return locAddr != null ? locAddr.getPort() : -1; + } + + /** * Creates and returns a builder for a new instance of this class. * * @return Builder for new instance. http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index bf6e869..6cdfe9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -620,7 +620,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.resend(ses); if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); recovery.connected(); @@ -714,7 +714,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); } else { try { @@ -2587,16 +2587,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); - ClusterNode localNode = getLocalNode(); + ClusterNode locNode = getLocalNode(); - if (localNode == null) + if (locNode == null) throw new IgniteCheckedException("Local node has not been started or " + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(localNode.id(), + HandshakeMessage msg = new HandshakeMessage(locNode.id(), recovery.incrementConnectCount(), - recovery.receivedCount()); + recovery.received()); if (log.isDebugEnabled()) log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index 594e3c2..6089795 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.lang.reflect.Field; +import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -61,6 +62,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest { /** Message count in test without reconnect. */ private static final int MSG_CNT = 2000; + /** */ + private static final int START_PORT = 55443; + /** Message id provider. */ private static final AtomicInteger idProvider = new AtomicInteger(1); @@ -80,13 +84,15 @@ public class GridNioSelfTest extends GridCommonAbstractTest { private static volatile Marshaller marsh; /** Test port. */ - private int port = 55443; + private static int port; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { getTestResources().startThreads(true); marsh = getTestResources().getMarshaller(); + + port = START_PORT; } /** {@inheritDoc} */ @@ -94,13 +100,6 @@ public class GridNioSelfTest extends GridCommonAbstractTest { getTestResources().stopThreads(); } - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - port++; - } - /** * @throws Exception If failed. */ @@ -127,19 +126,18 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr); + final GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr); try { IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override - public void run() { + @Override public void run() { byte[] msg = new byte[MSG_SIZE]; for (int i = 0; i < msg.length; i++) msg[i] = (byte) (i ^ (i * i - 1)); // Some data for (int i = 0; i < RECONNECT_MSG_CNT; i++) - validateSendMessage(msg); + validateSendMessage(srvr.port(), msg); } }, THREAD_CNT); @@ -177,11 +175,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr); + GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr); Socket s = createSocket(); - s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000); + s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000); try { byte[] msg = new byte[MSG_SIZE]; @@ -235,12 +233,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr); + GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr); try { Socket s = createSocket(); - s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000); + s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000); if (!(s instanceof SSLSocket)) { // These methods are not supported by SSL sockets. @@ -277,7 +275,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr); + final GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr); final AtomicLong cnt = new AtomicLong(); @@ -285,8 +283,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { try { IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override - public void run() { + @Override public void run() { try { byte[] msg = new byte[MSG_SIZE]; @@ -294,7 +291,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { msg[i] = (byte) (i ^ (i * i - 1)); // Some data try (Socket s = createSocket()) { - s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000); + s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000); OutputStream out = s.getOutputStream(); @@ -369,12 +366,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr); + GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr); try { Socket s = createSocket(); - s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000); + s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000); // This is needed for SSL to begin handshake. s.getOutputStream().write(new byte[1]); @@ -439,16 +436,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer.Builder<?> builder = serverBuilder(port, new GridPlainParser(), lsnr); - - GridNioServer<?> srvr = builder.sendQueueLimit(5).build(); - - srvr.start(); + GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr, 5); try { Socket s = createSocket(); - s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000); + s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000); s.getOutputStream().write(new byte[1]); @@ -473,9 +466,10 @@ public class GridNioSelfTest extends GridCommonAbstractTest { /** * Sends message and validates reply. * + * @param port Port. * @param msg Message to send. */ - private void validateSendMessage(byte[] msg) { + private void validateSendMessage(int port, byte[] msg) { try { Socket s = createSocket(); @@ -552,19 +546,54 @@ public class GridNioSelfTest extends GridCommonAbstractTest { /** * Starts server with specified arguments. * - * @param port Port to listen. * @param parser Parser to use. * @param lsnr Listener. * @return Started server. * @throws Exception If failed. */ - protected final GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr) + protected final GridNioServer<?> startServer(GridNioParser parser, GridNioServerListener lsnr) throws Exception { - GridNioServer<?> srvr = serverBuilder(port, parser, lsnr).build(); + return startServer(parser, lsnr, null); + } + + /** + * Starts server with specified arguments. + * + * @param parser Parser to use. + * @param lsnr Listener. + * @param queueLimit Optional send queue limit. + * @return Started server. + * @throws Exception If failed. + */ + protected final GridNioServer<?> startServer(GridNioParser parser, + GridNioServerListener lsnr, + @Nullable Integer queueLimit) throws Exception { + for (int i = 0; i < 10; i++) { + try { + int srvPort = port++; + + GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr); + + if (queueLimit != null) + builder.sendQueueLimit(queueLimit); + + GridNioServer<?> srvr = builder.build(); + + srvr.start(); + + return srvr; + } + catch (IgniteCheckedException e) { + if (i < 9 && e.hasCause(BindException.class)) + log.error("Failed to start server, will try another port: " + e); + else + throw e; + } + } - srvr.start(); + fail("Failed to start server."); - return srvr; + return null; } /** @@ -604,13 +633,13 @@ public class GridNioSelfTest extends GridCommonAbstractTest { NioListener lsnr = new NioListener(latch); - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); + GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); TestClient client = null; try { for (int i = 0; i < 5; i++) { - client = createClient(U.getLocalHost(), port, U.getLocalHost()); + client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost()); client.sendMessage(createMessage(), MSG_SIZE); client.sendMessage(createMessage(), MSG_SIZE); @@ -638,13 +667,13 @@ public class GridNioSelfTest extends GridCommonAbstractTest { NioListener lsnr = new NioListener(latch); - GridNioServer<?> srvr1 = startServer(port, new BufferedParser(false), lsnr); - GridNioServer<?> srvr2 = startServer(port + 1, new BufferedParser(false), lsnr); + GridNioServer<?> srvr1 = startServer(new BufferedParser(false), lsnr); + GridNioServer<?> srvr2 = startServer(new BufferedParser(false), lsnr); GridNioSession ses = null; try { - SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), port + 1)); + SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port())); GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null); @@ -676,7 +705,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { NioListener lsnr = new NioListener(latch); - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); + final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); try { final byte[] data = createMessage(); @@ -686,7 +715,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { TestClient client = null; try { - client = createClient(U.getLocalHost(), port, U.getLocalHost()); + client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost()); for (int i = 0; i < MSG_CNT; i++) client.sendMessage(data, data.length); @@ -722,7 +751,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { final AtomicReference<Exception> err = new AtomicReference<>(); - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), + final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), new EchoListener()); try { @@ -734,7 +763,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { TestClient client = null; try { - client = createClient(U.getLocalHost(), port, U.getLocalHost()); + client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost()); MessageWithId msg = new MessageWithId(idProvider.getAndIncrement()); @@ -827,7 +856,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { final AtomicLong cntr = new AtomicLong(); - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); + final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); try { multithreaded(new Runnable() { @@ -835,7 +864,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { TestClient client = null; try { - client = createClient(U.getLocalHost(), port, U.getLocalHost()); + client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost()); while (cntr.getAndIncrement() < MSG_CNT * THREAD_CNT) { MessageWithId msg = new MessageWithId(idProvider.getAndIncrement()); @@ -908,14 +937,14 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); + final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); srvr.idleTimeout(1000); try { multithreaded(new Runnable() { @Override public void run() { - try (TestClient ignored = createClient(U.getLocalHost(), port, U.getLocalHost())) { + try (TestClient ignored = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost())) { info("Before sleep."); U.sleep(4000); @@ -976,7 +1005,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } }; - GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); + final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr); // Set flag using reflection. Field f = srvr.getClass().getDeclaredField("skipWrite"); @@ -990,7 +1019,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { try { multithreaded(new Runnable() { @Override public void run() { - try (TestClient ignored = createClient(U.getLocalHost(), port, U.getLocalHost())) { + try (TestClient ignored = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost())) { info("Before sleep."); U.sleep(4000); http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 9e78fb9..d07a1e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -149,6 +149,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS int expMsgs = 0; + long totAcked = 0; + for (int i = 0; i < 5; i++) { info("Iteration: " + i); @@ -160,6 +162,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS expMsgs += msgPerIter; + final long totAcked0 = totAcked; + for (TcpCommunicationSpi spi : spis) { GridNioServer srv = U.field(spi, "nioSrvr"); @@ -177,6 +181,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { + long acked = GridTestUtils.getFieldValue(recoveryDesc, "acked"); + + return acked > totAcked0; + } + }, 5000); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { return recoveryDesc.messagesFutures().isEmpty(); } }, 10_000); @@ -204,6 +216,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS assertEquals(expMsgs, lsnr.rcvCnt.get()); } + + totAcked += msgPerIter; } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java b/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java index b87551d..cbcac9c 100644 --- a/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java +++ b/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java @@ -20,7 +20,11 @@ package org.apache.ignite.spi.deployment.uri; import java.io.File; import java.util.ArrayList; import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.config.GridTestProperties; import org.apache.ignite.testframework.junits.spi.GridSpiTest; import org.apache.ignite.testframework.junits.spi.GridSpiTestConfig; @@ -69,7 +73,7 @@ public class GridUriDeploymentFileProcessorSelfTest extends GridUriDeploymentAbs * if {@code false} then it should be undeployed. * @throws Exception If failed. */ - private void proceedTest(String garFileName, String garDescFileName, String taskId, boolean deployed) + private void proceedTest(String garFileName, String garDescFileName, final String taskId, final boolean deployed) throws Exception { info("This test checks broken tasks. All exceptions that might happen are the part of the test."); @@ -123,10 +127,17 @@ public class GridUriDeploymentFileProcessorSelfTest extends GridUriDeploymentAbs // Copy to deployment directory. U.copy(garFile, destDir, true); - // Wait for SPI - Thread.sleep(1000); - try { + // Wait for SPI + GridTestUtils.waitForCondition(new GridAbsPredicateX() { + @Override public boolean applyx() throws IgniteCheckedException { + if (deployed) + return getSpi().findResource(taskId) != null; + else + return getSpi().findResource(taskId) == null; + } + }, 5000); + if (deployed) assert getSpi().findResource(taskId) != null; else