Repository: incubator-ignite Updated Branches: refs/heads/ignite-1169 e7bd078b4 -> 381773897
IGNITE-1169 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38177389 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38177389 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38177389 Branch: refs/heads/ignite-1169 Commit: 38177389733707cd4e53026b08165b1772e66d97 Parents: e7bd078 Author: nikolay_tikhonov <[email protected]> Authored: Fri Jul 31 10:01:34 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Jul 31 10:01:34 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 27 ++++++++++---------- .../util/nio/GridCommunicationClient.java | 2 +- .../util/nio/GridNioFinishedFuture.java | 5 ++-- .../ignite/internal/util/nio/GridNioFuture.java | 5 ++-- .../internal/util/nio/GridNioFutureImpl.java | 7 ++--- .../util/nio/GridNioRecoveryDescriptor.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 2 +- .../util/nio/GridShmemCommunicationClient.java | 5 +++- .../util/nio/GridTcpNioCommunicationClient.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 8 +++--- ...mmunicationSpiRecoveryAckFutureSelfTest.java | 19 +++++++------- 11 files changed, 44 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 272950e..33e8c1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -987,7 +987,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<Exception> ackClosure + IgniteInClosure<IgniteException> ackClosure ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1017,7 +1017,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) - ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessageWithAck(node, ioMsg, ackClosure); + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure); else getSpi().sendMessage(node, ioMsg); } @@ -1161,11 +1161,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendWithAck(ClusterNode node, Object topic, Message msg, byte plc) + public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, null); + send(node, topic, -1, msg, plc, false, 0, false, ackClosure); } /** @@ -1188,8 +1189,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendWithAck(ClusterNode node, GridTopic topic, Message msg, byte plc, - IgniteInClosure<Exception> ackClosure) throws IgniteCheckedException { + public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, + IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); } @@ -1225,14 +1226,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendOrderedMessageWithAck( + public void sendOrderedMessage( ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<Exception> ackClosure + IgniteInClosure<IgniteException> ackClosure ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1246,7 +1247,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendOrderedMessage( @@ -1255,8 +1255,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc, long timeout, - boolean skipOnTimeout, - IgniteInClosure<Exception> ackClosure + boolean skipOnTimeout ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1265,7 +1264,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** @@ -1278,14 +1277,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendOrderedMessageWithAck( + public void sendOrderedMessage( UUID nodeId, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<Exception> ackClosure + IgniteInClosure<IgniteException> ackClosure ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 336aab9..1a26ad5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -99,7 +99,7 @@ public interface GridCommunicationClient { * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure) + public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index 21cf17c..aac238a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -58,12 +59,12 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<Exception> closure) { + @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { // No-op. } /** {@inheritDoc} */ - @Override public IgniteInClosure<Exception> ackClosure() { + @Override public IgniteInClosure<IgniteException> ackClosure() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 2b77089..5a884f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; @@ -46,10 +47,10 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { * * @param closure Ack closure. */ - public void ackClosure(IgniteInClosure<Exception> closure); + public void ackClosure(IgniteInClosure<IgniteException> closure); /** * @return Ack closure. */ - public IgniteInClosure<Exception> ackClosure(); + public IgniteInClosure<IgniteException> ackClosure(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index 847b7d6..e71bf92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -32,7 +33,7 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi protected boolean msgThread; /** */ - protected IgniteInClosure<Exception> ackClosure; + protected IgniteInClosure<IgniteException> ackClosure; /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { @@ -50,12 +51,12 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<Exception> closure) { + @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { ackClosure = closure; } /** {@inheritDoc} */ - @Override public IgniteInClosure<Exception> ackClosure() { + @Override public IgniteInClosure<IgniteException> ackClosure() { return ackClosure; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index e528361..a21600b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -368,7 +368,7 @@ public class GridNioRecoveryDescriptor { ((GridNioFutureImpl)msg).onDone(e); if (msg.ackClosure() != null) - msg.ackClosure().apply(e); + msg.ackClosure().apply(new IgniteException(e)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index f4a27fa..5c4916e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -392,7 +392,7 @@ public class GridNioServer<T> { int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); - IgniteInClosure<Exception> ackClosure; + IgniteInClosure<IgniteException> ackClosure; if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) fut.ackClosure(ackClosure); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 9cf87c6..67d4664 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -115,7 +115,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien /** {@inheritDoc} */ @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg, - IgniteInClosure<Exception> closure) + IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { if (closed()) throw new IgniteCheckedException("Communication client was closed: " + this); @@ -133,6 +133,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien markUsed(); + if (closure != null) + closure.apply(null); + return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 4122e48..7933001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -100,7 +100,7 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { // Node ID is never provided in asynchronous send mode. assert nodeId == null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b055eff..4cb59fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1697,7 +1697,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { - sendMessage(node, msg, null); + sendMessage0(node, msg, null); } /** @@ -1712,9 +1712,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Note that this is not guaranteed that failed communication will result * in thrown exception as this is dependant on SPI implementation. */ - public void sendMessageWithAck(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure) + public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { - sendMessage(node, msg, ackClosure); + sendMessage0(node, msg, ackClosure); } /** @@ -1725,7 +1725,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Note that this is not guaranteed that failed communication will result * in thrown exception as this is dependant on SPI implementation. */ - private void sendMessage(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure) + private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { assert node != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java index 3f788ba..c082b4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; @@ -143,8 +142,8 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<Exception> ackClosure = new CI1<Exception>() { - @Override public void apply(Exception o) { + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { assert o == null; ackMsgs.incrementAndGet(); @@ -152,9 +151,9 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic }; for (int j = 0; j < msgPerIter; j++) { - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); } expMsgs += msgPerIter; @@ -260,8 +259,8 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<Exception> ackClosure = new CI1<Exception>() { - @Override public void apply(Exception o) { + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { assert o == null; ackMsgs.incrementAndGet(); @@ -271,7 +270,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic int msgId = 0; // Send message to establish connection. - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); // Prevent node1 from send GridTestUtils.setFieldValue(srv1, "skipWrite", true); @@ -279,7 +278,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic final GridNioSession ses0 = communicationSession(spi0); for (int i = 0; i < 150; i++) - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); // Wait when session is closed because of queue overflow. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -293,7 +292,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic GridTestUtils.setFieldValue(srv1, "skipWrite", false); for (int i = 0; i < 100; i++) - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); final int expMsgs = 251;
