Repository: ignite Updated Branches: refs/heads/master cee9171f1 -> 2cad0ab39
IGNITE-8525: Support for IgniteZeroMqStreamer non-multi-part pub-sub. - Fixes #4020. Signed-off-by: shroman <rsht...@yahoo.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2cad0ab3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2cad0ab3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2cad0ab3 Branch: refs/heads/master Commit: 2cad0ab39665b1d2cc1d4d3edf45e74699070411 Parents: cee9171 Author: shroman <rsht...@yahoo.com> Authored: Wed May 23 13:50:00 2018 +0900 Committer: shroman <rsht...@yahoo.com> Committed: Wed May 23 13:50:00 2018 +0900 ---------------------------------------------------------------------- .../stream/zeromq/IgniteZeroMqStreamer.java | 43 +++++++++++--------- .../stream/zeromq/IgniteZeroMqStreamerTest.java | 24 ++++++++++- 2 files changed, 46 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java ---------------------------------------------------------------------- diff --git a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java index 18cb387..1a8e15b 100644 --- a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java +++ b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java @@ -95,29 +95,34 @@ public class IgniteZeroMqStreamer<K, V> extends StreamAdapter<byte[], K, V> impl ctx = ZMQ.context(ioThreads); - zeroMqExSrv.execute(new Runnable() { - @Override public void run() { - ZMQ.Socket socket = ctx.socket(socketType); - socket.connect(addr); - - if (ZeroMqTypeSocket.SUB.getType() == socketType) - socket.subscribe(topic); - - while (isStarted) { - try { - if (ZeroMqTypeSocket.SUB.getType() == socketType) - socket.recv(0); - addMessage(socket.recv(0)); - } - catch (ZMQException e) { - if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) { - break; + zeroMqExSrv.execute(() -> { + ZMQ.Socket socket = ctx.socket(socketType); + socket.connect(addr); + + if (ZeroMqTypeSocket.SUB.getType() == socketType) + socket.subscribe(topic); + + while (isStarted) { + try { + byte[] msg = socket.recv(0); + + if (ZeroMqTypeSocket.SUB.getType() == socketType) { + if (socket.hasReceiveMore()) { + addMessage(socket.recv(0)); + continue; } } - } - socket.close(); + addMessage(msg); + } + catch (ZMQException e) { + if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) { + break; + } + } } + + socket.close(); }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java index 5d2d0c2..0992126 100644 --- a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java +++ b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java @@ -44,6 +44,9 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest { /** Topic name for PUB-SUB. */ private final byte[] TOPIC = "0mq".getBytes(); + /** If pub-sub envelopes are used. */ + private static boolean multipart_pubsub; + /** Constructor. */ public IgniteZeroMqStreamerTest() { super(true); @@ -74,6 +77,19 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest { /** * @throws Exception Test exception. */ + public void testZeroMqSubSocketMultipart() throws Exception { + try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + try (IgniteZeroMqStreamer streamer = newStreamerInstance( + dataStreamer, 3, ZeroMqTypeSocket.SUB, ADDR, TOPIC);) { + multipart_pubsub = true; + executeStreamer(streamer, ZMQ.PUB, TOPIC); + } + } + } + + /** + * @throws Exception Test exception. + */ public void testZeroMqSubSocket() throws Exception { try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { try (IgniteZeroMqStreamer streamer = newStreamerInstance( @@ -130,7 +146,7 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest { String cachedValue = cache.get(testId); // ZeroMQ message successfully put to cache. - assertTrue(cachedValue != null && cachedValue.equals(String.valueOf(testId))); + assertTrue(cachedValue != null && cachedValue.endsWith(String.valueOf(testId))); assertTrue(cache.size() == CACHE_ENTRY_COUNT); @@ -173,7 +189,11 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest { for (int i = 0; i < CACHE_ENTRY_COUNT; i++) { if (ZMQ.PUB == clientSocket) socket.sendMore(topic); - socket.send(String.valueOf(i).getBytes("UTF-8")); + + if (ZMQ.PUB == clientSocket && multipart_pubsub) + socket.send((topic + " " + String.valueOf(i)).getBytes("UTF-8")); + else + socket.send(String.valueOf(i).getBytes("UTF-8")); } } }