review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056490d2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056490d2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056490d2 Branch: refs/heads/ignite-1.4-slow-server-debug Commit: 056490d239c46a3950234ce7ea9afc4b203a00c3 Parents: cb0d432 Author: Yakov Zhdanov <[email protected]> Authored: Thu Oct 22 13:52:43 2015 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Thu Oct 22 13:52:43 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/stream/mqtt/MqttStreamer.java | 125 ++++++++++--------- .../stream/mqtt/IgniteMqttStreamerTest.java | 71 +++++------ .../mqtt/IgniteMqttStreamerTestSuite.java | 2 - 3 files changed, 102 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java index 39d8d6e..a075695 100644 --- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java +++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java @@ -74,10 +74,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; * {@link #setConnectOptions(MqttConnectOptions)} setter. * * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a> - * */ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback { - /** Logger. */ private IgniteLogger log; @@ -96,8 +94,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme /** The topics to subscribe to, if many. */ private List<String> topics; - /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same - * number of elements as {@link #topics}. */ + /** + * The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same + * number of elements as {@link #topics}. + */ private List<Integer> qualitiesOfService; /** The MQTT client ID (optional). */ @@ -118,8 +118,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme /** If disconnecting forcibly, the timeout. */ private Integer disconnectForciblyTimeout; - /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a - * Fibonacci-based strategy. */ + /** + * The strategy to determine how long to wait between retry attempts. By default, this streamer uses a + * Fibonacci-based strategy. + */ private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait(); /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */ @@ -149,7 +151,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme if (!stopped) throw new IgniteException("Attempted to start an already started MQTT Streamer"); - // for simplicity, if these are null initialize to empty lists + // For simplicity, if these are null initialize to empty lists. topics = topics == null ? new ArrayList<String>() : topics; qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService; @@ -157,45 +159,47 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme try { Map<String, Object> logValues = new HashMap<>(); - // parameter validations + // Parameter validations. A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); - A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing"); - A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " + - "both single and multiple tuple extractor"); + A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), + "tuple extractor missing"); + A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, + "cannot provide both single and multiple tuple extractor"); A.notNullOrEmpty(brokerUrl, "broker URL"); - // if the client ID is empty, generate one + // If the client ID is empty, generate one. if (clientId == null || clientId.length() == 0) clientId = MqttClient.generateClientId(); - // if we have both a single topic and a list of topics (but the list of topic is not of - // size 1 and == topic, as this would be a case of re-initialization), fail + // If we have both a single topic and a list of topics (but the list of topic is not of + // size 1 and == topic, as this would be a case of re-initialization), fail. if (topic != null && topic.length() > 0 && !topics.isEmpty() && topics.size() != 1 && !topics.get(0).equals(topic)) - throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time"); + throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time."); - // same as above but for QoS + // Same as above but for QoS. if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 && !qualitiesOfService.get(0).equals(qualityOfService)) - throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time"); + throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time."); - // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly + // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly. if (disconnectForcibly && disconnectQuiesceTimeout != null) A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " + "with quiesce"); - // if we have multiple topics + // If we have multiple topics. if (!topics.isEmpty()) { for (String t : topics) A.notNullOrEmpty(t, "topic in list of topics"); - A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " + - "service must be either empty or have the same size as topics list"); + A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), + "qualities of service must be either empty or have the same size as topics list"); logValues.put("topics", topics); } - else { // just the single topic + else { + // Just the single topic. topics.add(topic); if (qualityOfService != null) @@ -204,29 +208,29 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme logValues.put("topic", topic); } - // finish building log values + // Finish building log values. logValues.put("brokerUrl", brokerUrl); logValues.put("clientId", clientId); - // cache log values + // Cache log values. cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]"; - // create logger + // Create logger. log = getIgnite().log(); - // create the mqtt client + // Create the MQTT client. if (persistence == null) client = new MqttClient(brokerUrl, clientId); else client = new MqttClient(brokerUrl, clientId, persistence); - // set this as a callback + // Set this as a callback. client.setCallback(this); - // set stopped to false, as the connection will start async + // Set stopped to false, as the connection will start async. stopped = false; - // build retrier + // Build retrier. Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder() .retryIfResult(new Predicate<Boolean>() { @Override public boolean apply(Boolean connected) { @@ -238,29 +242,29 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme .withStopStrategy(retryStopStrategy) .build(); - // create the connection retrier + // Create the connection retrier. connectionRetrier = new MqttConnectionRetrier(retrier); log.info("Starting MQTT Streamer " + cachedLogValues); - // connect + // Connect. connectionRetrier.connect(); - } - catch (Throwable t) { - throw new IgniteException("Exception while initializing MqttStreamer", t); + catch (Exception e) { + throw new IgniteException("Failed to initialize MQTT Streamer.", e); } - } /** * Stops streamer. + * + * @throws IgniteException If failed. */ public void stop() throws IgniteException { if (stopped) - throw new IgniteException("Attempted to stop an already stopped MQTT Streamer"); + throw new IgniteException("Failed to stop MQTT Streamer (already stopped)."); - // stop the retrier + // Stop the retrier. connectionRetrier.stop(); try { @@ -273,23 +277,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme else client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout); - - } else { + } + else { if (disconnectQuiesceTimeout == null) client.disconnect(); else client.disconnect(disconnectQuiesceTimeout); - } client.close(); + connected = false; stopped = true; - } - catch (Throwable t) { - throw new IgniteException("Exception while stopping MqttStreamer", t); + catch (Exception e) { + throw new IgniteException("Failed to stop Exception while stopping MQTT Streamer.", e); } } @@ -502,9 +505,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme } /** - * Sets whether to disconnect forcibly or not when shutting down. By default, it's <tt>false</tt>. + * Sets whether to disconnect forcibly or not when shutting down. By default, it's {@code false}. * - * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's <tt>false</tt>. + * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's {@code false}. */ public void setDisconnectForcibly(boolean disconnectForcibly) { this.disconnectForcibly = disconnectForcibly; @@ -593,7 +596,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme } /** - * Sets whether to block the start() method until connected for the first time. By default, it's <tt>false</tt>. + * Sets whether to block the start() method until connected for the first time. By default, it's {@code false}. * * @param blockUntilConnected Whether to block or not. */ @@ -601,6 +604,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme this.blockUntilConnected = blockUntilConnected; } + /** + * Gets whether to block the start() method until connected for the first time. By default, it's {@code false}. + * + * @return {@code true} if should connect synchronously in start. + */ public boolean isBlockUntilConnected() { return blockUntilConnected; } @@ -608,7 +616,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme /** * Returns whether this streamer is stopped. * - * @return <tt>true</tt> if stopped; <tt>false</tt> if not. + * @return {@code true} if stopped; {@code false} if not. */ public boolean isStopped() { return stopped; @@ -617,7 +625,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme /** * Returns whether this streamer is connected. * - * @return <tt>true</tt> if connected; <tt>false</tt> if not. + * @return {@code true} if connected; {@code false} if not. */ public boolean isConnected() { return connected; @@ -628,15 +636,15 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme * the (re-)connections. */ private class MqttConnectionRetrier { - /** The guava-retrying retrier object. */ private final Retryer<Boolean> retrier; /** Single-threaded pool. */ - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private ExecutorService exec = Executors.newSingleThreadExecutor(); /** * Constructor. + * * @param retrier The retryier object. */ public MqttConnectionRetrier(Retryer<Boolean> retrier) { @@ -649,21 +657,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme public void connect() { Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - // if we're already connected, return immediately + // If we're already connected, return immediately. if (connected) return true; if (stopped) return false; - // connect to broker + // Connect to broker. if (connectOptions == null) client.connect(); else client.connect(connectOptions); - // always use the multiple topics variant of the mqtt client; even if the user specified a single - // topic and/or QoS, the initialization code would have placed it inside the 1..n structures + // Always use the multiple topics variant of the mqtt client; even if the user specified a single + // topic and/or QoS, the initialization code would have placed it inside the 1..n structures. if (qualitiesOfService.isEmpty()) client.subscribe(topics.toArray(new String[0])); @@ -679,11 +687,12 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues); connected = true; - return connected; + + return true; } }); - Future<Boolean> result = executor.submit(callable); + Future<Boolean> result = exec.submit(callable); if (blockUntilConnected) { try { @@ -699,9 +708,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme * Stops this connection utility class by shutting down the thread pool. */ public void stop() { - executor.shutdownNow(); + exec.shutdownNow(); } - } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 76404b8..6b07fde 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -60,11 +60,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; /** * Test for {@link MqttStreamer}. - * - * @author Raul Kripalani */ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { - /** The test data. */ private static final Map<Integer, String> TEST_DATA = new HashMap<>(); @@ -106,8 +103,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * - * @throws Exception + * @throws Exception If failed. */ @Before @SuppressWarnings("unchecked") public void beforeTest() throws Exception { @@ -154,8 +150,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * - * @throws Exception + * @throws Exception If failed. */ @After public void afterTest() throws Exception { @@ -175,7 +170,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception { // configure streamer @@ -197,7 +192,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception { // configure streamer @@ -223,7 +218,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception { // configure streamer @@ -245,7 +240,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception { // configure streamer @@ -271,7 +266,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception { // configure streamer @@ -312,7 +307,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_Reconnect() throws Exception { // configure streamer @@ -358,7 +353,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_RetryOnce() throws Exception { // configure streamer @@ -397,7 +392,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception { // configure streamer @@ -424,7 +419,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception { // configure streamer @@ -443,7 +438,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @param dataStreamer Streamer. + * @return MQTT streamer. */ private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) { MqttStreamer<Integer, String> streamer = new MqttStreamer<>(); @@ -460,10 +456,15 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @param topics Topics. + * @param fromIdx From index. + * @param cnt Count. + * @param singleMsg Single message flag. + * @throws MqttException If failed. */ - private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException { - if (singleMessage) { + private void sendMessages(final List<String> topics, int fromIdx, int cnt, boolean singleMsg) + throws MqttException { + if (singleMsg) { final List<StringBuilder> sbs = new ArrayList<>(topics.size()); // initialize String Builders for each topic @@ -474,7 +475,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { }); // fill String Builders for each topic - F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() { + F.forEach(F.range(fromIdx, fromIdx + cnt), new IgniteInClosure<Integer>() { @Override public void apply(Integer integer) { sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n"); } @@ -488,7 +489,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } } else { - for (int i = fromIdx; i < fromIdx + count; i++) { + for (int i = fromIdx; i < fromIdx + cnt; i++) { byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes(); MqttMessage msg = new MqttMessage(payload); @@ -499,14 +500,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @param expect Expected count. + * @return Latch to be counted down in listener. */ private CountDownLatch subscribeToPutEvents(int expect) { Ignite ignite = grid(); // Listen to cache PUT events and expect as many as messages as test data items final CountDownLatch latch = new CountDownLatch(expect); - @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + + IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); @@ -514,32 +517,32 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } }; - remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)) + .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + return latch; } /** - * @throws Exception + * @param cnt Count. */ - private void assertCacheEntriesLoaded(int count) { + private void assertCacheEntriesLoaded(int cnt) { // get the cache and check that the entries are present IgniteCache<Integer, String> cache = grid().cache(null); // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache - for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) + for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt)) assertEquals(TEST_DATA.get(key), cache.get(key)); // assert that the cache exactly the specified amount of elements - assertEquals(count, cache.size(CachePeekMode.ALL)); + assertEquals(cnt, cache.size(CachePeekMode.ALL)); // remove the event listener grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener); } /** - * Returns a {@link StreamSingleTupleExtractor} for testing. - * - * @throws Exception + * @return {@link StreamSingleTupleExtractor} for testing. */ public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() { return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() { @@ -552,9 +555,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * Returns a {@link StreamMultipleTupleExtractor} for testing. - * - * @throws Exception + * @return {@link StreamMultipleTupleExtractor} for testing. */ public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() { return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/056490d2/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java index 413eaab..ed0c2f7 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java @@ -22,8 +22,6 @@ import org.junit.runners.Suite; /** * MQTT streamer tests. - * - * @author Raul Kripalani */ @RunWith(Suite.class) @Suite.SuiteClasses({
