Repository: ignite Updated Branches: refs/heads/master 2cfd55dcf -> e8f8e0acf
ignite-3727 Support local listeners async execution for IgniteMessage.send Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8f8e0ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8f8e0ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8f8e0ac Branch: refs/heads/master Commit: e8f8e0acf254133dd978d72adb73c5362752f706 Parents: 2cfd55d Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Wed Feb 15 13:51:33 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Feb 15 13:52:22 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteMessaging.java | 11 +- .../ignite/internal/IgniteMessagingImpl.java | 6 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../managers/communication/GridIoManager.java | 55 +- .../communication/GridIoManagerSelfTest.java | 6 +- ...niteMessagingConfigVariationFullApiTest.java | 195 +++++-- .../ignite/messaging/GridMessagingSelfTest.java | 114 +++- .../messaging/IgniteMessagingSendAsyncTest.java | 544 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + .../hadoop/shuffle/HadoopShuffle.java | 2 +- 10 files changed, 865 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index ab554af..e64ded5 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -77,6 +77,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. + * <p> + * By default all local listeners will be executed in the calling thread, or if you use + * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's + * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msg Message to send. @@ -87,6 +91,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given messages with the specified topic to the nodes in the underlying cluster group. + * <p> + * By default all local listeners will be executed in the calling thread, or if you use + * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's + * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msgs Messages to send. Order of the sending is undefined. If the method produces @@ -99,7 +107,8 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with * this method will arrive in the same order they were sent. Note that if a topic is used - * for ordered messages, then it cannot be reused for non-ordered messages. + * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners + * are always executed in public thread pool, no matter default or {@link #withAsync()} mode is used. * <p> * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue, * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 2800777..541fad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (snapshot.isEmpty()) throw U.emptyTopologyException(); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> for (Object msg : msgs) { A.notNull(msg, "msg"); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } } catch (IgniteCheckedException e) { @@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (timeout == 0) timeout = ctx.config().getNetworkTimeout(); - ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout); + ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 584cc56..5992eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -390,7 +390,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan if (msg instanceof Message) ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL); else - ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0); + ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false); } catch (IgniteCheckedException e) { throw unwrapException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7ef7bc0..84b4543 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -785,7 +785,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa finally { threadProcessingMessage(false); - msgC.run(); + if (msgC != null) + msgC.run(); } } @@ -1237,6 +1238,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. * @param ackC Ack closure. + * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -1248,7 +1250,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackC + IgniteInClosure<IgniteException> ackC, + boolean async ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1266,6 +1269,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); + else if (async) { + assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging. + + processRegularMessage(locNodeId, ioMsg, plc, null); + } else processRegularMessage0(ioMsg, locNodeId); @@ -1323,7 +1331,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -1335,7 +1343,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, null); + send(node, topic, -1, msg, plc, false, 0, false, null, false); } /** @@ -1343,11 +1351,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. + * @param async Async flag. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) + public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async); } /** @@ -1360,7 +1369,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topicOrd, msg, plc, false, 0, false, null); + send(node, topic, topicOrd, msg, plc, false, 0, false, null, false); } /** @@ -1382,7 +1391,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -1409,7 +1418,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -1422,7 +1431,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false); } /** @@ -1458,7 +1467,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackC); + send(node, topic, -1, msg, plc, false, 0, false, ackC, false); } /** @@ -1514,10 +1523,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } - /** + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@ -1525,7 +1534,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException { - sendUserMessage(nodes, msg, null, false, 0); + sendUserMessage(nodes, msg, null, false, 0, false); } /** @@ -1536,11 +1545,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Message topic to use. * @param ordered Is message ordered? * @param timeout Message timeout in milliseconds for ordered messages. + * @param async Async flag. * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("ConstantConditions") public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, - @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException { + @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException { boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId); byte[] serMsg = null; @@ -1585,7 +1595,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true); else if (loc) - send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); + send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async); else { ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId)); @@ -1594,10 +1604,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!rmtNodes.isEmpty()) send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); - // Will call local listeners in current thread synchronously, so must go the last + // Will call local listeners in current thread synchronously or through pool, + // depending async flag, so must go the last // to allow remote nodes execute the requested operation in parallel. if (locNode != null) - send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); + send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async); } } @@ -1664,7 +1675,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } /** @@ -1701,7 +1712,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + @@ -1929,8 +1940,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (rmv && log.isDebugEnabled()) log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']'); - if (lsnr instanceof ArrayListener) - { + if (lsnr instanceof ArrayListener) { for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) closeListener(childLsnr); } @@ -1942,6 +1952,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * Closes a listener, if applicable. + * * @param lsnr Listener. */ private void closeListener(GridMessageListener lsnr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index c2cfa76..f5499d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false); } catch (IgniteCheckedException ignored) { // No-op. We are using mocks so real sending is impossible. @@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false); } catch (Exception ignored) { // No-op. We are using mocks so real sending is impossible. @@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) + @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java index 31b0663..49aab10 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java @@ -58,7 +58,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testLocalServer() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - localServerInternal(); + localServerInternal(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testLocalServerAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + localServerInternal(true); } }); } @@ -83,7 +94,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientMessage(); + serverClientMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testServerClientMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + serverClientMessage(true); } }); } @@ -97,7 +122,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientMessage(); + clientClientMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientClientMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientClientMessage(true); } }); } @@ -111,7 +150,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerMessage(); + clientServerMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientServerMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientServerMessage(true); } }); } @@ -133,7 +186,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testOrderedMessage() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - orderedMessage(); + orderedMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testOrderedMessageAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + orderedMessage(true); } }); } @@ -147,7 +211,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerOrderedMessage(); + clientServerOrderedMessage(false); } }); } @@ -155,13 +219,42 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * @throws Exception If failed. */ + public void testClientServerOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientServerOrderedMessage(true); + } + }); + } + + + /** + * @throws Exception If failed. + */ public void testClientClientOrderedMessage() throws Exception { if (!testsCfg.withClients()) return; runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientOrderedMessage(); + clientClientOrderedMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientClientOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientClientOrderedMessage(true); } }); } @@ -175,16 +268,32 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientOrderedMessage(); + serverClientOrderedMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testServerClientOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + serverClientOrderedMessage(true); } }); } /** * Single server test. + * + * @param async Async message send flag. * @throws Exception If failed. */ - private void localServerInternal() throws Exception { + private void localServerInternal(boolean async) throws Exception { int messages = MSGS; Ignite ignite = grid(SERVER_NODE_IDX); @@ -197,7 +306,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i)); + sendMessage(ignite, grp, value(i), async); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -238,52 +347,59 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Server sends a message and client receives it. + * + * @param async Async message send flag. * @throws Exception If failed. */ - private void serverClientMessage() throws Exception { + private void serverClientMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** * Client sends a message and client receives it. + * + * @param async Async message send flag. * @throws Exception If failed. */ - private void clientClientMessage() throws Exception { + private void clientClientMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** * Client sends a message and client receives it. + * + * @param async Async message send flag. * @throws Exception If failed. */ - private void clientServerMessage() throws Exception { + private void clientServerMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** * @param ignite Ignite. * @param grp Cluster group. + * @param async Async message send flag. * @throws Exception If fail. */ - private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception { + private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -292,7 +408,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i)); + sendMessage(ignite, grp, value(i), async); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -335,67 +451,68 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ - private void orderedMessage() throws Exception { + private void orderedMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ - private void clientServerOrderedMessage() throws Exception { + private void clientServerOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ - private void clientClientOrderedMessage() throws Exception { + private void clientClientOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ - private void serverClientOrderedMessage() throws Exception { + private void serverClientOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** * @param ignite Ignite. * @param grp Cluster group. + * @param async Async message send flag. * @throws Exception If fail. */ - private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception { + private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -403,8 +520,12 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener()); try { - for (int i=0; i < messages; i++) - ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); + for (int i=0; i < messages; i++){ + if (async) + ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000); + else + ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); + } assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -419,9 +540,13 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param nodeSnd Sender Ignite node. * @param grp Cluster group. * @param msg Message. + * @param async Async message send flag. */ - private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) { - nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); + private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) { + if (async) + nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg); + else + nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index e796eb5..a166c3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -36,15 +37,20 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.IgniteInstanceResource;; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -198,7 +204,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder); @@ -944,7 +950,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. */ public void testSendMessageWithExternalClassLoader() throws Exception { - URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) }; + URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; ClassLoader extLdr = new URLClassLoader(urls); @@ -1028,6 +1034,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser public void testAsync() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); + TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + assertFalse(ignite2.message().isAsync()); final IgniteMessaging msg = ignite2.message().withAsync(); @@ -1044,6 +1052,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); + discoSpi.blockCustomEvent(); + final String topic = "topic"; UUID id = msg.remoteListen(topic, new P2<UUID, Object>() { @@ -1059,9 +1069,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertNull(id); - IgniteFuture<UUID> fut = msg.future(); + IgniteFuture<UUID> starFut = msg.future(); + + Assert.assertNotNull(starFut); + + U.sleep(500); - Assert.assertNotNull(fut); + Assert.assertFalse(starFut.isDone()); + + discoSpi.stopBlock(); GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { @@ -1071,10 +1087,14 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - id = fut.get(); + id = starFut.get(); Assert.assertNotNull(id); + Assert.assertTrue(starFut.isDone()); + + discoSpi.blockCustomEvent(); + message(ignite1.cluster().forRemotes()).send(topic, "msg1"); GridTestUtils.waitForCondition(new PA() { @@ -1099,8 +1119,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); + U.sleep(500); + + Assert.assertFalse(stopFut.isDone()); + + discoSpi.stopBlock(); + stopFut.get(); + Assert.assertTrue(stopFut.isDone()); + message(ignite1.cluster().forRemotes()).send(topic, "msg2"); U.sleep(1000); @@ -1109,6 +1137,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** + * + */ + static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private boolean blockCustomEvt; + + /** */ + private final Object mux = new Object(); + + /** */ + private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + synchronized (mux) { + if (blockCustomEvt) { + DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) { + log.info("Block custom message: " + msg0); + blockedMsgs.add(msg); + + mux.notifyAll(); + } + return; + } + } + + super.sendCustomEvent(msg); + } + + /** + * + */ + public void blockCustomEvent() { + synchronized (mux) { + assert blockedMsgs.isEmpty() : blockedMsgs; + + blockCustomEvt = true; + } + } + + /** + * @throws InterruptedException If interrupted. + */ + public void waitCustomEvent() throws InterruptedException { + synchronized (mux) { + while (blockedMsgs.isEmpty()) + mux.wait(); + } + } + + /** + * + */ + public void stopBlock() { + List<DiscoverySpiCustomMessage> msgs; + + synchronized (this) { + msgs = new ArrayList<>(blockedMsgs); + + blockCustomEvt = false; + + blockedMsgs.clear(); + } + + for (DiscoverySpiCustomMessage msg : msgs) { + log.info("Resend blocked message: " + msg); + + super.sendCustomEvent(msg); + } + } + } + + /** * Tests that message listener registers only for one oldest node. * * @throws Exception If an error occurred. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java new file mode 100644 index 0000000..75e7d22 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.messaging; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; +import org.junit.Assert; + +/** + * + */ +public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Threads number for multi-thread tests. */ + private static final int THREADS = 10; + + /** */ + private final String TOPIC = "topic"; + + /** */ + private final String msgStr = "message"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Checks if use default mode, local listeners execute in the same thread, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendDefaultMode() throws Exception { + Ignite ignite1 = startGrid(1); + + send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertEquals(Thread.currentThread(), thread); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use async mode, local listeners execute in another thread, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendAsyncMode() throws Exception { + Ignite ignite1 = startGrid(1); + + send(ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertTrue(!Thread.currentThread().equals(thread)); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendDefaultMode2Nodes() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertEquals(Thread.currentThread(), thread); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendAsyncMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertTrue(!Thread.currentThread().equals(thread)); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks that sendOrdered works in thread pool, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedDefaultMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMessages(); + + sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedAsyncMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMessages(); + + sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedDefaultMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List<String> msgs = orderedMessages(); + + sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedAsyncMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List<String> msgs = orderedMessages(); + + sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedDefaultModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedAsyncModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message().withAsync()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync()); + } + + /** + * @param ignite2 Second node. + * @param ignMsg IgniteMessage. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg + ) throws Exception { + final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); + + final List<String> msgs = orderedMessages(); + + sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs); + + } + + /** + * @param ignMsg IgniteMessaging. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg + ) throws Exception { + final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); + + final List<String> msgs = orderedMessages(); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + } + + /** + * @param ignite2 Second node. + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List of messages. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg, + final ConcurrentMap<String, List<String>> expMsg, + final ConcurrentMap<String, List<String>> actlMsg, + final List<String> msgs + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); + + final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); + + actlMsgNode2.get(msg.threadName).add(msg.msg); + + latch.countDown(); + + return true; + } + }); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + + latch.await(); + + assertEquals(expMsg.size(), actlMsgNode2.size()); + + for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) + assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue())); + } + + /** + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List of messages. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg, + final ConcurrentMap<String, List<String>> expMsg, + final ConcurrentMap<String, List<String>> actlMsg, + final List<String> msgs + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); + + ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); + + actlMsg.get(msg.threadName).add(msg.msg); + + latch.countDown(); + + return true; + } + }); + + for (int i = 0; i < THREADS; i++) + new Thread(new Runnable() { + @Override public void run() { + String thdName = Thread.currentThread().getName(); + + List<String> exp = Lists.newArrayList(); + + expMsg.put(thdName, exp); + + for (String msg : msgs) { + exp.add(msg); + + ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000); + } + + } + }).start(); + + latch.await(); + + assertEquals(expMsg.size(), actlMsg.size()); + + for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) + assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue())); + } + + /** + * @param ignite2 Second node. + * @param igniteMsg Ignite message. + * @param msgStr Message string. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private void sendWith2Nodes( + final Ignite ignite2, + final IgniteMessaging igniteMsg, + final String msgStr, + final IgniteBiInClosure<String, Thread> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String msg) { + Assert.assertEquals(msgStr, msg); + + latch.countDown(); + + return true; + } + }); + + send(igniteMsg, msgStr, cls); + + latch.await(); + } + + /** + * @param igniteMsg Ignite messaging. + * @param msgStr Message string. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private void send( + final IgniteMessaging igniteMsg, + final String msgStr, + final IgniteBiInClosure<String, Thread> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + final AtomicReference<Thread> thread = new AtomicReference<>(); + final AtomicReference<String> val = new AtomicReference<>(); + + igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String msgStr) { + thread.set(Thread.currentThread()); + + val.set(msgStr); + + latch.countDown(); + + return true; + } + }); + + igniteMsg.send(TOPIC, msgStr); + + latch.await(); + + cls.apply(val.get(), thread.get()); + } + + /** + * @param ignite2 Second node. + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private void sendOrderedWith2Node( + final Ignite ignite2, + final IgniteMessaging igniteMsg, + final List<String> msgs, + final IgniteBiInClosure<List<String>, List<Thread>> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List<String> received = Lists.newArrayList(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String msg) { + received.add(msg); + + latch.countDown(); + + return true; + } + }); + + sendOrdered(igniteMsg, msgs, cls); + + latch.await(); + + assertTrue(msgs.equals(received)); + } + + /** + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private<T> void sendOrdered( + final IgniteMessaging igniteMsg, + final List<T> msgs, + final IgniteBiInClosure<List<T>,List<Thread>> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List<T> received = Lists.newArrayList(); + final List<Thread> threads = Lists.newArrayList(); + + for (T msg : msgs) + igniteMsg.sendOrdered(TOPIC, msg, 1000); + + igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() { + @Override public boolean apply(UUID uuid, T s) { + received.add(s); + + threads.add(Thread.currentThread()); + + latch.countDown(); + + return true; + } + }); + + latch.await(); + + cls.apply(received, threads); + } + + /** + * @return List of ordered messages + */ + private List<String> orderedMessages() { + final List<String> msgs = Lists.newArrayList(); + + for (int i = 0; i < 1000; i++) + msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt())); + + return msgs; + } + + /** + * + */ + private static class Message implements Serializable{ + /** Thread name. */ + private final String threadName; + + /** Message. */ + private final String msg; + + /** + * @param threadName Thread name. + * @param msg Message. + */ + private Message(String threadName, String msg) { + this.threadName = threadName; + this.msg = msg; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 9e20d2a..688edf7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -56,6 +56,7 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest; import org.apache.ignite.marshaller.MarshallerContextSelfTest; import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest; import org.apache.ignite.messaging.GridMessagingSelfTest; +import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest; import org.apache.ignite.messaging.IgniteMessagingWithClientTest; import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest; import org.apache.ignite.spi.GridSpiLocalHostInjectionTest; @@ -101,6 +102,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(new TestSuite(GridSelfTest.class)); suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class)); suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); + suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class)); GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 8ffea8c..3db68c4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent { if (msg instanceof Message) ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL); else - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); + ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false); } /**