Repository: ignite Updated Branches: refs/heads/ignite-1537 679cda5d2 -> 31f0ddf5e
ignite-1.5 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31f0ddf5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31f0ddf5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31f0ddf5 Branch: refs/heads/ignite-1537 Commit: 31f0ddf5e3c8aee63b66cd8dd571221a2bcb6b64 Parents: 679cda5 Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 24 12:01:57 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 24 12:01:57 2015 +0300 ---------------------------------------------------------------------- .../stream/mqtt/IgniteMqttStreamerTest.java | 49 ++++++++++++++------ 1 file changed, 34 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/31f0ddf5/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 891866d..6c7f67a 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -37,12 +37,14 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridMapEntry; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.stream.StreamSingleTupleExtractor; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import com.github.rholder.retry.StopStrategies; @@ -87,7 +89,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { private MqttStreamer<Integer, String> streamer; /** The UUID of the currently active remote listener. */ - private UUID remoteListener; + private UUID remoteLsnr; /** The Ignite data streamer. */ private IgniteDataStreamer<Integer, String> dataStreamer; @@ -105,7 +107,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - @Before @SuppressWarnings("unchecked") + @Before + @SuppressWarnings("unchecked") public void beforeTest() throws Exception { grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); @@ -121,13 +124,13 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { broker.setPersistenceAdapter(null); broker.setPersistenceFactory(null); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); + PolicyMap plcMap = new PolicyMap(); + PolicyEntry plc = new PolicyEntry(); - policy.setQueuePrefetch(1); + plc.setQueuePrefetch(1); - broker.setDestinationPolicy(policyMap); - broker.getDestinationPolicy().setDefaultEntry(policy); + broker.setDestinationPolicy(plcMap); + broker.getDestinationPolicy().setDefaultEntry(plc); broker.setSchedulerSupport(false); // add the MQTT transport connector to the broker @@ -204,16 +207,32 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { // action time: repeat 5 times; make sure the connection state is kept correctly every time for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return streamer.isConnected(); + } + }, 2000); + assertTrue(streamer.isConnected()); broker.stop(); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !streamer.isConnected(); + } + }, 2000); + assertFalse(streamer.isConnected()); - broker.start(true); - broker.waitUntilStarted(); + if (i < 4) { + broker.start(true); + broker.waitUntilStarted(); - Thread.sleep(500); + Thread.sleep(500); + } } } @@ -355,7 +374,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_Reconnect() throws Exception { // configure streamer @@ -557,7 +576,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { // Listen to cache PUT events and expect as many as messages as test data items final CountDownLatch latch = new CountDownLatch(expect); - IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, CacheEvent>() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); @@ -565,8 +584,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } }; - remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)) - .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null)) + .remoteListen(cb, null, EVT_CACHE_OBJECT_PUT); return latch; } @@ -586,7 +605,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { assertEquals(cnt, cache.size(CachePeekMode.ALL)); // remove the event listener - grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener); + grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr); } /**