Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 679cda5d2 -> 31f0ddf5e


ignite-1.5


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31f0ddf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31f0ddf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31f0ddf5

Branch: refs/heads/ignite-1537
Commit: 31f0ddf5e3c8aee63b66cd8dd571221a2bcb6b64
Parents: 679cda5
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 24 12:01:57 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 24 12:01:57 2015 +0300

----------------------------------------------------------------------
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 49 ++++++++++++++------
 1 file changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/31f0ddf5/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
 
b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 891866d..6c7f67a 100644
--- 
a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ 
b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -37,12 +37,14 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridMapEntry;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import com.github.rholder.retry.StopStrategies;
@@ -87,7 +89,7 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
     private MqttStreamer<Integer, String> streamer;
 
     /** The UUID of the currently active remote listener. */
-    private UUID remoteListener;
+    private UUID remoteLsnr;
 
     /** The Ignite data streamer. */
     private IgniteDataStreamer<Integer, String> dataStreamer;
@@ -105,7 +107,8 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Before @SuppressWarnings("unchecked")
+    @Before
+    @SuppressWarnings("unchecked")
     public void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
@@ -121,13 +124,13 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
         broker.setPersistenceAdapter(null);
         broker.setPersistenceFactory(null);
 
-        PolicyMap policyMap = new PolicyMap();
-        PolicyEntry policy = new PolicyEntry();
+        PolicyMap plcMap = new PolicyMap();
+        PolicyEntry plc = new PolicyEntry();
 
-        policy.setQueuePrefetch(1);
+        plc.setQueuePrefetch(1);
 
-        broker.setDestinationPolicy(policyMap);
-        broker.getDestinationPolicy().setDefaultEntry(policy);
+        broker.setDestinationPolicy(plcMap);
+        broker.getDestinationPolicy().setDefaultEntry(plc);
         broker.setSchedulerSupport(false);
 
         // add the MQTT transport connector to the broker
@@ -204,16 +207,32 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
 
         // action time: repeat 5 times; make sure the connection state is kept 
correctly every time
         for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return streamer.isConnected();
+                }
+            }, 2000);
+
             assertTrue(streamer.isConnected());
 
             broker.stop();
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return !streamer.isConnected();
+                }
+            }, 2000);
+
             assertFalse(streamer.isConnected());
 
-            broker.start(true);
-            broker.waitUntilStarted();
+            if (i < 4) {
+                broker.start(true);
+                broker.waitUntilStarted();
 
-            Thread.sleep(500);
+                Thread.sleep(500);
+            }
         }
     }
 
@@ -355,7 +374,7 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer
@@ -557,7 +576,7 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
         // Listen to cache PUT events and expect as many as messages as test 
data items
         final CountDownLatch latch = new CountDownLatch(expect);
 
-        IgniteBiPredicate<UUID, CacheEvent> callback = new 
IgniteBiPredicate<UUID, CacheEvent>() {
+        IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, 
CacheEvent>() {
             @Override public boolean apply(UUID uuid, CacheEvent evt) {
                 latch.countDown();
 
@@ -565,8 +584,8 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
             }
         };
 
-        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
-            .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+        remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
+            .remoteListen(cb, null, EVT_CACHE_OBJECT_PUT);
 
         return latch;
     }
@@ -586,7 +605,7 @@ public class IgniteMqttStreamerTest extends 
GridCommonAbstractTest {
         assertEquals(cnt, cache.size(CachePeekMode.ALL));
 
         // remove the event listener
-        
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+        
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
     }
 
     /**

Reply via email to