Added more debug info in test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02d398c6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02d398c6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02d398c6 Branch: refs/heads/master Commit: 02d398c69ce63b375357dc90377c6c03bd4f0d1b Parents: e531919 Author: sboikov <sboi...@gridgain.com> Authored: Mon Sep 14 11:18:15 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Sep 14 11:18:15 2015 +0300 ---------------------------------------------------------------------- .../stream/socket/SocketStreamerSelfTest.java | 27 +++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/02d398c6/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java index 185599d..da15de3 100644 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.marshaller.Marshaller; @@ -88,7 +89,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { - startGrids(GRID_CNT); + startGridsMultiThreaded(GRID_CNT); try (ServerSocket sock = new ServerSocket(0)) { port = sock.getLocalPort(); @@ -232,8 +233,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { * @param converter Converter. * @param r Runnable.. */ - private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception - { + private void test(@Nullable SocketMessageConverter<Tuple> converter, + @Nullable byte[] delim, + Runnable r) throws Exception { SocketStreamer<Tuple, Integer, String> sockStmr = null; Ignite ignite = grid(0); @@ -243,7 +245,6 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { cache.clear(); try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) { - stmr.allowOverwrite(true); stmr.autoFlushFrequency(10); @@ -268,8 +269,12 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(CNT); + final GridConcurrentHashSet<CacheEvent> evts = new GridConcurrentHashSet<>(); + IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() { @Override public boolean apply(UUID uuid, CacheEvent evt) { + evts.add(evt); + latch.countDown(); return true; @@ -284,8 +289,18 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { latch.await(); - for (int i = 0; i < CNT; i++) - assertEquals(Integer.toString(i), cache.get(i)); + for (int i = 0; i < CNT; i++) { + Object val = cache.get(i); + String exp = Integer.toString(i); + + if (!exp.equals(val)) + log.error("Unexpected cache value [key=" + i + + ", exp=" + exp + + ", val=" + val + + ", evts=" + evts + ']'); + + assertEquals(exp, val); + } assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); }