IGNITE-1747 MQTT Streamer: remove 'connected' flag and add 2 tests. The MQTT connection state can now be checked by calling MqttClient#isConnected.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e487226 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e487226 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e487226 Branch: refs/heads/ignite-1.4-slow-server-debug Commit: 0e48722679226a0845ff74dd447f77c2da50ece4 Parents: 056490d Author: Raul Kripalani <[email protected]> Authored: Fri Oct 23 17:51:46 2015 +0100 Committer: Raul Kripalani <[email protected]> Committed: Fri Oct 23 17:51:46 2015 +0100 ---------------------------------------------------------------------- .../apache/ignite/stream/mqtt/MqttStreamer.java | 43 +++++++---------- .../stream/mqtt/IgniteMqttStreamerTest.java | 50 +++++++++++++++++++- 2 files changed, 67 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java index a075695..e546da2 100644 --- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java +++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java @@ -136,9 +136,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme /** State keeping. */ private volatile boolean stopped = true; - /** State keeping. */ - private volatile boolean connected; - /** Cached log prefix for cache messages. */ private String cachedLogValues; @@ -231,10 +228,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme stopped = false; // Build retrier. 
- Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder() - .retryIfResult(new Predicate<Boolean>() { - @Override public boolean apply(Boolean connected) { - return !connected; + Retryer<Void> retrier = RetryerBuilder.<Void>newBuilder() + .retryIfResult(new Predicate<Void>() { + @Override public boolean apply(Void v) { + return !client.isConnected() && !stopped; } }) .retryIfException().retryIfRuntimeException() @@ -288,7 +285,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme client.close(); - connected = false; stopped = true; } catch (Exception e) { @@ -307,9 +303,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme * {@inheritDoc} */ @Override public void connectionLost(Throwable throwable) { - connected = false; - - // if we have been stopped, we do not try to establish the connection again + // If we have been stopped, we do not try to establish the connection again. if (stopped) return; @@ -623,12 +617,13 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme } /** - * Returns whether this streamer is connected. + * Returns whether this streamer is connected by delegating to the underlying {@link MqttClient#isConnected()} * * @return {@code true} if connected; {@code false} if not. + * @see MqttClient#isConnected() */ public boolean isConnected() { - return connected; + return client.isConnected(); } /** @@ -637,17 +632,17 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme */ private class MqttConnectionRetrier { /** The guava-retrying retrier object. */ - private final Retryer<Boolean> retrier; + private final Retryer<Void> retrier; /** Single-threaded pool. */ - private ExecutorService exec = Executors.newSingleThreadExecutor(); + private final ExecutorService exec = Executors.newSingleThreadExecutor(); /** * Constructor. * * @param retrier The retryier object. 
*/ - public MqttConnectionRetrier(Retryer<Boolean> retrier) { + public MqttConnectionRetrier(Retryer<Void> retrier) { this.retrier = retrier; } @@ -655,14 +650,14 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme * Method called by the streamer to ask us to (re-)connect. */ public void connect() { - Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { + Callable<Void> callable = retrier.wrap(new Callable<Void>() { + @Override public Void call() throws Exception { // If we're already connected, return immediately. - if (connected) - return true; + if (client.isConnected()) + return null; if (stopped) - return false; + return null; // Connect to broker. if (connectOptions == null) @@ -686,13 +681,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues); - connected = true; - - return true; + return null; } }); - Future<Boolean> result = exec.submit(callable); + Future<Void> result = exec.submit(callable); if (blockUntilConnected) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/0e487226/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 6b07fde..891866d 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -172,6 +172,54 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ + public void testConnectDisconnect() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopic(SINGLE_TOPIC_NAME); + streamer.setBlockUntilConnected(true); + + // action time: repeat 10 times; make sure the connection state is kept correctly every time + for (int i = 0; i < 10; i++) { + streamer.start(); + + assertTrue(streamer.isConnected()); + + streamer.stop(); + + assertFalse(streamer.isConnected()); + } + } + + /** + * @throws Exception If failed. + */ + public void testConnectionStatusWithBrokerDisconnection() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopic(SINGLE_TOPIC_NAME); + streamer.setBlockUntilConnected(true); + streamer.setRetryWaitStrategy(WaitStrategies.noWait()); + + streamer.start(); + + // action time: repeat 5 times; make sure the connection state is kept correctly every time + for (int i = 0; i < 5; i++) { + assertTrue(streamer.isConnected()); + + broker.stop(); + + assertFalse(streamer.isConnected()); + + broker.start(true); + broker.waitUntilStarted(); + + Thread.sleep(500); + } + } + + /** + * @throws Exception If failed. + */ public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception { // configure streamer streamer.setSingleTupleExtractor(singleTupleExtractor()); @@ -307,7 +355,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception If failed. + * @throws Exception */ public void testSingleTopic_NoQoS_Reconnect() throws Exception { // configure streamer
