Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 11dbec92a -> a48bf1f2b

ignite-1.5

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a48bf1f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a48bf1f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a48bf1f2

Branch: refs/heads/ignite-1537
Commit: a48bf1f2b595e6edc9dbb955d68a56071ec79911
Parents: 11dbec9
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Dec 15 11:27:08 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Dec 15 11:27:08 2015 +0300

----------------------------------------------------------------------
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 30 ++++++++++-----
 ...CommunicationRecoveryAckClosureSelfTest.java | 39 +++++++++++++-------
 2 files changed, 47 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/a48bf1f2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index a709cc4..38e3d98 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import
org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridTestMessage; @@ -133,6 +134,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * @param msgPerIter Messages per iteration. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); @@ -196,8 +198,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final TestListener lsnr = (TestListener)spi.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs0; } }, 5000); @@ -247,6 +248,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS /** * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkOverflow() throws Exception { TcpCommunicationSpi spi0 = spis.get(0); TcpCommunicationSpi spi1 = spis.get(1); @@ -266,8 +268,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final GridNioSession ses0 = communicationSession(spi0); - for (int i = 0; i < 150; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + int sentMsgs = 1; + + for (int i = 0; i < 150; i++) { + try { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + sentMsgs++; + } + catch (IgniteSpiException e) { + log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']'); + + break; + } + } // Wait when session is closed because of queue overflow. 
GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -283,13 +297,12 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS for (int i = 0; i < 100; i++) spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); - final int expMsgs = 251; + final int expMsgs = sentMsgs + 100; final TestListener lsnr = (TestListener)spi1.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs; } }, 5000); @@ -307,8 +320,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); return !sessions.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a48bf1f2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index fd2d91a..7521f2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; +import 
org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridTestMessage; @@ -135,6 +136,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic * @param msgPerIter Messages per iteration. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); @@ -154,7 +156,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() { @Override public void apply(IgniteException o) { assert o == null; @@ -163,9 +165,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic }; for (int j = 0; j < msgPerIter; j++) { - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); - spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC); } expMsgs += msgPerIter; @@ -207,8 +209,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final TestListener lsnr = (TestListener)spi.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs0; } }, 5000); @@ -260,6 +261,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic /** * @throws Exception If failed. 
*/ + @SuppressWarnings("unchecked") private void checkOverflow() throws Exception { TcpCommunicationSpi spi0 = spis.get(0); TcpCommunicationSpi spi1 = spis.get(1); @@ -271,7 +273,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() { @Override public void apply(IgniteException o) { assert o == null; @@ -282,15 +284,27 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic int msgId = 0; // Send message to establish connection. - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); // Prevent node1 from send GridTestUtils.setFieldValue(srv1, "skipWrite", true); final GridNioSession ses0 = communicationSession(spi0); - for (int i = 0; i < 150; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + int sentMsgs = 1; + + for (int i = 0; i < 150; i++) { + try { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); + + sentMsgs++; + } + catch (IgniteSpiException e) { + log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']'); + + break; + } + } // Wait when session is closed because of queue overflow. 
GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -304,9 +318,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic GridTestUtils.setFieldValue(srv1, "skipWrite", false); for (int i = 0; i < 100; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); - final int expMsgs = 251; + final int expMsgs = sentMsgs + 100; final TestListener lsnr = (TestListener)spi1.getListener(); @@ -335,8 +349,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); return !sessions.isEmpty();